阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop数据输入的源码解析

415次阅读
没有评论

共计 7535 个字符,预计需要花费 19 分钟才能阅读完成。

我们知道,任何一个工程项目,最重要的是三个部分:输入,中间处理,输出。今天我们来深入的了解一下我们熟知的 Hadoop 系统中,输入是如何输入的?

在 hadoop 中,输入数据都是通过对应的 InputFormat 类和 RecordReader 类来实现的,其中 InputFormat 来实现将对应输入文件进行分片,RecordReader 类将对应分片中的数据读取进来。具体的方式如下:

(1)InputFormat 类是一个接口。

public interface InputFormat<K, V> {

  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit split,

                                    JobConf job,

                                    Reporter reporter) throws IOException;

}

(2)FileInputFormat 类实现了 InputFormat 接口。该类实现了 getSplits 方法,但是它也没有实现对应的 getRecordReader 方法。也就是说 FileInputFormat 还是一个抽象类。这里需要说明的一个问题是,FileInputFormat 用 isSplitable 方法来指定对应的文件是否支持数据的切分。默认情况下都是支持的,一般子类都需要重新实现它。

public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

    FileStatus[] files = listStatus(job);

    // Save the number of input files in the job-conf

    job.setLong(NUM_INPUT_FILES, files.length);

    long totalSize = 0;                          // compute total size

    for (FileStatus file: files) {// check we have valid files

      if (file.isDir()) {

        throw new IOException(“Not a file: “+ file.getPath());

      }

      totalSize += file.getLen();

    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

    long minSize = Math.max(job.getLong(“mapred.min.split.size”, 1),

                            minSplitSize);

    // generate splits

    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

    NetworkTopology clusterMap = new NetworkTopology();

    for (FileStatus file: files) {

      Path path = file.getPath();

      FileSystem fs = path.getFileSystem(job);

      long length = file.getLen();

      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

      if ((length != 0) && isSplitable(fs, path)) {

        long blockSize = file.getBlockSize();

        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

 

        long bytesRemaining = length;

        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

          String[] splitHosts = getSplitHosts(blkLocations,

              length-bytesRemaining, splitSize, clusterMap);

          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,

              splitHosts));

          bytesRemaining -= splitSize;

        }

        if (bytesRemaining != 0) {

          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,

                    blkLocations[blkLocations.length-1].getHosts()));

        }

      } else if (length != 0) {

        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);

        splits.add(new FileSplit(path, 0, length, splitHosts));

      } else {

        //Create empty hosts array for zero length files

        splits.add(new FileSplit(path, 0, length, new String[0]));

      }

    }

    LOG.debug(“Total # of splits: ” + splits.size());

    return splits.toArray(new FileSplit[splits.size()]);

  }

// 该方法是用来判断是否可以进行数据的切分

  protected boolean isSplitable(FileSystem fs, Path filename) {

    return true;

  }

// 但是它也没有实现对应的 getRecordReader 方法。也就是说 FileInputFormat 还是一个抽象类。

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,

                                              JobConf job,

                                               Reporter reporter)  throws IOException;

 }

(3)TextFileInputFormat类仅仅实现了 FileInputFormat 类的 getRecordReader 方法,并且重写了 isSplitable 方法,他并没有实现 getSplits 方法,由此可知,他的 getSplits 的实现还是交由父类 FileInputFormat 来实现的。(这里需要注意 TextFileInputFormat 并不是 InputFormat 的子类,TextFileInputFormat 它仅仅是继承了 InputFormat 的 getRecordReader 的方法而已。)

public class TextInputFormat extends FileInputFormat<LongWritable, Text>

  implements JobConfigurable {

 

  private CompressionCodecFactory compressionCodecs = null;

 

  public void configure(JobConf conf) {

    compressionCodecs = new CompressionCodecFactory(conf);

  }

 

// 子类重新实现了 isSplitable 方法

  protected boolean isSplitable(FileSystem fs, Path file) {

    final CompressionCodec codec = compressionCodecs.getCodec(file);

    if (null == codec) {

      return true;

    }

    return codec instanceof SplittableCompressionCodec;

  }

 // 该方法实现了将文件中的数据读入到对应的 Map 方法中。

  public RecordReader<LongWritable, Text> getRecordReader(

                                          InputSplit genericSplit, JobConf job,

                                          Reporter reporter)

    throws IOException {

   

    reporter.setStatus(genericSplit.toString());

    String delimiter = job.get(“textinputformat.record.delimiter”);

    byte[] recordDelimiterBytes = null;

    if (null != delimiter) recordDelimiterBytes = delimiter.getBytes();

    return new LineRecordReader(job, (FileSplit) genericSplit,

        recordDelimiterBytes);

  }

}

