阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——命令篇

109次阅读
没有评论

共计 22206 个字符,预计需要花费 56 分钟才能阅读完成。

前两篇文章简单介绍了 Hadoop 心跳机制的两个重要角色:JT 和 TT,虽然不是太详细,但是大纸业说清楚了一些事,在 JT 篇的最后对于 JT 返回 TT 的心跳响应中的一些命令一笔带过,这篇文章将重要介绍这些命令:ReinitTrackerAction,KillTaskAction,KillJobAction,CommitTaskAction,LaunchTaskAction。每个命令都对应着一系列行为,所有的行为都是由 TT 完成。下面分别看看这五个 TaskTrackerAction,每个命令从三个部分进行解释:1)命令内容;2)JT 下达命令;3)TT 处理命令。

目录

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——TT 篇 http://www.linuxidc.com/Linux/2014-06/103216.htm

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——JT 篇 http://www.linuxidc.com/Linux/2014-06/103217.htm

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——命令篇 http://www.linuxidc.com/Linux/2014-06/103218.htm

1.ReinitTrackerAction

1)命令内容:

该命令指示 TT 进行重新初始化操作。一般当 JT 与 TT 之间状态不一致时,JT 就会像 TT 下达该命令,命令 TT 进行重新初始化。重新初始化会 TT 会清空其上的 task,以及初始化一些状态信息和参数,最重要的是 justInited 变量会变成 true,表示刚初始化的,这时再次发送心跳时 JT 接收到的参数 initialContact 就为 true 了,表示 TT 首次联系 JT,保证 JT 和 TT 之间的状态一致。

该对象内部非常简单,简单到啥也没有。出了一个 actionType==ActionType.REINIT_TRACKER(表示重新初始化,共有五种类型,对应五种命令)外啥也没有。

class ReinitTrackerAction extends TaskTrackerAction {
  public ReinitTrackerAction() {
    super(ActionType.REINIT_TRACKER);
  }
  public void write(DataOutput out) throws IOException {}
  public void readFields(DataInput in) throws IOException {}
}

2)JT 下达命令:
 
如上所说 JT 对 TT 下达该命令一般是由于两者之间状态不一致导致,具体见下代码(去掉了注释)。

if (initialContact != true) {
      if (prevHeartbeatResponse == null) {
        if (hasRestarted()) {
          addRestartInfo = true;
          recoveryManager.unMarkTracker(trackerName);
        } else {
          LOG.warn(“Serious problem, cannot find record of ‘previous’ ” +
                  “heartbeat for ‘” + trackerName +
                  “‘; reinitializing the tasktracker”);
          return new HeartbeatResponse(responseId,
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }
      } else {
        if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info(“Ignoring ‘duplicate’ heartbeat from ‘” +
              trackerName + “‘; resending the previous ‘lost’ response”);
          return prevHeartbeatResponse;
        }
      }
    }

initialContact 由 TT 发送,表示 TT 是否首次联系 JT,一般该变量只是在 TT 实例化时和初始化(initialize() 方法)时赋为 true,当调用了 TT 的 offerService() 方法之后该变量就会被赋成 false。TT 的 main() 方法会先实例化一个 TT 对象,然后会调用其 initialize() 方法,接着会调用 TT 的 offerService() 方法,该方法内会向 JT 发送一次心跳,这次心跳是该 TT 启动时发送的第一次心跳,所有 restarted 和 initialContact 都是 true,在这次心跳之后,offerService() 方法会将 restarted 和 initialContact 都设为 false。

