阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

MapReduce 计数器简介

413次阅读
没有评论

共计 15831 个字符,预计需要花费 40 分钟才能阅读完成。

在许多情况下,一个用户需要了解待分析的数据,尽管这并非所要执行的分析任务 的核心内容。以统计数据集中无效记录数目的任务为例,如果发现无效记录的比例 相当高,那么就需要认真思考为何存在如此多无效记录。是所采用的检测程序存在 缺陷,还是数据集质量确实很低,包含大量无效记录?如果确定是数据集的质量问 题,则可能需要扩大数据集的规模,以增大有效记录的比例,从而进行有意义的 分析。

计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。计数器 还可辅助诊断系统故障。如果需要将日志信息传输到 map 或 reduce 任务,更好的 方法通常是尝试传输计数器值以监测某一特定事件是否发生。对于大型分布式作业 而言,使用计数器更为方便。首先,获取计数器值比输出日志更方便,其次,根据 计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

————————————– 分割线 ————————————–

2、内置计数器

Hadoop 为每个作业维护若干内置计数器, 以描述该作业的各项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量,并以此对 job 做适当的优化。

14/06/08 15:13:35 INFO mapreduce.Job: Counters: 46
  File System Counters
  FILE: Number of bytes read=159
  FILE: Number of bytes written=159447
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=198
  HDFS: Number of bytes written=35
  HDFS: Number of read operations=6
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=2
  Job Counters
  Launched map tasks=1
  Launched reduce tasks=1
  Rack-local map tasks=1
  Total time spent by all maps in occupied slots (ms)=3896
  Total time spent by all reduces in occupied slots (ms)=9006
  Map-Reduce Framework
  Map input records=3
  Map output records=12
  Map output bytes=129
  Map output materialized bytes=159
  Input split bytes=117
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=159
  Reduce input records=12
  Reduce output records=4
  Spilled Records=24
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=13
  CPU time spent (ms)=3830
  Physical memory (bytes) snapshot=537718784
  Virtual memory (bytes) snapshot=7365263360
  Total committed heap usage (bytes)=2022309888
  Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
  File Input Format Counters
  Bytes Read=81
  File Output Format Counters
  Bytes Written=35

计数器由其关联任务维护,并定期传到 tasktracker,再由 tasktracker 传给 jobtracker. 因此,计数器能够被全局地聚集。详见第 hadoop 权威指南第 170 页的“进度和状态的更新”小节。与其他计数器(包括用户定义的计数器)不同,内置的作业计数器实际上 由 jobtracker 维护,不必在整个网络中发送。
 一个任务的计数器值每次都是完整传输的,而非自上次传输之后再继续数未完成的传输,以避免由于消息丢失而引发的错误。另外,如果一个任务在作业执行期间失 败,则相关计数器值会减小。仅当一个作业执行成功之后,计数器的值才是完整可 靠的。
 
3、用户定义的 Java 计数器
 MapReduce 允许用户编写程序来定义计数器,计数器的值可在 mapper 或 reducer 中增加。多个计数器由一个 Java 枚举(enum) 类型来定义,以便对计数器分组。一 个作业可以定义的枚举类型数量不限,各个枚举类型所包含的字段数量也不限。枚 举类型的名称即为组的名称,枚举类型的字段就是计数器名称。计数器是全局的。换言之,MapReduce 框架将跨所有 map 和 reduce 聚集这些计数器,并在作业结束 时产生一个最终结果。
 Note1:需要说明的是,不同的 hadoop 版本定义的方式会有些许差异。
 
(1)在 0.20.x 版本中使用 counter 很简单, 直接定义即可,如无此 counter,hadoop 会自动添加此 counter.
 Counter ct = context.getCounter(“INPUT_WORDS”, “count”);
 ct.increment(1); 
(2)在 0.19.x 版本中, 需要定义 enum
 enum MyCounter {INPUT_WORDS};
 reporter.incrCounter(MyCounter.INPUT_WORDS, 1);
 RunningJob job = JobClient.runJob(conf);
 Counters c = job.getCounters();
 long cnt = c.getCounter(MyCounter.INPUT_WORDS);

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2014-08/105649p2.htm

