阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop-2.4.1学习之Mapper和Reducer

189次阅读
没有评论

共计 4348 个字符,预计需要花费 11 分钟才能阅读完成。

MapReduce 允许程序员能够容易地编写并行运行在大规模集群上处理大量数据的程序,确保程序的运行稳定可靠和具有容错处理能力。程序员编写的运行在 MapReduce 上的应用程序称为作业(job),Hadoop 既支持用 Java 编写的 job,也支持其它语言编写的作业,比如 Hadoop Streaming(shell、python)和 Hadoop Pipes(c++)。Hadoop-2.X 不再保留 Hadoop-1.X 版本中的 JobTracker 和 TaskTracker 组件,但这并不意味着 Hadoop-2.X 不再支持 MapReduce 作业,相反 Hadoop-2.X 通过唯一的主 ResourceManager、每个节点一个的从 NodeManager 和每个应用程序一个的 MRAppMaster 保留了对 MapReduce 作业的向后兼容。在新版本中 MapReduce 作业依然由 Map 和 Reduce 任务组成,Map 依然接收由 MapReduce 框架将输入数据分割为数据块,然后 Map 任务以完全并行的方式处理这些数据块,接着 MapReduce 框架对 Map 任务的输出进行排序,并将结果做为 Reduce 任务的输入,最后由 Reduce 任务输出最终的结果,在整个执行过程中 MapReduce 框架负责任务的调度,监控和重新执行失败的任务等。

通常计算节点和存储节点是相同的,MapReduce 框架会有效地将任务安排在存储数据的节点上,有助于降低传输数据时的带宽使用量。MapReduce 应用程序通过实现或者继承合适的接口或类提供了 map 和 reduce 函数,这两个函数负责 Map 任务和 Reduce 任务。作业客户端将编写好的作业提交给 ResourceManager,而不再是 JobTracker,ResourceManager 负责将作业分布到从节点上,调度和监控作业,为作业客户端提供状态和诊断信息。

MapReduce 框架只处理 <key, value> 键值对,也就是将作业的输入视为一些键值对并输出键值对。做为键值的类必须可以被 MapReduce 框架序列化,因此需要实现 Writable 接口,常用的 IntWritable,LongWritable 和 Text 都是实现该接口的类。做为键的类除了要实现 Writable 接口外,还需要实现 WritableComparable 接口,实现该接口主要为了有助于排序,上面提到的三个类也都实现了该接口。

在简要介绍了 MapReduce 框架后,下面深入学习框架中的两个重要概念:Mapper 和 Reducer,正如上文提到了,它们组成了 MapReduce 作业并负责完成实际的业务逻辑处理。

Mapper 是独立的任务,将输入记录转换为中间记录,即对输入的键值对进行处理,并输出为一组中间键值对,输出的键值对使用 context.write(WritableComparable, Writable)方法收集起来,中间记录的键值类型不必与输入记录的键值类型相同,实际上也往往是不同的。一条输入记录经由 Mapper 处理后可能输出为 0 条或者多条中间记录。比如,如果输入记录不满足业务要求(没有包含特定的值或者包含了特定的值)的话,可以直接返回,则会输出 0 条记录,此时 Mapper 起了过滤器的作用。

接着 MapReduce 框架将与给定键相关联的所有中间值分组,然后传递给 Reducer。用户可以通过 Job.setGroupingComparatorClass(Class)方法指定 Comparator 来控制分组。Mapper 的输出被排序然后按照 Reducer 分区,总的分区数与作业启动的 Reducer 任务数相同,程序员可以通过实现自定义的 Partitioner 控制输出的记录由哪个 Reducer 处理,默认使用的是 HashPartitioner。程序员还可以通过 Job.setCombinerClass(Class)指定一个 combiner 来执行中间输出的本地聚合,这有助于减少 Mapper 到 Reducer 的数据传输。Mapper 的中间输出经过排序后总是保存为 (key-len, key,value-len, value) 的格式,应用程序可以通过 Configuration 控制是否将中间输出进行压缩,以及使用何种压缩方式,相关的几个参数有:mapreduce.map.output.compress、mapreduce.map.output.compress.codec。程序员通过 Job.setMapperClass(Class)将 Mapper 传递给 Job,MapReduce 框架调用 Mapper 的 map(WritableComparable, Writable, Context)处理该任务的价值对,应用程序可以覆盖 cleanup(Context)方法实现任何需要的清理工作。