当 initialContact==false 时,且 prevHeartbeatResponse==null,prevHeartbeatResponse 变量是 JT 从其保存的心跳记录中取出该 TT 的上次心跳记录,为 null 则表示 JT 没有收到过来自该 TT 的心跳记录,但是 initialContact==false 表示 TT 认为他不是首次联系 JT,即 JT 有接收到过 TT 的心跳请求,这样 JT 与 TT 就产生了状态不一致情况。注释给出了一种可能的解释:This is the first heartbeat from the old tracker to the newly  started JobTracker. 意思是 JT 是重新实例化,即 JT 刚重启过,但是 TT 未重启,或者重新初始化,即 old。出现这种情况时,JT 会根据是否有任务需要恢复来判断是否让 TT 重新初始化。至于为什么要在对 TT 下达重新初始化命令之前判断是否有任务需要恢复,是因为如果 JT 检查出有任务需要恢复,那么有可能需要回复的任务在该 TT 上运行过,那么就需要该 TT 来恢复该任务,而 TT 重新初始化之后会丢失所有任务。总之如果 JT 不需要进行任务恢复则对 TT 下达重新初始化命令。还有另外一种情况 JT 也会对 TT 下达该命令。

    if (!processHeartbeat(status, initialContact, now)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId,
                  new TaskTrackerAction[] {new ReinitTrackerAction()});
    }

当 JT 调用 processHeartbeat() 方法处理心跳请求时返回 false,则对 TT 下达重新初始化命令。processHeartbeat() 方法返回 false 的原因如下:

boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                    trackerStatus);
        TaskTracker taskTracker = getTaskTracker(trackerName);
        if (initialContact) {
          // If it’s first contact, then clear out
          // any state hanging around
          if (seenBefore) {
            lostTaskTracker(taskTracker);
          }
        } else {
          // If not first contact, there should be some record of the tracker
          if (!seenBefore) {
            LOG.warn(“Status from unknown Tracker : ” + trackerName);
            updateTaskTrackerStatus(trackerName, null);
            return false;
          }
        }

当 initialContact==false 且 seenBefore==false 时返回 false,seenBefore 表示 JT 的 taskTrackers 队列中是否存在该 TT,不存在则 seenBefore==false,所以 processHeartbeat() 原因是 TT 不是首次联系 JT,但是 JT 中并不存在该 TT 的信息,又是一种不一致状态,所以 JT 会对其下达重新初始化命令。

以上就是 JT 对 TT 下达重新初始化命令产生的两种情况,归根到底都是由于 JT 与 TT 之间状态不一致导致,即 TT 认为他不是首次联系 JT,但是 JT 却没有 TT 的以前记录。

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

————————————– 分割线 ————————————–

3)TT 处理命令:

TT 发送心跳都是在 TT 的 offerService() 方法中调用的,该方法在 TT 运行过程中一直执行,当 TT 的心跳请求接收到响应时,会首先对收到的 HeartbeatResponse 中的 TaskTrackerAction 进行判断,判断是否有 ReinitTrackerAction 命令。

        TaskTrackerAction[] actions = heartbeatResponse.getActions();
        if(LOG.isDebugEnabled()) {
          LOG.debug(“Got heartbeatResponse from JobTracker with responseId: ” +
                    heartbeatResponse.getResponseId() + ” and ” +
                    ((actions != null) ? actions.length : 0) + ” actions”);
        }
        if (reinitTaskTracker(actions)) {
          return State.STALE;
        }

reinitTaskTracker() 方法判断是否有 ReinitTrackerAction 命令,

  private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
    if (actions != null) {
      for (TaskTrackerAction action : actions) {
        if (action.getActionId() ==
            TaskTrackerAction.ActionType.REINIT_TRACKER) {
          LOG.info(“Recieved ReinitTrackerAction from JobTracker”);
          return true;
        }
      }
    }
    return false;
  }

简单的根据 Action 的 Id 进行判断,当判断出心跳的返回结果中有 ReinitTrackerAction 命令时,则退出 offerService() 的无限循环,并返回 State.STALE。TT 是个线程,所以 offerService() 的返回值返回到 run() 方法。

while (running && !staleState && !shuttingDown && !denied) {
            try {
              State osState = offerService();
              if (osState == State.STALE) {
                staleState = true;
              } else if (osState == State.DENIED) {
                denied = true;
              }
            } catch (Exception ex) {
              if (!shuttingDown) {
                LOG.info(“Lost connection to JobTracker [” +
                        jobTrackAddr + “].  Retrying…”, ex);
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException ie) {
                }
              }
            }
          }