Notice2:使用计数器需要清楚的是它们都存储在 jobTracker 的内存里。Mapper/Reducer 任务序列化它们,连同更新状态被发送。为了运行正常且 jobTracker 不会出问题,计数器的数量应该在 10-100 个,计数器不仅仅只用来聚合 MapReduce job 的统计值。新版本的 Hadoop 限制了计数器的数量,以防给 jobTracker 带来损害。你最不想看到的事情就是由于定义上百个计数器而使 jobTracker 宕机。
 
下面咱们来看一个计数器的实例(以下代码请运行在 0.20.1 版本以上):
 

3.1 测试数据:
 hello world 2013 mapreduce
hello world 2013 mapreduce
hello world 2013 mapreduce
3.2 代码:
 /**
 * Project Name:CDHJobs
 * File Name:MapredCounter.java
 * Package Name:tmp
 * Date:2014-6- 8 下午 2:12:48
 * Copyright (c) 2014, decli#qq.com All Rights Reserved.
 *
 */

package tmp;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithCounter {

  static enum WordsNature {
    STARTS_WITH_DIGIT, STARTS_WITH_LETTER, ALL
  }

  /**
  * The map class of WordCount.
  */
  public static class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  /**
  * The reducer class of WordCount
  */
  public static class TokenCounterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
        InterruptedException {
      int sum = 0;

      String token = key.toString();
      if (StringUtils.isNumeric(token)) {
        context.getCounter(WordsNature.STARTS_WITH_DIGIT).increment(1);
      } else if (StringUtils.isAlpha(token)) {
        context.getCounter(WordsNature.STARTS_WITH_LETTER).increment(1);
      }
      context.getCounter(WordsNature.ALL).increment(1);

      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  /**
  * The main entry point.
  */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, “WordCountWithCounter”);
    job.setJarByClass(WordCountWithCounter.class);
    job.setMapperClass(TokenCounterMapper.class);
    job.setReducerClass(TokenCounterReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(“/tmp/dsap/rawdata/june/a.txt”));
    FileOutputFormat.setOutputPath(job, new Path(“/tmp/dsap/rawdata/june/a_result”));
    int exitCode = job.waitForCompletion(true) ? 0 : 1;

    Counters counters = job.getCounters();
    Counter c1 = counters.findCounter(WordsNature.STARTS_WITH_DIGIT);
    System.out.println(“————–>>>>: ” + c1.getDisplayName() + “: ” + c1.getValue());

    // The below example shows how to get built-in counter groups that Hadoop provides basically.
    for (CounterGroup group : counters) {
      System.out.println(“==========================================================”);
      System.out.println(“* Counter Group: ” + group.getDisplayName() + ” (” + group.getName() + “)”);
      System.out.println(”  number of counters in this group: ” + group.size());
      for (Counter counter : group) {
        System.out.println(”  ++++ ” + counter.getDisplayName() + “: ” + counter.getName() + “: “
            + counter.getValue());
      }
    }
    System.exit(exitCode);
  }
}
3.3 结果与 计数器详解
 
运行结果下面会一并给出。Counter 有 ” 组 group” 的概念,用于表示逻辑上相同范围的所有数值。MapReduce job 提供的默认 Counter 分为 7 个组,下面逐一介绍。这里也拿上面的测试数据来做详细比对,我将会针对具体的计数器,挑选一些主要的简述一下。
 … 前面省略 job 运行信息 xx 字 …
  ALL=4
  STARTS_WITH_DIGIT=1
  STARTS_WITH_LETTER=3
