阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop job初始化源码浅析

425次阅读
没有评论

共计 11114 个字符,预计需要花费 28 分钟才能阅读完成。

Hadoop 的 job 提交过程相对来说还是有点复杂的,所以在学习源码的时候会显得有些乱,时常看了后面忘了前面,所以在看了多遍之后决定用文章的方式记录下来,一边自己下次再看的时候能够清晰些,同时也为初次接触这方面源码的同学提供一些帮助吧。希望自己可以写的足够详细。(本文针对 hadoop1.2.1)

1.job.waitForCompletion:一般情况下我们提交一个 job 都是通过 job.waitForCompletion 方法提交,该方法内部会调用 job.submit()方法

public boolean waitForCompletion(boolean verbose
                                  ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
  }

2.job.submit():在 submit 中会调用 setUseNewAPI(),setUseNewAPI()这个方法主要是判断是使用新的 api 还是旧的 api,之后会调用 connect()方法,该方法主要是实例化 jobClient,然后会调用 jobClient.submitJobInternal(conf)这个方法进行 job 的提交

public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
   
    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
  }

3.jobClient.submitJobInternal():这个方法会将 job 运行时所需的所有文件上传到 jobTarcker 文件系统(一般是 hdfs)中,同时进行备份(备份数默认是 10,通过 mapred.submit.replication 变量可以设置),这个方法需要深入进行解读。

 

4.JobSubmissionFiles.getStagingDir:这个方法是在 jobClient.submitJobInternal()最先调用的,这个方法主要是获取一个 job 提交的根目录,主要是通过 Path stagingArea = client.getStagingAreaDir(); 方法获得,这个方法最终会调用 jobTracker.getStagingAreaDirInternal()方法,代码如下:

 private String getStagingAreaDirInternal(String user) throws IOException {
    final Path stagingRootDir =
      new Path(conf.get(“mapreduce.jobtracker.staging.root.dir”,
            “/tmp/hadoop/mapred/staging”));
    final FileSystem fs = stagingRootDir.getFileSystem(conf);
    return fs.makeQualified(new Path(stagingRootDir,
                              user+”/.staging”)).toString();
  }

在获取了 stagingDir 之后会执行 JobID jobId = jobSubmitClient.getNewJobId(); 为 job 获取一个 jobId,然后执行 Path submitJobDir = new Path(jobStagingArea, jobId.toString()); 获得该 job 提交的路径,也就是在 stagingDir 目录下建一个以 jobId 为文件名的目录。有了 submitJobDir 之后就可以将 job 运行所需的全部文件上传到对应的目录下了,具体是调用 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)这个方法。

 

5.jobClient.copyAndConfigureFiles(jobCopy, submitJobDir):这个方法最终调用 jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication); 这个方法实现文件上传。

6.jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication):这个方法首先获取用户在使用命令执行 job 的时候所指定的 -libjars, -files, -archives 文件,对应的 conf 配置参数是 tmpfiles tmpjars tmparchives,这个过程是在 ToolRunner.run()的时候进行解析的,当用户指定了这三个参数之后,会将这三个参数对应的文件都上传到 hdfs 上,下面我们具体看一个参数的处理:tmpfiles(其他两个基本相同)

7.jobClient 处理 tmpfiles:该方法会将 tmpfiles 参数值按‘,’分割,然后将每一个文件上传到 hdfs,其中如何文件的路径本身就在 hdfs 中,那么将不进行上传操作,上传操作只针对文件不在 hdfs 中的文件。调用的方法是:Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication),该方法内部使用的是 FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job)方法将文件上传至 hdfs,注意此处的 remoteFs 和 jtFs,remoteFs 就是需上传文件的原始文件系统,jtFs 则是 jobTracker 的文件系统(hdfs)。在文件上传至 hdfs 之后,会执行 DistributedCache.createSymlink(job)这个方法,这个方法是创建一个别名(好像是这么个名字),这里需要注意的是 tmpfiles 和 tmparchives 都会创建别名,而 tmpjars 则不会,个人认为 tmpjars 则 jar 文件,不是用户在 job 运行期间调用,所以不需要别名,而 tmpfiles 和 tmparchives 则在 job 运行期间用户可能会调用,所以使用别名可以方便用户调用

8. 将这三个参数指定的文件上传到 hdfs 之后,需要将 job 的 jar 文件上传到 hdfs,名称为 submitJobDir/job.jar,使用 fs.copyFromLocalFile(originalJarFile, submitJarFile)上传即可。

到这里 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)方法就完成了,期间丢了 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir),TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job),TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials())三个方法,这三个方法是进行一些 cached archives and files 的校验和保存其时间戳和权限内容

9. 继续我们的 jobClient.submitJobInternal()方法,这之后会根据我们设置的 outputFormat 类执行 output.checkOutputSpecs(context),进行输出路径的检验,主要是保证输出路径不存在,存在会抛出异常。这之后就是对输入文件进行分片操作了,writeSplits(context, submitJobDir)。

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93700p2.htm

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

10.jobClient.writeSplits():这个方法内部会根据我们之前判断的使用 new-api 还是 old-api 分别进行分片操作,我们只看 new-api 的分片操作。