因为 offerService() 的返回值是 State.STALE,所以 staleState==true,会退出循环。

} finally {
          // If denied we’ll close via shutdown below. We should close
          // here even if shuttingDown as shuttingDown can be set even
          // if shutdown is not called.
          if (!denied) {
            close();
          }
        }
        if (shuttingDown) {return;}
        if (denied) {break;}
        LOG.warn(“Reinitializing local state”);
        initialize();

调用 initialize() 方法重新初始化。之后继续循环。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-06/103218p2.htm

2.KillTaskAction

1)命令内容:

从名字就可以看出该命令是指 kill 掉 task 的意思。对象内部同 ReinitTrackerAction 一样,只不过多存储一个 taskId(TaskAttemptID)对象,所以在序列化时会将该变量序列化到流中,以便 TT 接收到命令时可以准确知道需要 kill 掉哪个 TaskAttempt。

2)JT 下达命令:

JT 通过调用 getTasksToKill() 方法,获取该 TT 上所有需要 kill 的 task,下面看看该方法如何获取需要 kill 掉的 task。

private synchronized List<TaskTrackerAction> getTasksToKill(
                                                              String taskTracker) {
   
    Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
    List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
    if (taskIds != null) {
      for (TaskAttemptID killTaskId : taskIds) {
        TaskInProgress tip = taskidToTIPMap.get(killTaskId);
        if (tip == null) {
          continue;
        }
        if (tip.shouldClose(killTaskId)) {
          //
          // This is how the JobTracker ends a task at the TaskTracker.
          // It may be successfully completed, or may be killed in
          // mid-execution.
          //
          if (!tip.getJob().isComplete()) {
            killList.add(new KillTaskAction(killTaskId));
            if (LOG.isDebugEnabled()) {
              LOG.debug(taskTracker + ” -> KillTaskAction: ” + killTaskId);
            }
          }
        }
      }
    }
    // add the stray attempts for uninited jobs
    synchronized (trackerToTasksToCleanup) {
      Set<TaskAttemptID> set = trackerToTasksToCleanup.remove(taskTracker);
      if (set != null) {
        for (TaskAttemptID id : set) {
          killList.add(new KillTaskAction(id));
        }
      }
    }
    return killList;
  }

这里有两处获得需要 kill 掉的任务,首先看看第一处。第一处,首先从 JT 上保存的 trackerToTaskMap 对象中获取该 TT 所有的 TaskAttempt(一个 Task 可能包含多个 TaskAttempt)对象(trackerToTaskMap 保存了 taskTrackerName–>TaskAttemptID 的信息),然后判断每个 TaskAttempt 是否需要被 kill,判断方法是调用 TaskInProgress 对象的 shouldClose() 方法。下面看看 shouldClose() 方法。

public boolean shouldClose(TaskAttemptID taskid) {
    boolean close = false;
    TaskStatus ts = taskStatuses.get(taskid);
    if ((ts != null) &&
        (!tasksReportedClosed.contains(taskid)) &&
        ((this.failed) ||
        ((job.getStatus().getRunState() != JobStatus.RUNNING &&
        (job.getStatus().getRunState() != JobStatus.PREP))))) {
      tasksReportedClosed.add(taskid);
      close = true;
    } else if (isComplete() &&
              !(isMapTask() && !jobSetup &&
                  !jobCleanup && isComplete(taskid)) &&
              !tasksReportedClosed.contains(taskid)) {
      tasksReportedClosed.add(taskid);
      close = true;
    } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
              !tasksReportedClosed.contains(taskid)) {
      tasksReportedClosed.add(taskid);
      close = true;
    } else {
      close = tasksToKill.keySet().contains(taskid);
    } 
    return close;
  }

该方法通过判断 TaskAttempt 所属的 Task 对象的状态来确定是否需要被关闭,具体判断条件如下:

a. 满足 Task 的 taskStatuses 队列中包含此 TaskAttempt,且 tasksReportedClosed 队列不包含该 TaskAttempt 对象(tasksReportedClosed 中保存所有已被关闭的 TaskAttempt,所以 tasksReportedClosed 中存在该 TaskAttempt 则表示该 TaskAttempt 已被关闭,则无需重复关闭),且满足该 TaskInProgress 对象的 failed==true,即该 Task 已失败,或者 Task 所属的 Job 处于 SUCCEEDED、FAILED、KILLED 三个状态,则表示该 TaskAttempt 需要被关闭(Close),则返回 true,同时将该 TaskAttempt 对象添加到 tasksReportedClosed 队列中,以避免下次重复关闭。