————–>>>>: STARTS_WITH_DIGIT: 1
==========================================================
#MapReduce job 执行所依赖的数据来自于不同的文件系统,这个 group 表示 job 与文件系统交互的读写统计
* Counter Group: File System Counters (org.apache.hadoop.mapreduce.FileSystemCounter)
 number of counters in this group: 10
 #job 读取本地文件系统的文件字节数。假定我们当前 map 的输入数据都来自于 HDFS,那么在 map 阶段,这个数据应该是 0。但 reduce 在执行前,它 的输入数据是经过 shuffle 的 merge 后存储在 reduce 端本地磁盘中,所以这个数据就是所有 reduce 的总输入字节数。
 ++++ FILE: Number of bytes read: FILE_BYTES_READ: 159
 #map 的中间结果都会 spill 到本地磁盘中,在 map 执行完后,形成最终的 spill 文件。所以 map 端这里的数据就表示 map task 往本地磁盘中总共写了多少字节。与 map 端相对应的是,reduce 端在 shuffle 时,会不断地拉取 map 端的中间结果,然后做 merge 并 不断 spill 到自己的本地磁盘中。最终形成一个单独文件,这个文件就是 reduce 的输入文件。
 ++++ FILE: Number of bytes written: FILE_BYTES_WRITTEN: 159447
 ++++ FILE: Number of read operations: FILE_READ_OPS: 0
 ++++ FILE: Number of large read operations: FILE_LARGE_READ_OPS: 0
 ++++ FILE: Number of write operations: FILE_WRITE_OPS: 0
 # 整个 job 执行过程中,只有 map 端运行时,才从 HDFS 读取数据,这些数据不限于源文件内容,还包括所有 map 的 split 元数据。所以这个值应该比 FileInputFormatCounters.BYTES_READ 要略大些。
 ++++ HDFS: Number of bytes read: HDFS_BYTES_READ: 198
 #Reduce 的最终结果都会写入 HDFS,就是一个 job 执行结果的总量。
 ++++ HDFS: Number of bytes written: HDFS_BYTES_WRITTEN: 35
 ++++ HDFS: Number of read operations: HDFS_READ_OPS: 6
 ++++ HDFS: Number of large read operations: HDFS_LARGE_READ_OPS: 0
 ++++ HDFS: Number of write operations: HDFS_WRITE_OPS: 2
==========================================================
# 这个 group 描述与 job 调度相关的统计
* Counter Group: Job Counters (org.apache.hadoop.mapreduce.JobCounter)
 number of counters in this group: 5
 #Job 在被调度时,如果启动了一个 data-local(源文件的幅本在执行 map task 的 taskTracker 本地)
 ++++ Data-local map tasks
 #当前 job 为某些 map task 的执行保留了 slot,总共保留的时间是多少
 ++++ FALLOW_SLOTS_MILLIS_MAPS/REDUCES
 #所有 map task 占用 slot 的总时间,包含执行时间和创建 / 销毁子 JVM 的时间
 ++++ SLOTS_MILLIS_MAPS/REDUCES
 # 此 job 启动了多少个 map task
 ++++ Launched map tasks: TOTAL_LAUNCHED_MAPS: 1
 # 此 job 启动了多少个 reduce task
 ++++ Launched reduce tasks: TOTAL_LAUNCHED_REDUCES: 1
 ++++ Rack-local map tasks: RACK_LOCAL_MAPS: 1
 ++++ Total time spent by all maps in occupied slots (ms): SLOTS_MILLIS_MAPS: 3896
 ++++ Total time spent by all reduces in occupied slots (ms): SLOTS_MILLIS_REDUCES: 9006
