阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop JobTracker提交job源码浅析

397次阅读
没有评论

共计 9338 个字符,预计需要花费 24 分钟才能阅读完成。

上一篇文章说到 jobClient 提交 job 的过程,这篇文章是接着上一篇文章继续写的 http://www.linuxidc.com/Linux/2013-12/93700.htm

上一篇说到 jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()) 这里,这里就是 jobTracker 进行 job 的提交过程,还有一个 JobSubmissionProtocol 的实现是 LocalJobRunner,这是本地执行的时候使用的,真正集群运行 Job 还是使用的 jobTracker,所以只看 jobTracker 类的 submitJob。

1.jobTracker.submitJob():第一句就是 checkJobTrackerState() 这个是检查 jobTracker 状态,是否运行中,这里说一句,jobTracker 是在 Hadoop 集群启动的时候启动的,也就是在执行 start-all 或者 start-mapred 的时候启动,启动的时候会调用 JobTracker 的 main 方法,然后在 jps 的时候就可以看见一个 jobTracker 的进程了。下面来看一下 JobTracker.main() 方法。

2.JobTracker.main():第一句是 JobTracker tracker = startTracker(new JobConf()),这是实例化一个 jobTracke 实例。

3.JobTracker.startTracker():result = new JobTracker(conf, identifier),实例化一个 jobTracker 对象,在实例化的时候会做很多事,所以还是进去瞅瞅。

4.JobTracker.JobTracker():实例化的时候会初始化很多参数,记也记不住,主要看下实例化 taskScheduler 的内容:Class<? extends TaskScheduler> schedulerClass

      = conf.getClass(“mapred.jobtracker.taskScheduler”,JobQueueTaskScheduler.class, TaskScheduler.class);taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf),这两句就是根据配置文件设置的 taskScheduler 类名,通过反射获得对应的 taskScheduler 对象,在实例化的时候虽然不同的 TaskScheduler 具体操作不一样,但是统一的都会初始化一个 JobListener 对象,这个对象就是后面将要监听 job 的 listener。剩下的内容就不说了。回到 JobTracker.startTracker() 方法。

5.JobTracker.JobTracker():在实例化 jobTracker 之后,会执行 result.taskScheduler.setTaskTrackerManager(result),这个就是将 jobTracker 对象设置给 taskScheduler。后面就什么了,现在可以回到 main 方法了

public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
  throws IOException, InterruptedException {
    DefaultMetricsSystem.initialize(“JobTracker”);
    JobTracker result = null;
    while (true) {
      try {
        result = new JobTracker(conf, identifier);
        result.taskScheduler.setTaskTrackerManager(result);
        break;
      } catch (VersionMismatch e) {
        throw e;
      } catch (BindException e) {
        throw e;
      } catch (UnknownHostException e) {
        throw e;
      } catch (AccessControlException ace) {
        // in case of jobtracker not having right access
        // bail out
        throw ace;
      } catch (IOException e) {
        LOG.warn(“Error starting tracker: ” +
                StringUtils.stringifyException(e));
      }
      Thread.sleep(1000);
    }
    if (result != null) {
      JobEndNotifier.startNotifier();
      MBeans.register(“JobTracker”, “JobTrackerInfo”, result);
      if(initialize == true) {
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
        result.initializeFilesystem();
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
        result.initialize();
      }
    }
    return result;
  }

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93701p2.htm

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

6.JobTracker.main():在实例化 jobTracker 之后,会调用 tracker.offerService() 方法,之后 main 方法就没什么了,下面看看 tracker.offerService() 这个方法。