b. 满足该 Task 已完成,且该 Task 不是一个 map 任务,也不是 jobSetup 或者 jobCleanup 任务,且该 TaskAttempt 是该 Task 成功的那个 TaskAttempt,且 tasksReportedClosed 不包含该 TaskAttempt。这里需要知道一个 Task 可能会有多个 TaskAttempt,这是由于推测执行导致(可以去了解下推测执行),这多个 TaskAttempt 之中只要有一个完成,则该 Task 就完成,完成的那一个 TaskAttempt 会被标记在 Task 对象中(successfulTaskId 参数)。还有一点,即当该任务是 map 任务时,并不关闭该 TaskAttempt,注释给出的解释是:However, for completed map tasks we do not close the task which actually was the one responsible for _completing_ the TaskInProgress. (不解)

c. 满足该 TaskAttempt 已完成,但是未决定提交,且不是 successfulTaskId 参数标志的 TaskAttempt,且 tasksReportedClosed 不包含该 TaskAttempt。这个条件表示当多个 TaskAttempt 同时运行时,有一个已完成且成功提交,那么余下的就算成功了的 TaskAttempt 也会被关闭,因为一个 Task 只需要一次成功提交即可。

d. 该 TaskAttempt 在 tasksToKill 队列中。tasksToKill 队列存放着由客户端发起的 kill 命令指定 kill 的 TaskAttempt。比如当我们手动在命令行或者其他地方执行 Hadoop job -kill 时,会 kill 掉该 Job 所有的 TaskAttempt。

由上判断出 TaskAttempt 对象需要关闭后,会判断如果该 TaskAttempt 需要被关闭的原因不是由于其所属的 Job 已完成,则对其创建一个 KillTaskAction 对象,并添加到心跳响应结果中。这里由于所属 Job 完成而需要关闭的 TaskAttempt 对象,并不作为 KillTaskAction 命令返回给 TT。

下面第二处:

// add the stray attempts for uninited jobs
    synchronized (trackerToTasksToCleanup) {
      Set<TaskAttemptID> set = trackerToTasksToCleanup.remove(taskTracker);
      if (set != null) {
        for (TaskAttemptID id : set) {
          killList.add(new KillTaskAction(id));
        }
      }
    }

这里是从 trackerToTasksToCleanup 队列中获取该 TT 上所有需要 cleanup 的 TaskAttempt。在 updateTaskStatuses() 方法中往 trackerToJobsToCleanup 队列中添加任务,而 updateTaskStatuses() 方法在 processHeartbeat() 中调用,也就是当 JT 接收到 TT 的心跳请求之后,会处理此次心跳,然后根据 TT 发送过来的 TaskTrackerStatus 中包含的 TaskStatus 信息,获取每个 TaskStatus 所对应的 Job,如果 Job 不存在,则将该 TaskStatus 所属的 Job 添加到 trackerToJobsToCleanup 队列(获取 KillJobAction 时会用到)。如果 Job 存在,但是 Job 没有初始化,也会将该 TaskStatus 所属的 TaskAttempt 添加到 trackerToTasksToCleanup 队列。
 
以上就是 JT 如何判断哪些 TaskAttempt 需要被 kill,并通过 KillTaskAction 向 TT 下达命令。

3)TT 处理命令:

if (actions != null){
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
            } else if (action instanceof CommitTaskAction) {
              CommitTaskAction commitAction = (CommitTaskAction)action;
              if (!commitResponses.contains(commitAction.getTaskID())) {
                LOG.info(“Received commit task action for ” +
                          commitAction.getTaskID());
                commitResponses.add(commitAction.getTaskID());
              }
            } else {
              addActionToCleanup(action);
            }
          }
        }

可以看出 TT 将 KillTaskAction 和 KillJobAction 一样处理,都是调用 addActionToCleanup(action) 方法,而 LaunchTaskAction 则调用 addToTaskQueue((LaunchTaskAction)action),CommitTaskAction 调用 commitResponses.add(commitAction.getTaskID())。
 