从上面可以看出一个 Text 格式的文件是通过什么样的类继承层次输入到 map 方法中。下面主要介绍一下,到底是如何切分的?我们从类的继承层次关系上可以看出,具体的切分方式是通过 FileInputFormat 类来实现的。因此,要了解文件是如何切分的,只需要查看一下 FileInputFormat 类中的 getSplits 方法的实现细节即可。下面我再次把 FileInputFormat 类中的 getSplits 方法贴出来:然后分析每一句代码。

  public InputSplit[] getSplits(JobConf job, int numSplits)

    throws IOException {

    FileStatus[] files = listStatus(job); // 列出当前 job 中所有的输入文件

   

    // Save the number of input files in the job-conf

job.setLong(NUM_INPUT_FILES, files.length); // 设置当前 job 的输入文件数目

 

// 计算当前 job 所有输入文件总的大小

long totalSize = 0;                          // compute total size

// 遍历每一个文件

    for (FileStatus file: files) {// check we have valid files

      if (file.isDir()) {

        throw new IOException(“Not a file: “+ file.getPath());

      }

      totalSize += file.getLen();

    }

// numSplits 是分片数,goalSize 是平均每一个分片的大小,minSize 是每个分片最小值

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

    long minSize = Math.max(job.getLong(“mapred.min.split.size”, 1),

                            minSplitSize);

 

    // generate splits  计算分片

    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

    NetworkTopology clusterMap = new NetworkTopology();

    for (FileStatus file: files) {

      Path path = file.getPath();

      FileSystem fs = path.getFileSystem(job);

      long length = file.getLen();

    // 获取文件的位置

      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

//isSplitable 方法根据对应文件名称判断对应文件是否可以切分

     if ((length != 0) && isSplitable(fs, path)) {

        long blockSize = file.getBlockSize();// 获取文件块的大小

        // computeSplitSize 方法计算真正的分片大小

        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

 

        long bytesRemaining = length;// 文件剩余大小

 

// SPLIT_SLOP=1.1,文件大小 / 分片的大小 > SPLIT_SLOP 则进行切分。

        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

        // splitHosts 用来记录分片元数据信息(包括切片的位置,大小等等)

          String[] splitHosts = getSplitHosts(blkLocations,

              length-bytesRemaining, splitSize, clusterMap);

          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,

              splitHosts));

          bytesRemaining -= splitSize;

        }

       

        if (bytesRemaining != 0) {

          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,

                    blkLocations[blkLocations.length-1].getHosts()));

        }

      } else if (length != 0) {// 如果文件不能切分,相应的会将整个文件作为一个分片。

        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);

        splits.add(new FileSplit(path, 0, length, splitHosts));

      } else {

        //Create empty hosts array for zero length files

        splits.add(new FileSplit(path, 0, length, new String[0]));

      }

    }

    LOG.debug(“Total # of splits: ” + splits.size());

    return splits.toArray(new FileSplit[splits.size()]);

  }

// 真正计算分片大小的地方。

protected long computeSplitSize(long goalSize, long minSize,

                                      long blockSize) {

    return Math.max(minSize, Math.min(goalSize, blockSize));

  }

综上所述,对于 MR 的输入文件采用的方式是通过 FileInputFormat 类来进行数据的切分,在切分之前,是通过 isSplitable 方法来判断是否可以切分,若不能切分,则会将整个文件作为一个分片作为输入。因此,若有业务需求需要对应文件不能进行切分的话,可以将 isSplitable 方法方位 false 即可。

这里还需要注意一个问题,倘若你的文件都是小文件的话,对应的 getSplits 方法也不会对其进行切分的。一般情况小文件指的是其大小小于对应 hadoop 中 HDFS 的块的大小(128M)。 

下面关于 Hadoop 的文章您也可能喜欢,不妨看看:

Ubuntu14.04 下 Hadoop2.4.1 单机 / 伪分布式安装配置教程  http://www.linuxidc.com/Linux/2015-02/113487.htm

CentOS 安装和配置 Hadoop2.2.0  http://www.linuxidc.com/Linux/2014-01/94685.htm

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-05/131571.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计7535字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7992149
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞定

多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞定

多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞...
星哥带你玩飞牛NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手!

星哥带你玩飞牛NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手!

星哥带你玩飞牛 NAS-13:自动追番、订阅下载 + 刮削,动漫党彻底解放双手! 作为动漫爱好者,你是否还在为...
从“纸堆”到“电子化”文档:用这个开源系统打造你的智能文档管理系统

从“纸堆”到“电子化”文档:用这个开源系统打造你的智能文档管理系统

从“纸堆”到“电子化”文档:用这个开源系统打造你的智能文档管理系统 大家好,我是星哥。公司的项目文档存了一堆 ...
星哥带你玩飞牛NAS硬件03:五盘位+N5105+双网口的成品NAS值得入手吗

星哥带你玩飞牛NAS硬件03:五盘位+N5105+双网口的成品NAS值得入手吗

星哥带你玩飞牛 NAS 硬件 03:五盘位 +N5105+ 双网口的成品 NAS 值得入手吗 前言 大家好,我...
安装Black群晖DSM7.2系统安装教程(在Vmware虚拟机中、实体机均可)!

安装Black群晖DSM7.2系统安装教程(在Vmware虚拟机中、实体机均可)!

安装 Black 群晖 DSM7.2 系统安装教程(在 Vmware 虚拟机中、实体机均可)! 前言 大家好,...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
自己手撸一个AI智能体—跟创业大佬对话

自己手撸一个AI智能体—跟创业大佬对话

自己手撸一个 AI 智能体 — 跟创业大佬对话 前言 智能体(Agent)已经成为创业者和技术人绕...
开发者福利:免费 .frii.site 子域名,一分钟申请即用

开发者福利:免费 .frii.site 子域名,一分钟申请即用

  开发者福利:免费 .frii.site 子域名,一分钟申请即用 前言 在学习 Web 开发、部署...
Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集

Prometheus:监控系统的部署与指标收集 在云原生体系中,Prometheus 已成为最主流的监控与报警...
小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比

小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比

小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比 星哥玩云,带你从小白到上云高手。今天咱们就来聊聊——什...
150元打造低成本NAS小钢炮,捡一块3865U工控板

150元打造低成本NAS小钢炮,捡一块3865U工控板

150 元打造低成本 NAS 小钢炮,捡一块 3865U 工控板 一块二手的熊猫 B3 工控板 3865U,搭...