阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——JT篇

119次阅读
没有评论

共计 15562 个字符,预计需要花费 39 分钟才能阅读完成。

上一篇浅析了 Hadoop 心跳机制的 TT(TaskTracker)方面(http://www.linuxidc.com/Linux/2014-06/103216.htm),这一篇浅析下 JT(JobTracker)方面。
 
我们知道心跳是 TT 通过 RPC 请求调用 JT 的 heartbeat() 方法的,TT 在调用 JT 的 heartbeat 回收集自身的状态信息封装到 TaskTrackerStatus 对象中,传递给 JT。下面看看 JT 如何处理来自 TT 的心跳。

目录

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——TT 篇 http://www.linuxidc.com/Linux/2014-06/103216.htm

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——JT 篇 http://www.linuxidc.com/Linux/2014-06/103217.htm

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——命令篇 http://www.linuxidc.com/Linux/2014-06/103218.htm
 
1.JobTracker.heartbeat():

 // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
    if (!acceptTaskTracker(status)) {
      throw new DisallowedTaskTrackerException(status);
    }

第一步是检查发送心跳请求的 TT 是否属于可允许的 TT,这个是根据一个 HostsFileReader 对象进行判断的,该对象是在实例化 JT 的时候创建的,这个类保存了两个队列,分别是 includes 和 excludes 队列,includes 表示可以访问的 host 列表,excludes 表示不可访问的 host 列表,这两个列表的内容根据两个 mapred.hosts 和 mapred.hosts.exclude(mapred-site,xml 中,默认是 null)这两个参数指定的文件名读取的。具体可参考 JT 源码 1956 行。
 
2.JobTracker.heartbeat():

String trackerName = status.getTrackerName();
    long now = clock.getTime();
    if (restarted) {
      faultyTrackers.markTrackerHealthy(status.getHost());
    } else {
      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
    }

第一步是检查发送心跳请求的 TT 是否属于可允许的 TT,这个是根据一个 HostsFileReader 对象进行判断的,该对象是在实例化 JT 的时候创建的,这个类保存了两个队列,分别是 includes 和 excludes 队列,includes 表示可以访问的 host 列表,excludes 表示不可访问的 host 列表,这两个列表的内容根据两个 mapred.hosts 和 mapred.hosts.exclude(mapred-site,xml 中,默认是 null)这两个参数指定的文件名读取的。具体可参考 JT 源码 1956 行。
 
2.JobTracker.heartbeat():

String trackerName = status.getTrackerName();
    long now = clock.getTime();
    if (restarted) {
      faultyTrackers.markTrackerHealthy(status.getHost());
    } else {
      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
    }

这一步是检查 TT 是否重启,是重启的话标识该 TT 的状态为健康的,否则检查 TT 的健康状态。faultyTrackers.markTrackerHealthy(status.getHost()) 内部将该 TT 所在的 Host 上所有的 TT(从这里可以看出 hadoop 考虑到一个 Host 上可能存在多个 TT 的可能)从黑名单,灰名单和可能存在错误的列表上删除,也就是从 potentiallyFaultyTrackers 队列中移除该 Host,通过更新 JT 的 numGraylistedTrackers/numBlacklistedTrackers 数量以及 JT 的 totalMapTaskCapacity 和 totalReduceTaskCapacity 数量。至于如何检查 TT 健康状态,具体是根据 JT 上记录的关于 TT 执行任务失败的次数来判断的(具体不是太理解)。

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

————————————– 分割线 ————————————–
 
3.JobTracker.heartbeat():

HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);
    boolean addRestartInfo = false;

    if (initialContact != true) {
      // If this isn’t the ‘initial contact’ from the tasktracker,
      // there is something seriously wrong if the JobTracker has
      // no record of the ‘previous heartbeat’; if so, ask the
      // tasktracker to re-initialize itself.
      if (prevHeartbeatResponse == null) {
        // This is the first heartbeat from the old tracker to the newly
        // started JobTracker
        if (hasRestarted()) {
          addRestartInfo = true;
          // inform the recovery manager about this tracker joining back
          recoveryManager.unMarkTracker(trackerName);
        } else {
          // Jobtracker might have restarted but no recovery is needed
          // otherwise this code should not be reached
          LOG.warn(“Serious problem, cannot find record of ‘previous’ ” +
                  “heartbeat for ‘” + trackerName +
                  “‘; reinitializing the tasktracker”);
          return new HeartbeatResponse(responseId,
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }

      } else {
               
        // It is completely safe to not process a ‘duplicate’ heartbeat from a
        // {@link TaskTracker} since it resends the heartbeat when rpcs are
        // lost see {@link TaskTracker.transmitHeartbeat()};
        // acknowledge it by re-sending the previous response to let the
        // {@link TaskTracker} go forward.
        if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info(“Ignoring ‘duplicate’ heartbeat from ‘” +
              trackerName + “‘; resending the previous ‘lost’ response”);
          return prevHeartbeatResponse;
        }
      }
    }