下面看看对 KillTaskAction 和 KillJobAction 的处理。

  void addActionToCleanup(TaskTrackerAction action) throws InterruptedException {

    String actionId = getIdForCleanUpAction(action);

    // add the action to the queue only if its not added in the first place
    String previousActionId = allCleanupActions.putIfAbsent(actionId, actionId);
    if (previousActionId != null) {
      return;
    } else {
      activeCleanupActions.put(action);
    }
  }

将 Action 中的 JobId 或者 TaskId 添加到 allCleanupActions 队列中,如果对应的 JobId 或者 TaskId 已存在与 allCleanupActions 中,则将 Action 添加到 activeCleanupActions 队列中。activeCleanupActions 队列由 taskCleanupThread 线程进行操作,该线程在 TT 实例化化时初始化,并在 TT 运行一开始调用 startCleanupThreads() 方法启动,该线程会一直��行 taskCleanUp() 方法进行清除工作。

if (action instanceof KillJobAction) {
      purgeJob((KillJobAction) action);
    } else if (action instanceof KillTaskAction) {
      processKillTaskAction((KillTaskAction) action);
    } else {
      LOG.error(“Non-delete action given to cleanup thread: ” + action);
    }

分别调用 purgeJob 和 purgeTask 方法执行清除工作。

前两篇文章简单介绍了 Hadoop 心跳机制的两个重要角色:JT 和 TT,虽然不是太详细,但是大纸业说清楚了一些事,在 JT 篇的最后对于 JT 返回 TT 的心跳响应中的一些命令一笔带过,这篇文章将重要介绍这些命令:ReinitTrackerAction,KillTaskAction,KillJobAction,CommitTaskAction,LaunchTaskAction。每个命令都对应着一系列行为,所有的行为都是由 TT 完成。下面分别看看这五个 TaskTrackerAction,每个命令从三个部分进行解释:1)命令内容;2)JT 下达命令;3)TT 处理命令。

目录

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——TT 篇 http://www.linuxidc.com/Linux/2014-06/103216.htm

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——JT 篇 http://www.linuxidc.com/Linux/2014-06/103217.htm

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——命令篇 http://www.linuxidc.com/Linux/2014-06/103218.htm

1.ReinitTrackerAction

1)命令内容:

该命令指示 TT 进行重新初始化操作。一般当 JT 与 TT 之间状态不一致时,JT 就会像 TT 下达该命令,命令 TT 进行重新初始化。重新初始化会 TT 会清空其上的 task,以及初始化一些状态信息和参数,最重要的是 justInited 变量会变成 true,表示刚初始化的,这时再次发送心跳时 JT 接收到的参数 initialContact 就为 true 了,表示 TT 首次联系 JT,保证 JT 和 TT 之间的状态一致。

该对象内部非常简单,简单到啥也没有。出了一个 actionType==ActionType.REINIT_TRACKER(表示重新初始化,共有五种类型,对应五种命令)外啥也没有。

class ReinitTrackerAction extends TaskTrackerAction {
  public ReinitTrackerAction() {
    super(ActionType.REINIT_TRACKER);
  }
  public void write(DataOutput out) throws IOException {}
  public void readFields(DataInput in) throws IOException {}
}

2)JT 下达命令:
 
如上所说 JT 对 TT 下达该命令一般是由于两者之间状态不一致导致,具体见下代码(去掉了注释)。

if (initialContact != true) {
      if (prevHeartbeatResponse == null) {
        if (hasRestarted()) {
          addRestartInfo = true;
          recoveryManager.unMarkTracker(trackerName);
        } else {
          LOG.warn(“Serious problem, cannot find record of ‘previous’ ” +
                  “heartbeat for ‘” + trackerName +
                  “‘; reinitializing the tasktracker”);
          return new HeartbeatResponse(responseId,
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }
      } else {
        if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info(“Ignoring ‘duplicate’ heartbeat from ‘” +
              trackerName + “‘; resending the previous ‘lost’ response”);
          return prevHeartbeatResponse;
        }
      }
    }