private int writeSplits(org.apache.Hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}

11.jobClient.writeNewSplits():这个方法主要是根据我们设置的 inputFormat.class 通过反射获得 inputFormat 对象,然后调用 inputFormat 对象的 getSplits 方法,当获得分片信息之后调用 JobSplitWriter.createSplitFiles 方法将分片的信息写入到 submitJobDir/job.split 文件中。

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}

12.JobSplitWriter.createSplitFiles:这个方法的作用就是讲分片信息写入到 submitJobDir/job.split 文件中,方法内部调用 JobSplitWriter.writeNewSplits 进行写操作

 

13.JobSplitWriter.writeNewSplits:该方法具体对每一个 InputSplit 对象进行序列化写入到输出流中,具体每个 InputSplit 对象写入的信息包括:split.getClass().getName(),serializer.serialize(split)将整个对象序列化。然后将 InputSplit 对象的 locations 信息放入 SplitMetaInfo 对象中,同时还包括 InputSpilt 元信息在 job.split 文件中的偏移量,该 InputSplit 的长度,再将 SplitMetaInfo 对象。然后调用 JobSplitWriter.writeJobSplitMetaInfo()方法将 SplitMetaInfo 对象写入 submitJobDir/job.splitmetainfo 文件中。

14.JobSplitWriter.writeJobSplitMetaInfo():将 SplitMetaInfo 对象写入 submitJobDir/job.splitmetainfo 文件中,具体写入的信息包括:JobSplit.META_SPLIT_FILE_HEADER,splitVersion,allSplitMetaInfo.length(SplitMetaInfo 对象的个数,一个 split 对应一个 SplitMetaInfo),然后分别将所有的 SplitMetaInfo 对象序列化到输出流中,到此文件的分片工作完成。

15. 继续回头看 jobClient.submitJobInternal()方法:在上一步进行分片操作之后,或返回切片的数目,据此设定 map 的数量,所以在 job 中设置的 map 数量是没有用的。

16. 继续往下走:

String queue = jobCopy.getQueueName();
AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
jobCopy.set(QueueManager.toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

这三句话是获得 job 对应的任务队列信息,这里涉及到 hadoop 的作业调度内容,就不深入研究了
17. 继续:下面就是讲 job 的配置文件信息(jobConf 对象) 写入到 xml 文件中,以便用户查看,具体文件是:submitJobDir/job.xml,通过 jobCopy.writeXml(out)方法,
方法比较简单,就是写 xml 文件。下面就进入到 jobTracker 提交任务环节了,status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()),
就到这吧,后面下次再慢慢研究。
总结下:在用户提交 job 之后,第一步主要是 jobClient 对 job 进行一些必要的文件上传操作,主要包括:
1)为 job 生成一个 jobId,然后获得 job 提交的 stagingDir,根据 jobId 获得 submitJobDir,之后所有的 job 运行时文件豆浆保存在此目录下
2)将用户在命令行通过 -libjars, -files, -archives 指定的文件上传到 jobTracker 的文件系统中,并将 job.jar 上传到 hdfs 中
3)校验输出路径
4)进行输入文件的分片操作,并将分片信息写入 submitJobDir 下的相应文件中,有两个文件:job.split 以及 job.splitmetainfo
5)将 job 的配置参数(jobConf 对象) 写入到 job.xml 文件中
这就是 jobClient 提交 job 的全部过程,如有遗漏下面评论指出,谢谢

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

Hadoop 的 job 提交过程相对来说还是有点复杂的,所以在学习源码的时候会显得有些乱,时常看了后面忘了前面,所以在看了多遍之后决定用文章的方式记录下来,一边自己下次再看的时候能够清晰些,同时也为初次接触这方面源码的同学提供一些帮助吧。希望自己可以写的足够详细。(本文针对 hadoop1.2.1)

1.job.waitForCompletion:一般情况下我们提交一个 job 都是通过 job.waitForCompletion 方法提交,该方法内部会调用 job.submit()方法

public boolean waitForCompletion(boolean verbose
                                  ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
  }

2.job.submit():在 submit 中会调用 setUseNewAPI(),setUseNewAPI()这个方法主要是判断是使用新的 api 还是旧的 api,之后会调用 connect()方法,该方法主要是实例化 jobClient,然后会调用 jobClient.submitJobInternal(conf)这个方法进行 job 的提交

public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
   
    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
  }

3.jobClient.submitJobInternal():这个方法会将 job 运行时所需的所有文件上传到 jobTarcker 文件系统(一般是 hdfs)中,同时进行备份(备份数默认是 10,通过 mapred.submit.replication 变量可以设置),这个方法需要深入进行解读。

 

4.JobSubmissionFiles.getStagingDir:这个方法是在 jobClient.submitJobInternal()最先调用的,这个方法主要是获取一个 job 提交的根目录,主要是通过 Path stagingArea = client.getStagingAreaDir(); 方法获得,这个方法最终会调用 jobTracker.getStagingAreaDirInternal()方法,代码如下:

 private String getStagingAreaDirInternal(String user) throws IOException {
    final Path stagingRootDir =
      new Path(conf.get(“mapreduce.jobtracker.staging.root.dir”,
            “/tmp/hadoop/mapred/staging”));
    final FileSystem fs = stagingRootDir.getFileSystem(conf);
    return fs.makeQualified(new Path(stagingRootDir,
                              user+”/.staging”)).toString();
  }