==========================================================
# 这个 Counter group 包含了相当多地 job 执行细节数据。这里需要有个概念认识是:一般情况下,record 就表示一行数据,而相对地 byte 表示这行数据的大小是 多少,这里的 group 表示经过 reduce merge 后像这样的输入形式{“aaa”, [5, 8, 2, …]}。
* Counter Group: Map-Reduce Framework (org.apache.hadoop.mapreduce.TaskCounter)
 number of counters in this group: 20
 #所有 map task 从 HDFS 读取的文件总行数
 ++++ Map input records: MAP_INPUT_RECORDS: 3
 #map task 的直接输出 record 是多少,就是在 map 方法中调用 context.write 的次数,也就是未经过 Combine 时的原生输出条数
 ++++ Map output records: MAP_OUTPUT_RECORDS: 12
 # Map 的输出结果 key/value 都会被序列化到内存缓冲区中,所以这里的 bytes 指序列化后的最终字节之和
 ++++ Map output bytes: MAP_OUTPUT_BYTES: 129
 ++++ Map output materialized bytes: MAP_OUTPUT_MATERIALIZED_BYTES: 159
 # #与 map task 的 split 相关的数据都会保存于 HDFS 中,而在保存时元数据也相应地存储着数据是以怎样的压缩方式放入的,它的具体类型是什么,这些额外的数据是 MapReduce 框架加入的,与 job 无关,这里记录的大小就是表示额外信息的字节大小
 ++++ Input split bytes: SPLIT_RAW_BYTES: 117
 #Combiner 是为了减少尽量减少需要拉取和移动的数据,所以 combine 输入条数与 map 的输出条数是一致的。
 ++++ Combine input records: COMBINE_INPUT_RECORDS: 0
 # 经过 Combiner 后,相同 key 的数据经过压缩,在 map 端自己解决了很多重复数据,表示最终在 map 端中间文件中的所有条目数
 ++++ Combine output records: COMBINE_OUTPUT_RECORDS: 0
 #Reduce 总共读取了多少个这样的 groups
 ++++ Reduce input groups: REDUCE_INPUT_GROUPS: 4
 #Reduce 端的 copy 线程总共从 map 端抓取了多少的中间数据,表示各个 map task 最终的中间文件总和
 ++++ Reduce shuffle bytes: REDUCE_SHUFFLE_BYTES: 159
 #如果有 Combiner 的话,那么这里的数值就等于 map 端 Combiner 运算后的最后条数,如果没有,那么就应该等于 map 的输出条数
 ++++ Reduce input records: REDUCE_INPUT_RECORDS: 12
 #所有 reduce 执行后输出的总条目数
 ++++ Reduce output records: REDUCE_OUTPUT_RECORDS: 4
 #spill 过程在 map 和 reduce 端都会发生,这里统计在总共从内存往磁盘中 spill 了多少条数据
 ++++ Spilled Records: SPILLED_RECORDS: 24
 #每个 reduce 几乎都得从所有 map 端拉取数据,每个 copy 线程拉取成功一个 map 的数据,那么增 1,所以它的总数基本等于 reduce number * map number
 ++++ Shuffled Maps : SHUFFLED_MAPS: 1
 # copy 线程在抓取 map 端中间数据时,如果因为网络连接异常或是 IO 异常,所引起的 shuffle 错误次数
 ++++ Failed Shuffles: FAILED_SHUFFLE: 0
 #记录着 shuffle 过程中总共经历了多少次 merge 动作
 ++++ Merged Map outputs: MERGED_MAP_OUTPUTS: 1
 #通过 JMX 获取到执行 map 与 reduce 的子 JVM 总共的 GC 时间消耗
 ++++ GC time elapsed (ms): GC_TIME_MILLIS: 13
 ++++ CPU time spent (ms): CPU_MILLISECONDS: 3830
 ++++ Physical memory (bytes) snapshot: PHYSICAL_MEMORY_BYTES: 537718784
 ++++ Virtual memory (bytes) snapshot: VIRTUAL_MEMORY_BYTES: 7365263360
 ++++ Total committed heap usage (bytes): COMMITTED_HEAP_BYTES: 2022309888