initialContact 由 TT 发送,表示 TT 是否首次联系 JT,一般该变量只是在 TT 实例化时和初始化(initialize() 方法)时赋为 true,当调用了 TT 的 offerService() 方法之后该变量就会被赋成 false。TT 的 main() 方法会先实例化一个 TT 对象,然后会调用其 initialize() 方法,接着会调用 TT 的 offerService() 方法,该方法内会向 JT 发送一次心跳,这次心跳是该 TT 启动时发送的第一次心跳,所有 restarted 和 initialContact 都是 true,在这次心跳之后,offerService() 方法会将 restarted 和 initialContact 都设为 false。

当 initialContact==false 时,且 prevHeartbeatResponse==null,prevHeartbeatResponse 变量是 JT 从其保存的心跳记录中取出该 TT 的上次心跳记录,为 null 则表示 JT 没有收到过来自该 TT 的心跳记录,但是 initialContact==false 表示 TT 认为他不是首次联系 JT,即 JT 有接收到过 TT 的心跳请求,这样 JT 与 TT 就产生了状态不一致情况。注释给出了一种可能的解释:This is the first heartbeat from the old tracker to the newly  started JobTracker. 意思是 JT 是重新实例化,即 JT 刚重启过,但是 TT 未重启,或者重新初始化,即 old。出现这种情况时,JT 会根据是否有任务需要恢复来判断是否让 TT 重新初始化。至于为什么要在对 TT 下达重新初始化命令之前判断是否有任务需要恢复,是因为如果 JT 检查出有任务需要恢复,那么有可能需要回复的任务在该 TT 上运行过,那么就需要该 TT 来恢复该任务,而 TT 重新初始化之后会丢失所有任务。总之如果 JT 不需要进行任务恢复则对 TT 下达重新初始化命令。还有另外一种情况 JT 也会对 TT 下达该命令。

    if (!processHeartbeat(status, initialContact, now)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId,
                  new TaskTrackerAction[] {new ReinitTrackerAction()});
    }

当 JT 调用 processHeartbeat() 方法处理心跳请求时返回 false,则对 TT 下达重新初始化命令。processHeartbeat() 方法返回 false 的原因如下:

boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                    trackerStatus);
        TaskTracker taskTracker = getTaskTracker(trackerName);
        if (initialContact) {
          // If it’s first contact, then clear out
          // any state hanging around
          if (seenBefore) {
            lostTaskTracker(taskTracker);
          }
        } else {
          // If not first contact, there should be some record of the tracker
          if (!seenBefore) {
            LOG.warn(“Status from unknown Tracker : ” + trackerName);
            updateTaskTrackerStatus(trackerName, null);
            return false;
          }
        }

当 initialContact==false 且 seenBefore==false 时返回 false,seenBefore 表示 JT 的 taskTrackers 队列中是否存在该 TT,不存在则 seenBefore==false,所以 processHeartbeat() 原因是 TT 不是首次联系 JT,但是 JT 中并不存在该 TT 的信息,又是一种不一致状态,所以 JT 会对其下达重新初始化命令。

以上就是 JT 对 TT 下达重新初始化命令产生的两种情况,归根到底都是由于 JT 与 TT 之间状态不一致导致,即 TT 认为他不是首次联系 JT,但是 JT 却没有 TT 的以前记录。

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

————————————– 分割线 ————————————–

3)TT 处理命令:

TT 发送心跳都是在 TT 的 offerService() 方法中调用的,该方法在 TT 运行过程中一直执行,当 TT 的心跳请求接收到响应时,会首先对收到的 HeartbeatResponse 中的 TaskTrackerAction 进行判断,判断是否有 ReinitTrackerAction 命令。

        TaskTrackerAction[] actions = heartbeatResponse.getActions();
        if(LOG.isDebugEnabled()) {
          LOG.debug(“Got heartbeatResponse from JobTracker with responseId: ” +
                    heartbeatResponse.getResponseId() + ” and ” +
                    ((actions != null) ? actions.length : 0) + ” actions”);
        }
        if (reinitTaskTracker(actions)) {
          return State.STALE;
        }