此处第一句从 JT 记录的 HeartbeatResponse 队列中获取该 TT 的 HeartbeatResponse 信息,即判断 JT 之前是否收到过该 TT 的心跳请求。如果 initialContact!=true,表示 TT 不是首次连接 JT,同时如果 prevHeartbeatResponse==null,根据注释可以知道如果 TT 不是首次连接 JT,而且 JT 中并没有该 TT 之前的心跳请求信息,表明 This is the first heartbeat from the old tracker to the newly started JobTracker。判断 hasRestarted 是否为 true,hasRestarted 是在 JT 初始化(initialize() 方法)时,根据 recoveryManager 的 shouldRecover 来决定的,hasRestarted=shouldRecover,所以当需要进行 job 恢复时,addRestartInfo 会被设置为 true,即需要 TT 进行 job 恢复操作,同时从 recoveryManager 的 recoveredTrackers 队列中移除该 TT。如果不需要进行任务恢复,则直接返回 HeartbeatResponse,并对 TT 下重新初始化指令(后期介绍),注意此处返回的 responseId 还是原来的 responseId,即 responseId 不变。上面说的都是 prevHeartbeatResponse==null 时的情况,下面说说 prevHeartbeatResponse!=null 时如何处理,当 prevHeartbeatResponse!=null 时会直接返回 prevHeartbeatResponse,而忽略本次心跳请求。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-06/103217p2.htm

4.JobTracker.heartbeat():

// Process this heartbeat
    short newResponseId = (short)(responseId + 1);
    status.setLastSeen(now);
    if (!processHeartbeat(status, initialContact, now)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId,
                  new TaskTrackerAction[] {new ReinitTrackerAction()});
    }

首先将 responseId+1,然后记录心跳发送时间。接着来看看 processHeartbeat() 方法。
 
5.JobTracker.processHeartbeat():

 boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                    trackerStatus);

根据该 TT 的上一次心跳发送的状态信息更新 JT 的一些信息,如 totalMaps,totalReduces,occupiedMapSlots,occupiedReduceSlots 等,接着根据本次心跳发送的 TT 状态信息再次更新这些变量。
 
6.JobTracker.processHeartbeat():

TaskTracker taskTracker = getTaskTracker(trackerName);
        if (initialContact) {
          // If it’s first contact, then clear out
          // any state hanging around
          if (seenBefore) {
            lostTaskTracker(taskTracker);
          }
        } else {
          // If not first contact, there should be some record of the tracker
          if (!seenBefore) {
            LOG.warn(“Status from unknown Tracker : ” + trackerName);
            updateTaskTrackerStatus(trackerName, null);
            return false;
          }
        }