==========================================================
# 这组内描述 Shuffle 过程中的各种错误情况发生次数,基本定位于 Shuffle 阶段 copy 线程抓取 map 端中间数据时的各种错误。
* Counter Group: Shuffle Errors (Shuffle Errors)
 number of counters in this group: 6
 #每个 map 都有一个 ID,如 attempt_201109020150_0254_m_000000_0,如果 reduce 的 copy 线程抓取过来的元数据中这个 ID 不是标准格式,那么此 Counter 增加
 ++++ BAD_ID: BAD_ID: 0
 #表示 copy 线程建立到 map 端的连接有误
 ++++ CONNECTION: CONNECTION: 0
 #Reduce 的 copy 线程如果在抓取 map 端数据时出现 IOException,那么这个值相应增加
 ++++ IO_ERROR: IO_ERROR: 0
 #map 端的那个中间结果是有压缩好的有格式数据,所有它有两个 length 信息:源数据大小与压缩后数据大小。如果这两个 length 信息传输的有误(负值),那么此 Counter 增加
 ++++ WRONG_LENGTH: WRONG_LENGTH: 0
 #每个 copy 线程当然是有目的: 为某个 reduce 抓取某些 map 的中间结果,如果当前抓取的 map 数据不是 copy 线程之前定义好的 map,那么就表示把数据拉错了
 ++++ WRONG_MAP: WRONG_MAP: 0
 #与上面描述一致,如果抓取的数据表示它不是为此 reduce 而准备的,那还是拉错数据了。
 ++++ WRONG_REDUCE: WRONG_REDUCE: 0
==========================================================
# 这个 group 表示 map task 读取文件内容(总输入数据) 的统计
* Counter Group: File Input Format Counters (org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter)
 number of counters in this group: 1
# Map task 的所有输入数据 (字节),等于各个 map task 的 map 方法传入的所有 value 值字节之和。
 ++++ Bytes Read: BYTES_READ: 81
==========================================================
## 这个 group 表示 reduce task 输出文件内容(总输出数据) 的统计
* Counter Group: File Output Format Counters (org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter)
 number of counters in this group: 1
 ++++ Bytes Written: BYTES_WRITTEN: 35
==========================================================
# 自定义计数器的统计
* Counter Group: tmp.WordCountWithCounter$WordsNature (tmp.WordCountWithCounter$WordsNature)
 number of counters in this group: 3
 ++++ ALL: ALL: 4
 ++++ STARTS_WITH_DIGIT: STARTS_WITH_DIGIT: 1
 ++++ STARTS_WITH_LETTER: STARTS_WITH_LETTER: 3
4、最后的问题:
 
如果想要在 MapReduce 中实现一个类似计数器的“全局变量”,可以在 map、reduce 中以任意数据类型、任意修改变量值,并在 main 函数中回调获取该怎么办呢?

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

在许多情况下,一个用户需要了解待分析的数据,尽管这并非所要执行的分析任务 的核心内容。以统计数据集中无效记录数目的任务为例,如果发现无效记录的比例 相当高,那么就需要认真思考为何存在如此多无效记录。是所采用的检测程序存在 缺陷,还是数据集质量确实很低,包含大量无效记录?如果确定是数据集的质量问 题,则可能需要扩大数据集的规模,以增大有效记录的比例,从而进行有意义的 分析。

计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。计数器 还可辅助诊断系统故障。如果需要将日志信息传输到 map 或 reduce 任务,更好的 方法通常是尝试传输计数器值以监测某一特定事件是否发生。对于大型分布式作业 而言,使用计数器更为方便。首先,获取计数器值比输出日志更方便,其次,根据 计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

————————————– 分割线 ————————————–

2、内置计数器

Hadoop 为每个作业维护若干内置计数器, 以描述该作业的各项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量,并以此对 job 做适当的优化。

14/06/08 15:13:35 INFO mapreduce.Job: Counters: 46
  File System Counters
  FILE: Number of bytes read=159
  FILE: Number of bytes written=159447
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=198
  HDFS: Number of bytes written=35
  HDFS: Number of read operations=6
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=2
  Job Counters
  Launched map tasks=1
  Launched reduce tasks=1
  Rack-local map tasks=1
  Total time spent by all maps in occupied slots (ms)=3896
  Total time spent by all reduces in occupied slots (ms)=9006
  Map-Reduce Framework
  Map input records=3
  Map output records=12
  Map output bytes=129
  Map output materialized bytes=159
  Input split bytes=117
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=159
  Reduce input records=12
  Reduce output records=4
  Spilled Records=24
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=13
  CPU time spent (ms)=3830
  Physical memory (bytes) snapshot=537718784
  Virtual memory (bytes) snapshot=7365263360
  Total committed heap usage (bytes)=2022309888
  Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
  File Input Format Counters
  Bytes Read=81
  File Output Format Counters
  Bytes Written=35