reinitTaskTracker() 方法判断是否有 ReinitTrackerAction 命令,

  private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
    if (actions != null) {
      for (TaskTrackerAction action : actions) {
        if (action.getActionId() ==
            TaskTrackerAction.ActionType.REINIT_TRACKER) {
          LOG.info(“Recieved ReinitTrackerAction from JobTracker”);
          return true;
        }
      }
    }
    return false;
  }

简单的根据 Action 的 Id 进行判断,当判断出心跳的返回结果中有 ReinitTrackerAction 命令时,则退出 offerService() 的无限循环,并返回 State.STALE。TT 是个线程,所以 offerService() 的返回值返回到 run() 方法。

while (running && !staleState && !shuttingDown && !denied) {
            try {
              State osState = offerService();
              if (osState == State.STALE) {
                staleState = true;
              } else if (osState == State.DENIED) {
                denied = true;
              }
            } catch (Exception ex) {
              if (!shuttingDown) {
                LOG.info(“Lost connection to JobTracker [” +
                        jobTrackAddr + “].  Retrying…”, ex);
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException ie) {
                }
              }
            }
          }

因为 offerService() 的返回值是 State.STALE,所以 staleState==true,会退出循环。

} finally {
          // If denied we’ll close via shutdown below. We should close
          // here even if shuttingDown as shuttingDown can be set even
          // if shutdown is not called.
          if (!denied) {
            close();
          }
        }
        if (shuttingDown) {return;}
        if (denied) {break;}
        LOG.warn(“Reinitializing local state”);
        initialize();

调用 initialize() 方法重新初始化。之后继续循环。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-06/103218p2.htm

3.KillJobAction

1)命令内容:

KillJobAction 跟 KillTaskAction 很相似,只不过 KillJobAction 是 kill job,而 KillTaskAction 是 kill task。所以 KillJobAction 内部保存一个 jobId(JobID)对象。

2)JT 下达命令:

同 KillJobAction 一样,JT 通过调用 getJobsForCleanup() 方法获取该 TT 上需要 kill 掉的 job 信息。

  private List<TaskTrackerAction> getJobsForCleanup(String taskTracker) {
    Set<JobID> jobs = null;
    synchronized (trackerToJobsToCleanup) {
      jobs = trackerToJobsToCleanup.remove(taskTracker);
    }
    if (jobs != null) {
      // prepare the actions list
      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
      for (JobID killJobId : jobs) {
        killList.add(new KillJobAction(killJobId));
        if(LOG.isDebugEnabled()) {
          LOG.debug(taskTracker + ” -> KillJobAction: ” + killJobId);
        }
      }
      return killList;
    }
    return null;
  }

该方法很简单,只是从 trackerToJobsToCleanup 队列中获取该 TT 所对应的需要 Cleanup 的 Job 信息。trackerToJobsToCleanup 队列 JT 在两种情况会向其添加内容,第一个是当一个 Job 完成时,通过 JT 的 finalizeJob() 方法;另一中情况是通过 JT 的 processHeartbeat() 处理心跳时,调用 updateTaskStatuses() 方法,获取心跳发送方(TT)上所有的 Task 信息,如果有的 Task 在 JT 上没有对应的 Job 存在,则将该 Task 所保存的 JobId 添加到 trackerToJobsToCleanup 队列中,等待清除。

3)TT 处理命令:

同上 KillTaskAction。

4.CommitTaskAction

1)命令内容:

CommitTaskAction 是指 TT 需要提交 Task,内部保存一个 taskId(TaskAttemptID)对象。

2)JT 下达命令:

JT 通过调用 getTasksToSave() 方法获取该 TT 需要提交的任务信息。

private synchronized List<TaskTrackerAction> getTasksToSave(
                                                TaskTrackerStatus tts) {
    List<TaskStatus> taskStatuses = tts.getTaskReports();
    if (taskStatuses != null) {
      List<TaskTrackerAction> saveList = new ArrayList<TaskTrackerAction>();
      for (TaskStatus taskStatus : taskStatuses) {
        if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
          TaskAttemptID taskId = taskStatus.getTaskID();
          TaskInProgress tip = taskidToTIPMap.get(taskId);
          if (tip == null) {
            continue;
          }
          if (tip.shouldCommit(taskId)) {
            saveList.add(new CommitTaskAction(taskId));
            if (LOG.isDebugEnabled()) {
              LOG.debug(tts.getTrackerName() +
                      ” -> CommitTaskAction: ” + taskId);
            }
          }
        }
      }
      return saveList;
    }
    return null;
  }