public static void main(String argv[]
                          ) throws IOException, InterruptedException {
    StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
   
    try {
      if(argv.length == 0) {
        JobTracker tracker = startTracker(new JobConf());
        tracker.offerService();
      }
      else {
        if (“-dumpConfiguration”.equals(argv[0]) && argv.length == 1) {
          dumpConfiguration(new PrintWriter(System.out));
        }
        else {
          System.out.println(“usage: JobTracker [-dumpConfiguration]”);
          System.exit(-1);
        }
      }
    } catch (Throwable e) {
      LOG.fatal(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }

7.JobTracker.offerService():这个方法中有一些其他东西,略掉,只看 taskScheduler.start() 这个方法,因为这里只是想分析下 JobTracker 提交 job 的过程,所以省去很多复杂的东西。

 

8.taskScheduler.start():这个方法就是启动 TaskScheduler,这个方法不同 taskScheduler 也不同,但是统一的还是会有一个 taskTrackerManager.addJobInProgressListener(jobListener) 这个操作,taskTrackerManager 就是 jobTracker(第 5 步),这句的意思是为 jobTracker 添加 jobListener,用来监听 job 的。这句的内部就是调用 jobTracker 的 jobInProgressListeners 集合的 add(listener) 方法。

到这里可以说看完了整个 JobTracker 的启动过程,虽然很浅显,但是对于后面将要分析的内容,这些就够了。下面来看看 job 的提交过程,也就是 jobTracker 的 submit() 方法。

1.jobTracker.submit():第一步是 checkSafeMode(),检查是否在安全模式,在安全模式则抛出异常。然后执行 jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),new Path(jobSubmitDir),生成一个 jobInfo 对象,jobInfo 主要保存 job 的 id,user,jobSubmitDir(也就是 job 的任务目录,上一篇文章提到)。接着是判断 job 是否可被 recovered(job 失败的时候尝试再次执行),如果允许的话 (默认允许),则将 jobInfo 对象序列化到 job-info 文件中。接着到达最关键的地方,job = new JobInProgress(this, this.conf, jobInfo, 0, ts),为 job 实例化一个 JobInProgress 对象,这个对象将会对 job 以后的所有情况进行负责,如初始化,执行等。下面看看 JobInProgress 对象的初始化操作。

2.JobInProgress:这里看下将 job.xml 下载到本地的操作。然后就是 job 的队列信息,默认的队列名是 default,Queue queue = this.jobtracker.getQueueManager().getQueue(queueName),这个主要是根据 Hadoop 所使用的 taskScheduler 有关,具体不研究。剩下的是一些参数的初始化,如 map 的数目,reduce 的数目等。这里还有个设置 job 的优先级的,默认是 normal。this.priority = conf.getJobPriority();this.status.setJobPriority(this.priority); 还有检查 taskLimit 的操作,就是检查 map+reduce 的任务数是否超出 mapred.jobtracker.maxtasks.per.job 设置的值,默认是 -1,就是没有限制的意思。回到 jobTracker.submit() 方法

this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
          +”/”+jobId + “.xml”);
      Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
      jobFile = jobFilePath.toString();
      fs.copyToLocalFile(jobFilePath, localJobFile);
      conf = new JobConf(localJobFile);

上一篇文章说到 jobClient 提交 job 的过程,这篇文章是接着上一篇文章继续写的 http://www.linuxidc.com/Linux/2013-12/93700.htm

上一篇说到 jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()) 这里,这里就是 jobTracker 进行 job 的提交过程,还有一个 JobSubmissionProtocol 的实现是 LocalJobRunner,这是本地执行的时候使用的,真正集群运行 Job 还是使用的 jobTracker,所以只看 jobTracker 类的 submitJob。

1.jobTracker.submitJob():第一句就是 checkJobTrackerState() 这个是检查 jobTracker 状态,是否运行中,这里说一句,jobTracker 是在 Hadoop 集群启动的时候启动的,也就是在执行 start-all 或者 start-mapred 的时候启动,启动的时候会调用 JobTracker 的 main 方法,然后在 jps 的时候就可以看见一个 jobTracker 的进程了。下面来看一下 JobTracker.main() 方法。

2.JobTracker.main():第一句是 JobTracker tracker = startTracker(new JobConf()),这是实例化一个 jobTracke 实例。

3.JobTracker.startTracker():result = new JobTracker(conf, identifier),实例化一个 jobTracker 对象,在实例化的时候会做很多事,所以还是进去瞅瞅。

4.JobTracker.JobTracker():实例化的时候会初始化很多参数,记也记不住,主要看下实例化 taskScheduler 的内容:Class<? extends TaskScheduler> schedulerClass

      = conf.getClass(“mapred.jobtracker.taskScheduler”,JobQueueTaskScheduler.class, TaskScheduler.class);taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf),这两句就是根据配置文件设置的 taskScheduler 类名,通过反射获得对应的 taskScheduler 对象,在实例化的时候虽然不同的 TaskScheduler 具体操作不一样,但是统一的都会初始化一个 JobListener 对象,这个对象就是后面将要监听 job 的 listener。剩下的内容就不说了。回到 JobTracker.startTracker() 方法。

5.JobTracker.JobTracker():在实例化 jobTracker 之后,会执行 result.taskScheduler.setTaskTrackerManager(result),这个就是将 jobTracker 对象设置给 taskScheduler。后面就什么了,现在可以回到 main 方法了

public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
  throws IOException, InterruptedException {
    DefaultMetricsSystem.initialize(“JobTracker”);
    JobTracker result = null;
    while (true) {
      try {
        result = new JobTracker(conf, identifier);
        result.taskScheduler.setTaskTrackerManager(result);
        break;
      } catch (VersionMismatch e) {
        throw e;
      } catch (BindException e) {
        throw e;
      } catch (UnknownHostException e) {
        throw e;
      } catch (AccessControlException ace) {
        // in case of jobtracker not having right access
        // bail out
        throw ace;
      } catch (IOException e) {
        LOG.warn(“Error starting tracker: ” +
                StringUtils.stringifyException(e));
      }
      Thread.sleep(1000);
    }
    if (result != null) {
      JobEndNotifier.startNotifier();
      MBeans.register(“JobTracker”, “JobTrackerInfo”, result);
      if(initialize == true) {
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
        result.initializeFilesystem();
        result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
        result.initialize();
      }
    }
    return result;
  }

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93701p2.htm

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

