阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop基于文件的数据结构及实例

117次阅读
没有评论

共计 5486 个字符,预计需要花费 14 分钟才能阅读完成。

基于文件的数据结构
两种文件格式:
1、SequenceFile
2、MapFile

SequenceFile
1、SequenceFile 文件是 Hadoop 用来存储二进制形式的 <key,value> 对而设计的一种平面文件 (Flat File)。

2、可以把 SequenceFile 当做一个容器,把所有文件打包到 SequenceFile 类中可以高效的对小文件进行存储和处理。

3、SequenceFile 文件并不按照其存储的 key 进行排序存储,SequenceFile 的内部类 Writer** 提供了 append 功能 **。

4、SequenceFile 中的 key 和 value 可以是任意类型 Writable 或者是自定义 Writable 类型。

SequenceFile 压缩
1、SequenceFile 的内部格式取决于是否启用压缩,如果是,要么是记录压缩,要么是块压缩。
2、三种类型:
A.无压缩类型:如果没有启用压缩(默认设置),那么每个记录就由它的记录长度(字节数)、键的长度,键和值组成。长度字段为四字节。

B.记录压缩类型:记录压缩格式与无压缩格式基本相同,不同的是值字节是用定义在头部的编码器来压缩。注意,键是不压缩的。

C.块压缩类型:块压缩一次压缩多个记录,因此它比记录压缩更紧凑,而且一般优先选择。当记录的字节数达到最小大小,才会添加到块。该最小值由 io.seqfile.compress.blocksize 中的属性定义。默认值是 1000000 字节。格式为记录数、键长度、键、值长度、值。

无压缩格式与记录压缩格式

Hadoop 基于文件的数据结构及实例

块压缩格式

Hadoop 基于文件的数据结构及实例

SequenceFile 文件格式的好处:
A.支持基于记录(Record)或块(Block)的数据压缩。
B.支持 splittable,能够作为 MapReduce 的输入分片。
C.修改简单:主要负责修改相应的业务逻辑,而不用考虑具体的存储格式。
SequenceFile 文件格式的坏处:
坏处是需要一个合并文件的过程,且合并后的文件将不方便查看。因为它是二进制文件。

读写 SequenceFile
写过程:
1)创建 Configuration
2)获取 FileSystem
3)创建文件输出路径 Path
4)调用 SequenceFile.createWriter 得到 SequenceFile.Writer 对象
5)调用 SequenceFile.Writer.append 追加写入文件
6)关闭流
读过程:
1)创建 Configuration
2)获取 FileSystem
3)创建文件输出路径 Path
4)new 一个 SequenceFile.Reader 进行读取
5)得到 keyClass 和 valueClass
6)关闭流

org.apache.hadoop.io
Class SequenceFile
There are three SequenceFile Writers based on the SequenceFile.CompressionType used to compress key/value pairs:
1、Writer : Uncompressed records.
2、RecordCompressWriter : Record-compressed files, only compress values.
3、BlockCompressWriter : Block-compressed files, both keys & values are collected in ‘blocks’ separately and compressed. The size of the ‘block’ is configurable1
无压缩方式、记录压缩、块压缩实例
package SequenceFile;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.util.ReflectionUtils;

public class Demo01 {

    final static String uri= “hdfs://liguodong:8020/liguodong”;
    final static String[] data = {
        “apache,software”,”chinese,good”,”james,NBA”,”index,pass”
    };

    public static void main(String[] args) throws IOException {
        //1
        Configuration configuration = new Configuration();
        //2
        FileSystem fs = FileSystem.get(URI.create(uri),configuration);
        //3
        Path path = new Path(“/tmp.seq”);

        write(fs,configuration,path);
        read(fs,configuration,path);

    }