在获取了 stagingDir 之后会执行 JobID jobId = jobSubmitClient.getNewJobId(); 为 job 获取一个 jobId,然后执行 Path submitJobDir = new Path(jobStagingArea, jobId.toString()); 获得该 job 提交的路径,也就是在 stagingDir 目录下建一个以 jobId 为文件名的目录。有了 submitJobDir 之后就可以将 job 运行所需的全部文件上传到对应的目录下了,具体是调用 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)这个方法。

 

5.jobClient.copyAndConfigureFiles(jobCopy, submitJobDir):这个方法最终调用 jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication); 这个方法实现文件上传。

6.jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication):这个方法首先获取用户在使用命令执行 job 的时候所指定的 -libjars, -files, -archives 文件,对应的 conf 配置参数是 tmpfiles tmpjars tmparchives,这个过程是在 ToolRunner.run()的时候进行解析的,当用户指定了这三个参数之后,会将这三个参数对应的文件都上传到 hdfs 上,下面我们具体看一个参数的处理:tmpfiles(其他两个基本相同)

7.jobClient 处理 tmpfiles:该方法会将 tmpfiles 参数值按‘,’分割,然后将每一个文件上传到 hdfs,其中如何文件的路径本身就在 hdfs 中,那么将不进行上传操作,上传操作只针对文件不在 hdfs 中的文件。调用的方法是:Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication),该方法内部使用的是 FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job)方法将文件上传至 hdfs,注意此处的 remoteFs 和 jtFs,remoteFs 就是需上传文件的原始文件系统,jtFs 则是 jobTracker 的文件系统(hdfs)。在文件上传至 hdfs 之后,会执行 DistributedCache.createSymlink(job)这个方法,这个方法是创建一个别名(好像是这么个名字),这里需要注意的是 tmpfiles 和 tmparchives 都会创建别名,而 tmpjars 则不会,个人认为 tmpjars 则 jar 文件,不是用户在 job 运行期间调用,所以不需要别名,而 tmpfiles 和 tmparchives 则在 job 运行期间用户可能会调用,所以使用别名可以方便用户调用

8. 将这三个参数指定的文件上传到 hdfs 之后,需要将 job 的 jar 文件上传到 hdfs,名称为 submitJobDir/job.jar,使用 fs.copyFromLocalFile(originalJarFile, submitJarFile)上传即可。

到这里 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)方法就完成了,期间丢了 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir),TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job),TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials())三个方法,这三个方法是进行一些 cached archives and files 的校验和保存其时间戳和权限内容

9. 继续我们的 jobClient.submitJobInternal()方法,这之后会根据我们设置的 outputFormat 类执行 output.checkOutputSpecs(context),进行输出路径的检验,主要是保证输出路径不存在,存在会抛出异常。这之后就是对输入文件进行分片操作了,writeSplits(context, submitJobDir)。

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93700p2.htm

相关阅读

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计11114字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19350
评论数
4
阅读量
7963467
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
每年0.99刀,拿下你的第一个顶级域名,详细注册使用

每年0.99刀,拿下你的第一个顶级域名,详细注册使用

每年 0.99 刀,拿下你的第一个顶级域名,详细注册使用 前言 作为长期折腾云服务、域名建站的老玩家,星哥一直...
免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

  免费无广告!这款跨平台 AI RSS 阅读器,拯救你的信息焦虑 在算法推荐主导信息流的时代,我们...
Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集 在云原生体系中,Prometheus 已成为最主流的监控与报警...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍 前言 在 AI 自动化快速发展的当下,浏览器早已不再只是...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

  免费无广告!这款跨平台 AI RSS 阅读器,拯救你的信息焦虑 在算法推荐主导信息流的时代,我们...
三大开源投屏神器横评:QtScrcpy、scrcpy、escrcpy 谁才是跨平台控制 Android 的最优解?

三大开源投屏神器横评:QtScrcpy、scrcpy、escrcpy 谁才是跨平台控制 Android 的最优解?

  三大开源投屏神器横评:QtScrcpy、scrcpy、escrcpy 谁才是跨平台控制 Andr...
星哥带你玩飞牛NAS-11:咪咕视频订阅部署全攻略

星哥带你玩飞牛NAS-11:咪咕视频订阅部署全攻略

星哥带你玩飞牛 NAS-11:咪咕视频订阅部署全攻略 前言 在家庭影音系统里,NAS 不仅是存储中心,更是内容...
150元打造低成本NAS小钢炮,捡一块3865U工控板

150元打造低成本NAS小钢炮,捡一块3865U工控板

150 元打造低成本 NAS 小钢炮,捡一块 3865U 工控板 一块二手的熊猫 B3 工控板 3865U,搭...
300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

  300 元就能买到的 ” 小钢炮 ”?惠普 7L 四盘位小主机解析 最近...