计数器由其关联任务维护,并定期传到 tasktracker,再由 tasktracker 传给 jobtracker. 因此,计数器能够被全局地聚集。详见第 hadoop 权威指南第 170 页的“进度和状态的更新”小节。与其他计数器(包括用户定义的计数器)不同,内置的作业计数器实际上 由 jobtracker 维护,不必在整个网络中发送。
 一个任务的计数器值每次都是完整传输的,而非自上次传输之后再继续数未完成的传输,以避免由于消息丢失而引发的错误。另外,如果一个任务在作业执行期间失 败,则相关计数器值会减小。仅当一个作业执行成功之后,计数器的值才是完整可 靠的。
 
3、用户定义的 Java 计数器
 MapReduce 允许用户编写程序来定义计数器,计数器的值可在 mapper 或 reducer 中增加。多个计数器由一个 Java 枚举(enum) 类型来定义,以便对计数器分组。一 个作业可以定义的枚举类型数量不限,各个枚举类型所包含的字段数量也不限。枚 举类型的名称即为组的名称,枚举类型的字段就是计数器名称。计数器是全局的。换言之,MapReduce 框架将跨所有 map 和 reduce 聚集这些计数器,并在作业结束 时产生一个最终结果。
 Note1:需要说明的是,不同的 hadoop 版本定义的方式会有些许差异。
 
(1)在 0.20.x 版本中使用 counter 很简单, 直接定义即可,如无此 counter,hadoop 会自动添加此 counter.
 Counter ct = context.getCounter(“INPUT_WORDS”, “count”);
 ct.increment(1); 
(2)在 0.19.x 版本中, 需要定义 enum
 enum MyCounter {INPUT_WORDS};
 reporter.incrCounter(MyCounter.INPUT_WORDS, 1);
 RunningJob job = JobClient.runJob(conf);
 Counters c = job.getCounters();
 long cnt = c.getCounter(MyCounter.INPUT_WORDS);

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2014-08/105649p2.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计15831字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7987225
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换,告别多工具切换

12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换,告别多工具切换

12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换...
Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集 在云原生体系中,Prometheus 已成为最主流的监控与报警...
星哥带你玩飞牛NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手!

星哥带你玩飞牛NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手!

星哥带你玩飞牛 NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手! 作为动漫爱好者,你是否还在为...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
开源MoneyPrinterTurbo 利用AI大模型,一键生成高清短视频!

开源MoneyPrinterTurbo 利用AI大模型,一键生成高清短视频!

  开源 MoneyPrinterTurbo 利用 AI 大模型,一键生成高清短视频! 在短视频内容...
安装并使用谷歌AI编程工具Antigravity(亲测有效)

安装并使用谷歌AI编程工具Antigravity(亲测有效)

  安装并使用谷歌 AI 编程工具 Antigravity(亲测有效) 引言 Antigravity...
每年0.99刀,拿下你的第一个顶级域名,详细注册使用

每年0.99刀,拿下你的第一个顶级域名,详细注册使用

每年 0.99 刀,拿下你的第一个顶级域名,详细注册使用 前言 作为长期折腾云服务、域名建站的老玩家,星哥一直...
让微信公众号成为 AI 智能体:从内容沉淀到智能问答的一次升级

让微信公众号成为 AI 智能体:从内容沉淀到智能问答的一次升级

让微信公众号成为 AI 智能体:从内容沉淀到智能问答的一次升级 大家好,我是星哥,之前写了一篇文章 自己手撸一...
4盘位、4K输出、J3455、遥控,NAS硬件入门性价比之王

4盘位、4K输出、J3455、遥控,NAS硬件入门性价比之王

  4 盘位、4K 输出、J3455、遥控,NAS 硬件入门性价比之王 开篇 在 NAS 市场中,威...