阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

在MapReduce中利用MultipleOutputs输出多个文件

139次阅读
没有评论

共计 4701 个字符,预计需要花费 12 分钟才能阅读完成。

用户在使用 Mapreduce 时默认以 part-*命名,MultipleOutputs 可以将不同的键值对输出到用户自定义的不同的文件中。

实现过程是在调用 output.write(key, new IntWritable(total), key.toString());

方法时候第三个参数是  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 指定了输出文件的命名前缀,那么我们可以通过对不同的 key 使用不同的 baseOutputPath 来使不同 key 对应的 value 输出到不同的文件中,比如将同一天的数据输出到以该日期命名的文件中

Spark 颠覆 MapReduce 保持的排序记录  http://www.linuxidc.com/Linux/2014-10/107909.htm

在 Oracle 数据库中实现 MapReduce  http://www.linuxidc.com/Linux/2014-10/107602.htm

MapReduce 实现矩阵乘法 – 实现代码 http://www.linuxidc.com/Linux/2014-09/106958.htm

基于 MapReduce 的图算法 PDF  http://www.linuxidc.com/Linux/2014-08/105692.htm

Hadoop 的 HDFS 和 MapReduce  http://www.linuxidc.com/Linux/2014-08/105661.htm

MapReduce 计数器简介 http://www.linuxidc.com/Linux/2014-08/105649.htm

Hadoop 技术内幕:深入解析 MapReduce 架构设计与实现原理 PDF 高清扫描版 http://www.linuxidc.com/Linux/2014-06/103576.htm

测试数据:ip-to-hosts.txt

18.217.167.70 United States
206.96.54.107 United States
196.109.151.139 Mauritius
174.52.58.113 United States
142.111.216.8 Canada
162.100.49.185 United States
146.38.26.54 United States
36.35.107.36 China
95.214.95.13 Spain
2.96.191.111 United Kingdom
62.177.119.177 Czech Republic
21.165.189.3 United States
46.190.32.115 Greece
113.173.113.29 Vietnam
42.65.172.142 Taiwan
197.91.198.199 South Africa
68.165.71.27 United States
110.119.165.104 China
171.50.76.89 India
171.207.52.113 Singapore
40.174.30.170 United States
191.170.95.175 United States
17.81.129.101 United States
91.212.157.202 France
173.83.82.99 United States
129.75.56.220 United States
149.25.104.198 United States
103.110.22.19 Indonesia
204.188.117.122 United States
138.23.10.72 United States
172.50.15.32 United States
85.88.38.58 Belgium
49.15.14.6 India
19.84.175.5 United States
50.158.140.215 United States
161.114.120.34 United States
118.211.174.52 Australia
220.98.113.71 Japan
182.101.16.171 China
25.45.75.194 United Kingdom
168.16.162.99 United States
155.60.219.154 Australia
26.216.17.198 United States
68.34.157.157 United States
89.176.196.28 Czech Republic
173.11.51.134 United States
116.207.191.159 China
164.210.124.152 United States
168.17.158.38 United States
174.24.173.11 United States
143.64.173.176 United States
160.164.158.125 Italy
15.111.128.4 United States
22.71.176.163 United States
105.57.100.182 Morocco
111.147.83.42 China
137.157.65.89 Australia

该文件中每行数据有两个字段 分别是 ip 地址和该 ip 地址对应的国家,以 \t 分隔

上代码

 public static class IPCountryReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private MultipleOutputs output;

        @Override
        protected void setup(Context context
        ) throws IOException, InterruptedException {
            output = new MultipleOutputs(context);
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context
        ) throws IOException, InterruptedException {
            int total = 0;
            for(IntWritable value: values) {
                total += value.get();
            }
          <span style=”color:#FF0000;”> output.write(new Text(“Output by MultipleOutputs”), NullWritable.get(), key.toString());
            output.write(key, new IntWritable(total), key.toString());</span>

        }

        @Override
        protected void cleanup(Context context
        ) throws IOException, InterruptedException {
            output.close();
        }
    }

在 reduce 的 setup 方法中

 output = new MultipleOutputs(context);

然后在 reduce 中通过该 output 将内容输出到不同的文件中

  private Configuration conf;
    public static final String NAME = “named_output”;

    public static void main(String[] args) throws Exception {
        args =new String[] {“hdfs://caozw:9100/user/hadoop/hadooprealword”,”hdfs://caozw:9100/user/hadoop/hadooprealword/output”};
        ToolRunner.run(new Configuration(), new NamedCountryOutputJob(), args);
    }

    public int run(String[] args) throws Exception {
        if(args.length != 2) {
            System.err.println(“Usage: named_output <input> <output>”);
            System.exit(1);
        }

        Job job = new Job(conf, “IP count by country to named files”);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(IPCountryMapper.class);
        job.setReducerClass(IPCountryReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setJarByClass(NamedCountryOutputJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 1 : 0;

    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return conf;
    }

    public static class IPCountryMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int country_pos = 1;
        private static final Pattern pattern = Pattern.compile(“\\t”);

        @Override
        protected void map(LongWritable key, Text value,
                          Context context) throws IOException, InterruptedException {
            String country = pattern.split(value.toString())[country_pos];
            context.write(new Text(country), new IntWritable(1));
        }
    }

测试结果:

在 MapReduce 中利用 MultipleOutputs 输出多个文件

在 MapReduce 中利用 MultipleOutputs 输出多个文件

更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计4701字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中