    public static void write(FileSystem fs,Configuration configuration,Path path) throws IOException{
        //4
        IntWritable key = new IntWritable();
        Text value = new Text();
        // 无压缩
        /*@SuppressWarnings(“deprecation”)
        SequenceFile.Writer writer = SequenceFile.createWriter
                (fs,configuration,path,key.getClass(),value.getClass());*/
        // 记录压缩
        @SuppressWarnings(“deprecation”)
        SequenceFile.Writer writer = SequenceFile.createWriter
                (fs,configuration,path,key.getClass(),
                        value.getClass(),CompressionType.RECORD,new BZip2Codec());
        // 块压缩
        /*@SuppressWarnings(“deprecation”)
        SequenceFile.Writer writer = SequenceFile.createWriter
                (fs,configuration,path,key.getClass(),
                value.getClass(),CompressionType.BLOCK,new BZip2Codec());*/

        //5
        for (int i = 0; i < 30; i++) {
            key.set(100-i);
            value.set(data[i%data.length]);
            writer.append(key, value);
        }
        //6、关闭流
        IOUtils.closeStream(writer);       
    }

    public static void read(FileSystem fs,Configuration configuration,Path path) throws IOException {
        //4
        @SuppressWarnings(“deprecation”)
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path,configuration);
        //5
        Writable key = (Writable) ReflectionUtils.newInstance
                (reader.getKeyClass(), configuration);
        Writable value = (Writable) ReflectionUtils.newInstance
                (reader.getValueClass(), configuration);

        while(reader.next(key,value)){
            System.out.println(“key = ” + key);
            System.out.println(“value = ” + value);
            System.out.println(“position = “+ reader.getPosition());
        }
        IOUtils.closeStream(reader);
    }
}

运行结果:

key = 100
value = apache,software
position = 164
key = 99
value = chinese,good
position = 197
key = 98
value = james,NBA
position = 227
key = 97
value = index,pass
position = 258
key = 96
value = apache,software
position = 294
key = 95
value = chinese,good
position = 327
……
key = 72
value = apache,software
position = 1074
key = 71
value = chinese,good
position = 11071
MapFile
public class MapFile {
  /** The name of the index file. */
  public static final String INDEX_FILE_NAME = “index”;

  /** The name of the data file. */
  public static final String DATA_FILE_NAME = “data”;
}

MapFile 是经过排序的索引的 SequenceFile,可以根据 key 进行查找。

与 SequenceFile 不同的是,MapFile 的 Key 一定要实现 WritableComparable 接口 , 即 Key 值是可比较的,而 value 是 Writable 类型的。
可以使用 MapFile.fix() 方法来重建索引,把 SequenceFile 转换成 MapFile。
它有两个静态成员变量:

static final String INDEX_FILE_NAME
static final String DATA_FILE_NAME

通过观察其目录结构可以看到 MapFile 由两部分组成,分别是 data 和 index。
index 作为文件的数据索引,主要记录了每个 Record 的 key 值,以及该 Record 在文件中的偏移位置。

在 MapFile 被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定 Record 所在文件位置。
因此,相对 SequenceFile 而言,MapFile 的检索效率是高效的,缺点是会消耗一部分内存来存储 index 数据。
需注意的是,MapFile 并不会把所有 Record 都记录到 index 中去,默认情况下每隔 128 条记录存储一个索引映射。当然,记录间隔可人为修改,通过 MapFIle.Writer 的 setIndexInterval() 方法,或修改 io.map.index.interval 属性;

读写 MapFile
写过程:
1)创建 Configuration
2)获取 FileSystem
3)创建文件输出路径 Path
4)new 一个 MapFile.Writer 对象
5)调用 MapFile.Writer.append 追加写入文件
6)关闭流
读过程:
1)创建 Configuration
2)获取 FileSystem
3)创建文件输出路径 Path
4)new 一个 MapFile.Reader 进行读取
5)得到 keyClass 和 valueClass
6)关闭流

具体操作与 SequenceFile 相似。

命令行查看二进制文件
hdfs dfs -text /liguodong/tmp.seq

更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2015-06/118547.htm

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-20发表,共计5486字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中