阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

MapReduce工作流多种实现方式

116次阅读
没有评论

共计 4693 个字符,预计需要花费 12 分钟才能阅读完成。

学习 Hadoop,必不可少的就是编写 MapReduce 程序。当然,对于简单的分析程序,我们只需一个 MapReduce 任务就能搞定,然而对于比较复杂的分析程序,我们可能需要多个 Job 或者多个 Map 或者 Reduce 进行分析计算。本课程我们主要学习多个 Job 或者多个 MapReduce 的编程形式。

MapReduce 的主要有以下几种编程形式。

迭代式 MapReduce

MapReduce 迭代方式,通常是将上一个 MapReduce 任务的输出作为下一个 MapReduce 任务的输入,可只保留 MapReduce 任务的最终结果,中间数据可以删除或保留,可根据业务需要自行决定。迭代式 MapReduce 的示例代码如下所示。

Configuration conf = new Configuration();
   
// 第一个 MapReduce 任务
Job job1 = new Job(conf,”job1″);
…..
FileInputFormat.addInputPath(job1,input);//job1 的输入
FileOutputFromat.setOutputPath(job1,out1);//job1 的输出
job1.waitForCompletion(true);

// 第二个 Mapreduce 任务
Job job2 = new Job(conf,”job2″);
…..
FileInputFormat.addInputPath(job2,out1);//job1 的输出作为 job2 的输入
FileOutputFromat.setOutputPath(job2,out2);//job2 的输出
job2.waitForCompletion(true);

// 第三个 Mapreduce 任务
Job job3 = new Job(conf,”job3″);
…..
FileInputFormat.addInputPath(job3,out2);//job2 的输出作为 job3 的输入
FileOutputFromat.setOutputPath(job3,out3);//job3 的输出
job3.waitForCompletion(true);
…..

虽然 MapReduce 的迭代可实现多任务的执行,但是它具有如下两个缺点:

1、每次迭代,如果所有 Job 对象重复创建,代价将非常高。

2、每次迭代,数据都要写入本地,然后从本地读取,I/ O 和网络传输的代价比较大。

依赖关系式 MapReuce

        依赖关系式 MapReduce 主要是由 org.apache.hadoop.mapred.jobcontrol 包中的 JobControl 类来实现。JobControl 的实例表示一个作业的运行图,你可以加入作业配置,然后告知 JobControl 实例作业之间的依赖关系。在一个线程中运行 JobControl 时,它将按照依赖顺序来执行这些作业。也可以查看进程,在作业结束后,可以查询作业的所有状态和每个失败相关的错误信息。如果一个作业失败,JobControl 将不执行与之有依赖关系的后续作业。

依赖关系式 MapReuce 的示例代码如下所示。

Configuration conf1 = new Configuration();
Job job1 = new Job(conf1,”Job1″);
………//job1 其它设置

Configuration conf2 = new Configuration();
Job job2 = new Job(conf2,”Job2″);
………//job2 其它设置

Configuration conf3 = new Configuration();
Job job3 = new Job(conf3,”Job3″);
………//job3 其它设置

ControlledJob cJob1 = new ControlledJob(conf1);// 构造一个 Job
cJob1.setJob(job1);// 设置 MapReduce job
ControlledJob cJob2 = new ControlledJob(conf2);
cJob2.setJob(job2);
ControlledJob cJob3 = new ControlledJob(conf3);
cJob3.setJob(job3);

cJob3.addDependingJob(cJob1);// 设置 job3 和 job1 的依赖关系
cJob3.addDependingJob(cJob2);// 设置 job3 和 job2 的依赖关系

JobControl JC = new JobControl(“123”);
// 把三个构造的 job 加入到 JobControl 中
JC.addJob(cJob1);
JC.addJob(cJob2);
JC.addJob(cJob3);
Thread t = new Thread(JC);
t.start();
while (true) {
    if (jobControl.allFinished()) {
        jobControl.stop();
        break;
    }
}