如果该 TT 是首次连接 JT,且存在 oldStatus,则表明 JT 丢失了 TT,具体意思应该是 JT 在一段时间内与 TT 失去了联系,之后 TT 恢复了,所以发送心跳时显示首次连接。lostTaskTracker(taskTracker):会将该 TT 从所有的队列中移除,并将该 TT 上记录的 job 清除掉 (kill 掉),当然对那些已经完成的 Job 不会进行次操作。当 TT 不是首次连接到 JT,但是 JT 却没有该 TT 的历史 status 信息,则表示 JT 对该 TT 未知,所以重新更新 TaskTracker 状态信息。
 
7.JobTracker.processHeartbeat():

    updateTaskStatuses(trackerStatus);
    updateNodeHealthStatus(trackerStatus, timeStamp);

更新 Task 和 NodeHealth 信息,较复杂。
 
8.JobTracker.heartbeat():如果 processHeartbeat() 返回 false,则返回 HeartbeatResponse(),并下达重新初始化 TT 指令。

// Initialize the response to be sent for the heartbeat
    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
    boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
    // Check for new tasks to be executed on the tasktracker
    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn(“Unknown task tracker polling; ignoring: ” + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if(LOG.isDebugEnabled()) {
              LOG.debug(trackerName + ” -> LaunchTask: ” + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }

此处会实例化一个 HeartbeatResponse 对象,作为本次心跳的返回值,在初始化一个 TaskTrackerAction 队列,用于存放 JT 对 TT 下达的指令。首先需要判断 recoveryManager 的 recoveredTrackers 是否为空,即是否有需要回复的 TT,然后根据 TT 心跳发送的 acceptNewTasks 值,即表明 TT 是否可接收新任务,并且该 TT 不在黑名单中,同上满足以上条件,则 JT 可以为 TT 分配任务。分配任务的选择方式是优先 CleanipTask,然后是 SetupTask,然后才是 Map/Reduce Task。下面来看下 getSetupAndCleanupTasks() 方法。
 
9.JobTracker.getSetupAndCleanupTasks():

// Don’t assign *any* new task in safemode
    if (isInSafeMode()) {
      return null;
    }

如果集群处于 safe 模式,则不分配任务。

    int maxMapTasks = taskTracker.getMaxMapSlots();
    int maxReduceTasks = taskTracker.getMaxReduceSlots();
    int numMaps = taskTracker.countOccupiedMapSlots();
    int numReduces = taskTracker.countOccupiedReduceSlots();
    int numTaskTrackers = getClusterStatus().getTaskTrackers();
    int numUniqueHosts = getNumberOfUniqueHosts();

计算 TT 的最大 map/reduce slot,以及已占用的 map/reduce slot,以及集群可使用的 TT 数量,和集群的 host 数量。

for (Iterator<JobInProgress> it = jobs.values().iterator();
            it.hasNext();) {
          JobInProgress job = it.next();
          t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                    numUniqueHosts, true);
          if (t != null) {
            return Collections.singletonList(t);
          }
        }

上一篇浅析了 Hadoop 心跳机制的 TT(TaskTracker)方面(http://www.linuxidc.com/Linux/2014-06/103216.htm),这一篇浅析下 JT(JobTracker)方面。
 
我们知道心跳是 TT 通过 RPC 请求调用 JT 的 heartbeat() 方法的,TT 在调用 JT 的 heartbeat 回收集自身的状态信息封装到 TaskTrackerStatus 对象中,传递给 JT。下面看看 JT 如何处理来自 TT 的心跳。

目录

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——TT 篇 http://www.linuxidc.com/Linux/2014-06/103216.htm

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——JT 篇 http://www.linuxidc.com/Linux/2014-06/103217.htm

Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——命令篇 http://www.linuxidc.com/Linux/2014-06/103218.htm
 
1.JobTracker.heartbeat():

 // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
    if (!acceptTaskTracker(status)) {
      throw new DisallowedTaskTrackerException(status);
    }

第一步是检查发送心跳请求的 TT 是否属于可允许的 TT,这个是根据一个 HostsFileReader 对象进行判断的,该对象是在实例化 JT 的时候创建的,这个类保存了两个队列,分别是 includes 和 excludes 队列,includes 表示可以访问的 host 列表,excludes 表示不可访问的 host 列表,这两个列表的内容根据两个 mapred.hosts 和 mapred.hosts.exclude(mapred-site,xml 中,默认是 null)这两个参数指定的文件名读取的。具体可参考 JT 源码 1956 行。
 
2.JobTracker.heartbeat():

String trackerName = status.getTrackerName();
    long now = clock.getTime();
    if (restarted) {
      faultyTrackers.markTrackerHealthy(status.getHost());
    } else {
      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
    }

第一步是检查发送心跳请求的 TT 是否属于可允许的 TT,这个是根据一个 HostsFileReader 对象进行判断的,该对象是在实例化 JT 的时候创建的,这个类保存了两个队列,分别是 includes 和 excludes 队列,includes 表示可以访问的 host 列表,excludes 表示不可访问的 host 列表,这两个列表的内容根据两个 mapred.hosts 和 mapred.hosts.exclude(mapred-site,xml 中,默认是 null)这两个参数指定的文件名读取的。具体可参考 JT 源码 1956 行。
 
2.JobTracker.heartbeat():

String trackerName = status.getTrackerName();
    long now = clock.getTime();
    if (restarted) {
      faultyTrackers.markTrackerHealthy(status.getHost());
    } else {
      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
    }

这一步是检查 TT 是否重启,是重启的话标识该 TT 的状态为健康的,否则检查 TT 的健康状态。faultyTrackers.markTrackerHealthy(status.getHost()) 内部将该 TT 所在的 Host 上所有的 TT(从这里可以看出 hadoop 考虑到一个 Host 上可能存在多个 TT 的可能)从黑名单,灰名单和可能存在错误的列表上删除,也就是从 potentiallyFaultyTrackers 队列中移除该 Host,通过更新 JT 的 numGraylistedTrackers/numBlacklistedTrackers 数量以及 JT 的 totalMapTaskCapacity 和 totalReduceTaskCapacity 数量。至于如何检查 TT 健康状态,具体是根据 JT 上记录的关于 TT 执行任务失败的次数来判断的(具体不是太理解)。

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

————————————– 分割线 ————————————–
 
3.JobTracker.heartbeat():

HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);
    boolean addRestartInfo = false;

    if (initialContact != true) {
      // If this isn’t the ‘initial contact’ from the tasktracker,
      // there is something seriously wrong if the JobTracker has
      // no record of the ‘previous heartbeat’; if so, ask the
      // tasktracker to re-initialize itself.
      if (prevHeartbeatResponse == null) {
        // This is the first heartbeat from the old tracker to the newly
        // started JobTracker
        if (hasRestarted()) {
          addRestartInfo = true;
          // inform the recovery manager about this tracker joining back
          recoveryManager.unMarkTracker(trackerName);
        } else {
          // Jobtracker might have restarted but no recovery is needed
          // otherwise this code should not be reached
          LOG.warn(“Serious problem, cannot find record of ‘previous’ ” +
                  “heartbeat for ‘” + trackerName +
                  “‘; reinitializing the tasktracker”);
          return new HeartbeatResponse(responseId,
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }

      } else {
               
        // It is completely safe to not process a ‘duplicate’ heartbeat from a
        // {@link TaskTracker} since it resends the heartbeat when rpcs are
        // lost see {@link TaskTracker.transmitHeartbeat()};
        // acknowledge it by re-sending the previous response to let the
        // {@link TaskTracker} go forward.
        if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info(“Ignoring ‘duplicate’ heartbeat from ‘” +
              trackerName + “‘; resending the previous ‘lost’ response”);
          return prevHeartbeatResponse;
        }
      }
    }

此处第一句从 JT 记录的 HeartbeatResponse 队列中获取该 TT 的 HeartbeatResponse 信息,即判断 JT 之前是否收到过该 TT 的心跳请求。如果 initialContact!=true,表示 TT 不是首次连接 JT,同时如果 prevHeartbeatResponse==null,根据注释可以知道如果 TT 不是首次连接 JT,而且 JT 中并没有该 TT 之前的心跳请求信息,表明 This is the first heartbeat from the old tracker to the newly started JobTracker。判断 hasRestarted 是否为 true,hasRestarted 是在 JT 初始化(initialize() 方法)时,根据 recoveryManager 的 shouldRecover 来决定的,hasRestarted=shouldRecover,所以当需要进行 job 恢复时,addRestartInfo 会被设置为 true,即需要 TT 进行 job 恢复操作,同时从 recoveryManager 的 recoveredTrackers 队列中移除该 TT。如果不需要进行任务恢复,则直接返回 HeartbeatResponse,并对 TT 下重新初始化指令(后期介绍),注意此处返回的 responseId 还是原来的 responseId,即 responseId 不变。上面说的都是 prevHeartbeatResponse==null 时的情况,下面说说 prevHeartbeatResponse!=null 时如何处理,当 prevHeartbeatResponse!=null 时会直接返回 prevHeartbeatResponse,而忽略本次心跳请求。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-06/103217p2.htm

首先获取 Job 的 Cleanup 任务,每个 Job 有两个 Cleanup 任务,分别是 map 和 reduce 的。

for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
t = job.obtainTaskCleanupTask(taskTracker, true);
if (t != null) {
return Collections.singletonList(t);
}
}