MapReduce 框架为每个由作业的 InputFormat 生成的 InputSplit 启动一个 map 任务,因此总的 map 任务数量由输入数据大小决定,更准确说是由输入文件总的块数决定。虽然可以为较少使用 CPU 的 map 任务在节点上设置 300 个 map 任务,但每个节点更适合并行运行 10-100 个 map 任务。由于任务的启动需要花费一些时间,所以任务的运行最好至少需要 1 分钟,因为如果任务运行的时间很少,整个作业的时间将大部分消耗在任务的建立上面。

Reducer 将具有相同键的一组中间值降低为一组更小数量的值,比如合并单词的数量等。一个作业启动的 Reducer 数量可以通过 Job.setNumReduceTasks(int)或者 mapred-site.xml 中的参数 mapreduce.job.reduces 设置,但是更推荐前者,因为可以由程序员决定启动多少个 reducer,而后者更多的是提供了一种默认值。程序员使用 Job.setReducerClass(Class)将 Reducer 提交给作业,MapReduce 框架为每对 <key, (list of values)> 调用 reduce(WritableComparable, Iterable<Writable>, Context)方法,同 Mapper 一样,程序员也可以覆盖 cleanup(Context)方法指定需要的清理工作。

Reducer 的处理过程主要包括三个阶段:shuffle(洗牌)、sort(分类)和 reduce。在 shuffle 阶段,MapReduce 框架通过 HTTP 获取所有 Mapper 输出的相关分区。在 Sort 阶段,框架根据键分组 Reducer 的输入(不同的 mapper 可能输出相同的键)。Shuffle 和 sort 是同时进行的,获取 Mapper 的输出后然后合并它们。在 reduce 阶段,调用 reduce(WritableComparable, Iterable<Writable> 处理 <key, (list of values)> 对。Reducer 的输出通常通过 Context.write(WritableComparable,Writable)写入文件系统,比如 HDFS,当然也可以通过使用 DBOutputFormat 将输出写入数据库。Reducer 的输出是未经排序的。

如果不需要 Reducer,可以使用 Job.setNumReduceTasks(int)将 Reducer 的数量设置为 0(如果不使用该方法设置 Reducer 的数量,由于 mapreduce.job.reduces 默认为 1,会启动一个 Reducer),在这种情况下,Mapper 的输出将直接写入 FileOutputFormat.setOutputPath(Job,Path)指定的路径中,并且 MapReduce 框架不会对 Mapper 的输出进行排序。

如果在进行 reduce 之前想使用与分组中间键时不同的比较规则,可以通过 Job.setSortComparatorClass(Class)指定不同的 Comparator。也就是 Job.setGroupingComparatorClass(Class)控制了如何对中间输出分组,而 Job.setSortComparatorClass(Class)控制了在将数据传入 reduce 之前进行的第二次分组。

不同于 Mapper 的数量由输入文件的大小确定,Reducer 的数量可以由程序员明确设置,那么设置多少 Reducer 可以达到较好地效果呢?Reducer 的数量范围为:(0.95 ~1.75) * 节点数量 * 每个节点上最大的容器数。参数 yarn.scheduler.minimum-allocation-mb 设置了每个容器可请求的最小内存,那么最大容器数可根据总的内存除以该参数计算得出。当使用 0.75 时,所有的 Reducer 会被立即加载,并当 Mapper 完成时开始传输 Mapper 的输出。使用 1.75 时,较快的节点将完成它们第一轮的任务,然后加载第二波任务,这样对负载平衡具有更好的效果。增加 Reducer 的数量虽然增加了框架开销,但增加了负载平衡和降低了失败的成本。上面的比例因子比总的 Reducer 数量稍微少些,以为预测执行的任务和失败的任务保留少量的 Reducer 槽,也就是实际的 Reducer 数量为上面公式得出的数量加上保留的 Reducer 数量。

CentOS 安装和配置 Hadoop2.2.0  http://www.linuxidc.com/Linux/2014-01/94685.htm

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计4348字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中