3.jobTracker.submit():实例化 JobInProgress 之后,会根据 jobProfile 获取 job 的队列信息,并判断相应的队列是否在运行中,不在则任务失败。然后检查内存情况 checkMemoryRequirements(job),再调用 taskScheduler 的 taskScheduler.checkJobSubmission(job) 检查任务提交情况(具体是啥玩意,不太情况)。接下来就是执行 status = addJob(jobId, job),为 Job 设置 listener。

 

4.jobTracker.addJob():前面说过,在初始化 jobTracker 的时候会实例化 taskScheduler,然后调用 taskScheduler 的 start() 方法,为 jobTracker 添加 JobListener 对象,所以这里的 JobInProgressListener 对象就是相应的 taskScheduler 的 JobListener,这里为 job 添加了 JobListener。

private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
  throws IOException {
    totalSubmissions++;

    synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);
        for (JobInProgressListener listener : jobInProgressListeners) {
          listener.jobAdded(job);
        }
      }
    }
    myInstrumentation.submitJob(job.getJobConf(), jobId);
    job.getQueueMetrics().submitJob(job.getJobConf(), jobId);

    LOG.info(“Job ” + jobId + ” added successfully for user ‘”
            + job.getJobConf().getUser() + “‘ to queue ‘”
            + job.getJobConf().getQueueName() + “‘”);
    AuditLogger.logSuccess(job.getUser(),
        Operation.SUBMIT_JOB.name(), jobId.toString());
    return job.getStatus();
  }

到这里整个 JobTracker 的 job 提交过程就结束了,中间很多东西没有深入去研究,只是浅显的了解了下,如有错误,请指出,谢谢

更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计9338字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7986268
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集 在云原生体系中,Prometheus 已成为最主流的监控与报警...
星哥带你玩飞牛NAS-1:安装飞牛NAS

星哥带你玩飞牛NAS-1:安装飞牛NAS

星哥带你玩飞牛 NAS-1:安装飞牛 NAS 前言 在家庭和小型工作室场景中,NAS(Network Atta...
亚马逊云崩完,微软云崩!当全球第二大云“摔了一跤”:Azure 宕机背后的配置风险与警示

亚马逊云崩完,微软云崩!当全球第二大云“摔了一跤”:Azure 宕机背后的配置风险与警示

亚马逊云崩完,微软云崩!当全球第二大云“摔了一跤”:Azure 宕机背后的配置风险与警示 首先来回顾一下 10...
每天一个好玩的网站-手机博物馆-CHAZ 3D Experience

每天一个好玩的网站-手机博物馆-CHAZ 3D Experience

每天一个好玩的网站 - 手机博物馆 -CHAZ 3D Experience 一句话介绍:一个用 3D 方式重温...
在Windows系统中通过VMware安装苹果macOS15

在Windows系统中通过VMware安装苹果macOS15

在 Windows 系统中通过 VMware 安装苹果 macOS15 许多开发者和爱好者希望在 Window...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
星哥带你玩飞牛NAS-14:解锁公网自由!Lucky功能工具安装使用保姆级教程

星哥带你玩飞牛NAS-14:解锁公网自由!Lucky功能工具安装使用保姆级教程

星哥带你玩飞牛 NAS-14:解锁公网自由!Lucky 功能工具安装使用保姆级教程 作为 NAS 玩家,咱们最...
星哥带你玩飞牛NAS硬件02:某鱼6张左右就可拿下5盘位的飞牛圣体NAS

星哥带你玩飞牛NAS硬件02:某鱼6张左右就可拿下5盘位的飞牛圣体NAS

星哥带你玩飞牛 NAS 硬件 02:某鱼 6 张左右就可拿下 5 盘位的飞牛圣体 NAS 前言 大家好,我是星...
星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛 NAS 硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话? 前言 在选择 NAS 用预...
星哥带你玩飞牛NAS-12:开源笔记的进化之路,效率玩家的新选择

星哥带你玩飞牛NAS-12:开源笔记的进化之路,效率玩家的新选择

星哥带你玩飞牛 NAS-12:开源笔记的进化之路,效率玩家的新选择 前言 如何高效管理知识与笔记,已经成为技术...
仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

还在忍受动辄数百兆的“全家桶”监控软件?后台偷占资源、界面杂乱冗余,想查个 CPU 温度都要层层点选? 今天给...