注意:hadoop 的 JobControl 类实现了线程 Runnable 接口。我们需要实例化一个线程来启动它。直接调用 JobControl 的 run()方法,线程将无法结束。

线性链式 MapReduce

        大量的数据处理任务涉及对记录的预处理和后处理。

        例如:在处理信息检索的文档时,可能一步是移除 stop words(像 a、the 和 is 这样经常出现但不太有意义的词),另一步做 stemming(转换一个词的不同形式为相同的形式,例如转换 finishing 和 finished 为 finish)。

        你可以为预处理与后处理步骤各自编写一个 MapReduce 作业,并把它们链接起来。在这些步骤中可以使用 IdentityReducer(或完全不同的 Reducer)。由于过程中每一个步骤的中间结果都需要占用 I / O 和存储资源,这种做法是低效的。另一种方法是自己写 mapper 去预先调用所有的预处理步骤,再让 reducer 调用所有的后处理步骤。这将强制你采用模块化和可组合的方式来构建预处理和后处理。因此 Hadoop 引入了 ChainMapper 和 ChainReducer 类来简化预处理和后处理的构成。

hadoop 提供了专门的链式 ChainMapper 和 ChainReducer 来处理线性链式 MapReduce 任务。在 Map 或者 Reduce 阶段存在多个 Mapper,这些 Mapper 像 Linux 管道一样,前一个 Mapper 的输出结果直接重定向到后一个 Mapper 的输入,形成流水线。其调用形式如下:

…        // 预处理
ChainMapper.addMapper(…);
ChainReducer.setReducer(…);
ChainReducer.addMapper(…);
…            // 后处理
//addMapper() 调用的方法形式如下:
public static void addMapper(Job job,
Class< extends Mapper> mclass,
Class< extends K1> inputKeyClass,
Class< extends V1> inputValueClass,
Class< extends K2> outputKeyClass,
Class< extends V2> outputValueClass,
Configuration conf
)

addMapper()方法有 8 个参数。第一个和最后一个分别为全局的 Job 和本地的 configuration 对象。第二个参数是 Mapper 类,负责数据处理。余下 4 个参数 inputKeyClass、inputValueClass、outputKeyClass 和 outputValueClass 是这个 Mapper 类中输入 / 输出类的类型。ChainReducer 专门提供了一个 setReducer()方法来设置整个作业唯一的 Reducer,语法与 addMapper()方法类似。

线性链式 MapReduce 的示例代码如下所示。

public void function throws IOException {
Configuration conf = new Configuration();
Job job = new Job(conf);

job.setJobName(“chainjob”);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
// 在作业中添加 Map1 阶段
Configuration map1conf = new Configuration(false);
ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,Text.class, Text.class, true, map1conf);
// 在作业中添加 Map2 阶段
Configuration map2conf = new Configuration(false);
ChainMapper.addMapper(job, Map2.class, Text.class, Text.class,LongWritable.class, Text.class, true, map2conf);
// 在作业中添加 Reduce 阶段
Configuration reduceconf = new Configuration(false);
ChainReducer.setReducer(job,Reduce.class,LongWritable.class,Text.class,Text.class,Text.class,true,reduceconf);
// 在作业中添加 Map3 阶段
Configuration map3conf = new Configuration(false);
ChainReducer.addMapper(job,Map3.class,Text.class,Text.class,LongWritable.class,Text.class,true,map3conf);
// 在作业中添加 Map4 阶段
Configuration map4conf = new Configuration(false);
ChainReducer.addMapper(job,Map4.class,LongWritable.class,Text.class,LongWritable.class,Text.class,true,map4conf);

job.waitForCompletion(true);
}

注意:对于任意一个 MapReduce 作业,Map 和 Reduce 阶段可以有无限个 Mapper,但是 Reduce 只能有一个。所以包含多个 Reduce 的作业,不能使用 ChainMapper/ChainReduce 来完成。

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-01/139174.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计4693字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中