然后获取一个 Cleanup 任务的 TaskAttempt。

for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
numUniqueHosts, true);
if (t != null) {
return Collections.singletonList(t);
}
}

然后在获取 Job 的 setup 任务。上面这三个全部是获取的 map 任务,而下面是获取 reduce 任务,方法基本一样。

如果该方法返回 null,则表示没有 cleanup 或者 setup 任务需要执行,则执行 map/reduce 任务。

10.JobTracker.heartbeat():

if (tasks == null) {
tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
}

此处是使用 TaskScheduler 调度任务,一大难点,后期分析。

11.JobTracker.heartbeat():

if (tasks != null) {
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
if(LOG.isDebugEnabled()) {
LOG.debug(trackerName + ” -> LaunchTask: ” + task.getTaskID());
}
actions.add(new LaunchTaskAction(task));
}
}

生成一个 LaunchTaskAction 指令。

// Check for tasks to be killed
List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
if (killTasksList != null) {
actions.addAll(killTasksList);
}

// Check for jobs to be killed/cleanedup
List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
if (killJobsList != null) {
actions.addAll(killJobsList);
}

// Check for tasks whose outputs can be saved
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
if (commitTasksList != null) {
actions.addAll(commitTasksList);
}

以上分别是下达 kill task 指令,kill/cleanedup job 指令,commit task 指令。以上四种指令,加上一个 ReinitTackerAction,这是心跳 JT 对 TT 下达的所有五种指令,以后可以相信对其进行分析。

12.JobTracker.heartbeat():

// calculate next heartbeat interval and put in heartbeat response
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));

// check if the restart info is req
if (addRestartInfo) {
response.setRecoveredJobs(recoveryManager.getJobsToRecover());
}

// Update the trackerToHeartbeatResponseMap
trackerToHeartbeatResponseMap.put(trackerName, response);

// Done processing the hearbeat, now remove ‘marked’ tasks
removeMarkedTasks(trackerName);

剩下一些收尾工作,如计算下次发送心跳的时间,以及设置需要 TT 进行恢复的任务,更新 trackerToHeartbeatResponseMap 队列,移除标记的 task。最后返回 HeartbeatResponse 对象,完成心跳请求响应。

到此 JT 的 heartbeat() 完成了,中间很多地方比较复杂,都没有去深追,以后有时间可以继续研究,如有错误,请不吝指教,谢谢

更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计15562字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中