该方法根据 TT 发送过来的 TaskTrackerStatus 获取该 TT 上所有的 TaskAttempt,然后从 taskidToTIPMap 中获取每个 TaskAttempt 所对应的 TaskInProgress,调用 TaskInProgress 的 shouldCommit() 方法判断该 TaskAttempt 是否应该 commit。下面看看 TaskInProgress 的 shouldCommit() 方法。

  public boolean shouldCommit(TaskAttemptID taskid) {
    return !isComplete() && isCommitPending(taskid) &&
          taskToCommit.equals(taskid);
  }

该方法内部通过 isComplete() 判断该 Task 是否已完成,如果 Task 已完成,则 TaskAttempt 无需 commit,如果 Task 未完成,且 TaskAttempt 处于 COMMIT_PENDING 状态(等待提交),且 Task 的 taskToCommit== 该 TaskAttempt 的 ID,则该 TaskAttempt 应该 commit。这里说一下,Hadoop 中 JT 是不进行任务任务执行的,所有的任务执行都是交由 TT 完成,JT 上只是保存了 Job/Task 的各种队列信息,而 JT 上保存的 Job/Task 等的状态信息的更新都是通过 TT 向 JT 发送心跳完成的,即在 JT 的 processHeartbeat() 方法中,这个方法内部根据 TT 发送过来的其上的所有 Task 状态信息来更新 JT 上保存的 Job/Task 状态信息,使得 JT 能够及时了解每个 Job/Task 的状态变化,以便根据其状态给出合适的处理命令。这里涉及的 taskToCommit 对象就是在 processHeartbeat() 方法调用 JobInProgress 对象的 updateTaskStatus() 方法更新的。

3)TT 处理命令:

TT 将接收到的 CommitTaskAction 命令存放在 commitResponses 队列中,该队列的作用是当 Task 完成时通过 RPC 请求向 TT 询问是否可以提交时,TT 根据 commitResponses 队列中是否包含该 Task 信息来决定是否让 Task 进行提交操作。具体发生在 Task 的 done() 方法中。

5.LaunchTaskAction

1)命令内容:

LaunchTaskAction 是五个命令中最复杂的一个,该命令指示 TT 进行 Task 的运行,所以涉及到 MapReduce 中最核心的问题——任务调度,即如何合理的调度各个任务。LaunchTaskAction 内部保存了一个 task(Task)对象,同时在序列化会先写入 task.isMapTask() 的值(boolean 型),在反序列化时会首先读取 isMapTask 值。

2)JT 下达命令:

List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if(LOG.isDebugEnabled()) {
              LOG.debug(trackerName + ” -> LaunchTask: ” + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }

JT 下达该命令需要根据 TT 发送心跳时的 acceptNewTasks 值决定是否给该 TT 下达任务。JT 在为 TT 选择任务的时的选择优先级是:Map Cleanup 任务,Map Cleanup 的 TaskAttempt,Map Setup 任务,Reduce Cleanup 任务,Reduce Cleanup 的 TaskAttempt,Reduce Setup 任务,Map/Reduce 任务。除去最后一个任务的选择,其他任务都是由 JT 选择的,最后的 Map/Reduce 任务则由 TaskScheduler 选择,这里涉及到任务调度,说实话不懂,略过。

3)TT 处理命令:

TT 接收到 JT 的 LaunchTaskAction 会调用 addToTaskQueue() 方法,根据 Task 的类型(Map/Reduce)分别添加到 mapLauncher 和 reduceLauncher 对象中。mapLauncher 和 reduceLauncher 对象是以线程模式运行的任务启动器,其在 TT 初始化过程中实例化并启动。这两个线程会进行 Task 的启动。

以上就是心跳响应中的五种命令,有错误之处还望指出,谢谢!

更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-20发表,共计22206字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中