阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Ubuntu 14.04 LTS下HBase开发实例学习

425次阅读
没有评论

共计 37272 个字符,预计需要花费 94 分钟才能阅读完成。

1 开发环境

在进行 Hbase 开发前,需要安装 JDK、Hadoop 和 HBase,选择一款合适的开发 IDE,具体安装方法就不介绍了,网上有很多参考资料,这里给出我的开发环境:

操作系统:Ubuntu 14.04 LTS

Java 版本:jdk1.7.0_79

Hadoop 版本:hadoop-2.6.0-cdh5.7.1

HBase 版本:hbase-1.2.0-cdh5.7.1

Ecipse 版本:Eclipse Java EE LunaRelease

Hadoop+HBase 搭建云存储总结 PDF http://www.linuxidc.com/Linux/2013-05/83844.htm

Ubuntu Server 14.04 下 Hbase 数据库安装  http://www.linuxidc.com/Linux/2016-05/131499.htm

HBase 结点之间时间不一致造成 regionserver 启动失败 http://www.linuxidc.com/Linux/2013-06/86655.htm

Hadoop+ZooKeeper+HBase 集群配置 http://www.linuxidc.com/Linux/2013-06/86347.htm

Hadoop 集群安装 &HBase 实验环境搭建 http://www.linuxidc.com/Linux/2013-04/83560.htm

基于 Hadoop 集群的 HBase 集群的配置 http://www.linuxidc.com/Linux/2013-03/80815.htm‘

Hadoop 安装部署笔记之 -HBase 完全分布模式安装 http://www.linuxidc.com/Linux/2012-12/76947.htm

单机版搭建 HBase 环境图文教程详解 http://www.linuxidc.com/Linux/2012-10/72959.htm

使用 Maven 构建项目,在 pom.xml 中添加 hbase 的依赖如下:

 

<repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-common</artifactId>  
            <version>2.6.0-cdh5.7.1</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-hdfs</artifactId>  
            <version>2.6.0-cdh5.7.1</version>  
        </dependency>
        <dependency>  
            <groupId>org.apache.hbase</groupId>  
            <artifactId>hbase-client</artifactId>  
            <version>1.2.0-cdh5.7.1</version>  
        </dependency>
		<dependency>  
            <groupId>org.apache.hbase</groupId>  
            <artifactId>hbase-server</artifactId>  
            <version>1.2.0-cdh5.7.1</version>  
        </dependency>
    </dependencies>

 

2 初始化配置

 

首先需要设置 HBase 的配置,如 ZooKeeper 的地址、端口号等等。可以通过 org.apache.hadoop.conf.Configuration.set 方法手工设置 HBase 的配置信息,也可以直接将 HBase 的 hbase-site.xml 配置文件引入项目即可。下面给出配置代码:

// 声明静态配置
    private static Configuration conf = null;
    static {conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

3 常见 API 的使用

HBase 的常用操作包括建表、插入表数据、删除表数据、获取一行数据、表扫描、删除列族、删除表等等,下面给出具体代码。

3.1 创建数据库表

// 创建数据库表
    public static void createTable(String tableName, String[] columnFamilys) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {System.out.println(tableName + "表已存在");
            conn.close();
            System.exit(0);
        } else {
            // 新建一个表描述
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            // 在表描述里添加列族
            for (String columnFamily : columnFamilys) {tableDesc.addFamily(new HColumnDescriptor(columnFamily));
            }
            // 根据配置好的表描述建表
            hAdmin.createTable(tableDesc);
            System.out.println("创建" + tableName + "表成功");
        }
        conn.close();}

3.2 添加一条数据

// 添加一条数据
    public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 put 对象
        Put put = new Put(Bytes.toBytes(rowKey));
        // 在 put 对象中设置列族、列、值
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 插入数据, 可通过 put(List<Put>) 批量插入
        table.put(put);
        // 关闭资源
        table.close();
        conn.close();}

3.3 获取一条数据

// 通过 rowkey 获取一条数据
    public static void getRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 get 对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 输出结果
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                    "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                    "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                    "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                    "时间戳:" + cell.getTimestamp());
        }
        // 关闭资源
        table.close();
        conn.close();}

3.4 全表扫描

// 全表扫描
    public static void scanTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 扫描全表输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                        "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
		results.close();
        table.close();
        conn.close();}

3.5 删除一条数据

// 删除一条数据
    public static void delRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除数据
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
        // 关闭资源
        table.close();
        conn.close();}

3.6 删除多条数据

// 删除多条数据
    public static void delRows(String tableName, String[] rows) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除多条数据
        List<Delete> list = new ArrayList<Delete>();
        for (String row : rows) {Delete delete = new Delete(Bytes.toBytes(row));
            list.add(delete);
        }
        table.delete(list);
        // 关闭资源
        table.close();
        conn.close();}

3.7 删除列族

// 删除列族
    public static void delColumnFamily(String tableName, String columnFamily) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // 删除一个表的指定列族
        hAdmin.deleteColumn(tableName, columnFamily);
        // 关闭资源
        conn.close();}

3.8 删除数据库表

// 删除数据库表
    public static void deleteTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {
            // 失效表
            hAdmin.disableTable(tableName);
            // 删除表
            hAdmin.deleteTable(tableName);
            System.out.println("删除" + tableName + "表成功");
            conn.close();} else {System.out.println("需要删除的" + tableName + "表不存在");
            conn.close();
            System.exit(0);
        }
    }

3.9 追加插入

// 追加插入 (将原有 value 的后面追加新的 value,如原有 value= a 追加 value=bc 则最后的 value=abc)
    public static void appendData(String tableName, String rowKey, String columnFamily, String column, String value) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 append 对象
        Append append = new Append(Bytes.toBytes(rowKey));
        // 在 append 对象中设置列族、列、值
        append.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 追加数据
        table.append(append);
        // 关闭资源
        table.close();
        conn.close();}

3.10 符合条件后添加数据

// 符合条件后添加数据 (只能针对某一个 rowkey 进行原子操作)
    public static boolean checkAndPut(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, String valueCheck, String columnFamily, String column, String value) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 设置需要添加的数据
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 当判断条件为真时添加数据
        boolean result = table.checkAndPut(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), 
                Bytes.toBytes(columnCheck), Bytes.toBytes(valueCheck), put);
        // 关闭资源
        table.close();
        conn.close();
        
        return result;
    }

3.11 符合条件后删除数据

// 符合条件后刪除数据 (只能针对某一个 rowkey 进行原子操作)
    public static boolean checkAndDelete(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, 
            String valueCheck, String columnFamily, String column) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 设置需要刪除的 delete 对象
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck));
        // 当判断条件为真时添加数据
        boolean result = table.checkAndDelete(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck), 
                Bytes.toBytes(valueCheck), delete);
        // 关闭资源
        table.close();
        conn.close();

        return result;
    }

3.12 计数器

// 计数器 (amount 为正数则计数器加,为负数则计数器减,为 0 则获取当前计数器的值)
    public static long incrementColumnValue(String tableName, String rowKey, String columnFamily, String column, long amount) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 计数器
        long result = table.incrementColumnValue(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamily), Bytes.toBytes(column), amount);
        // 关闭资源
        table.close();
        conn.close();
        
        return result;
    }

4 内置过滤器的使用

HBase 为筛选数据提供了一组过滤器,通过这个过滤器可以在 HBase 中数据的多个维度(行、列、数据版本)上进行对数据的筛选操作,也就是说过滤器最终能够筛选的数据能够细化到具体的一个存储单元格上(由行键、列名、时间戳定位)。通常来说,通过行键、值来筛选数据的应用场景较多。需要说明的是,过滤器会极大地影响查询效率。所以,在数据量较大的数据表中,应尽量避免使用过滤器。

下面介绍一些常用的 HBase 内置过滤器的用法:

1、RowFilter:筛选出匹配的所有的行。使用 BinaryComparator 可以筛选出具有某个行键的行,或者通过改变比较运算符(下面的例子中是 CompareFilter.CompareOp.EQUAL)来筛选出符合某一条件的多条数据,如下示例就是筛选出行键为 row1 的一行数据。

// 筛选出匹配的所有的行
Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));

2、PrefixFilter:筛选出具有特定前缀的行键的数据。这个过滤器所实现的功能其实也可以由 RowFilter 结合 RegexComparator 来实现,不过这里提供了一种简便的使用方法,如下示例就是筛选出行键以 row 为前缀的所有的行。

// 筛选匹配行键的前缀成功的行
Filter pf = new PrefixFilter(Bytes.toBytes("row"));

3、KeyOnlyFilter:这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用。

 

// 返回所有的行键,但值全是空
Filter kof = new KeyOnlyFilter();

 

4、RandomRowFilter:按照一定的几率(<= 0 会过滤掉所有的行,>= 1 会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个 RandomRowFilter 会返回不同的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器。

// 随机选出一部分的行
Filter rrf = new RandomRowFilter((float) 0.8);

5、InclusiveStopFilter:扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行。如果我们想要同时包含起始行和终止行,那么可以使用此过滤器。

// 包含了扫描的上限在结果之内
Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1"));

6、FirstKeyOnlyFilter:如果想要返回的结果集中只包含第一列的数据,那么这个过滤器能够满足要求。它在找到每行的第一列之后会停止扫描,从而使扫描的性能也得到了一定的提升。

// 筛选出每行的第一个单元格
Filter fkof = new FirstKeyOnlyFilter();

7、ColumnPrefixFilter:它按照列名的前缀来筛选单元格,如果我们想要对返回的列的前缀加以限制的话,可以使用这个过滤器。

// 筛选出前缀匹配的列
Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1"));

8、ValueFilter:按照具体的值来筛选单元格的过滤器,这会把一行中值不能满足的单元格过滤掉,如下面的构造器,对于每一行的一个列,如果其对应的值不包含 ROW2_QUAL1,那么这个列就不会返回给客户端。

// 筛选某个(值的条件满足的)特定的单元格
Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1"));

9、ColumnCountGetFilter:这个过滤器在遇到一行的列数超过我们所设置的限制值的时候,结束扫描操作。

// 如果突然发现一行中的列数超过设定的最大值时,整个扫描操作会停止
Filter ccf = new ColumnCountGetFilter(2);

10、SingleColumnValueFilter:用一列的值决定这一行的数据是否被过滤,可对它的对象调用 setFilterIfMissing 方法,默认的参数是 false。其作用是,对于咱们要使用作为条件的列,如果参数为 true,这样的行将会被过滤掉,如果参数为 false,这样的行会包含在结果集中。

// 将满足条件的列所在的行过滤掉
SingleColumnValueFilter scvf = new SingleColumnValueFilter(•          Bytes.toBytes("colfam1"),   
•          Bytes.toBytes("qual2"),   
•          CompareFilter.CompareOp.NOT_EQUAL,   
•          new SubstringComparator("BOGUS"));  
scvf.setFilterIfMissing(true);

11、SingleColumnValueExcludeFilter:这个过滤器与第 10 种过滤器唯一的区别就是,作为筛选条件的列,其行不会包含在返回的结果中。

12、SkipFilter:这是一种附加过滤器,其与 ValueFilter 结合使用,如果发现一行中的某一列不符合条件,那么整行就会被过滤掉。

// 发现某一行中的一列需要过滤时,整个行就会被过滤掉
Filter skf = new SkipFilter(vf);

13、WhileMatchFilter:使用这个过滤器,当遇到不符合设定条件的数据的时候,整个扫描结束。

// 当遇到不符合过滤器 rf 设置的条件时,整个扫描结束
Filter wmf = new WhileMatchFilter(rf);

14. FilterList:可以用于综合使用多个过滤器。其有两种关系:Operator.MUST_PASS_ONE 表示关系 AND,Operator.MUST_PASS_ALL 表示关系 OR,并且 FilterList 可以嵌套使用,使得我们能够表达更多的需求。

// 综合使用多个过滤器,AND 和 OR 两种关系
List<Filter> filters = new ArrayList<Filter>();  
filters.add(rf);  
filters.add(vf);  
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);

下面给出一个使用 RowFilter 过滤器的完整示例:

public class HBaseFilter {
    
    private static final String TABLE_NAME = "table1";

    public static void main(String[] args) throws IOException {
        // 设置配置
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(TABLE_NAME));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 创建一个 RowFilter 过滤器
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("abc")));
        // 将过滤器加入扫描对象
        scan.setFilter(filter);
        // 输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                        "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
        results.close();
        table.close();
        conn.close();}

}

5 HBase 与 MapReduce

我们知道,在伪分布式模式和完全分布式模式下的 HBase 是架构在 HDFS 之上的,因此完全可以将 MapReduce 编程框架和 HBase 结合起来使用。也就是说,将 HBase 作为底层存储结构,MapReduce 调用 HBase 进行特殊的处理,这样能够充分结合 HBase 分布式大型数据库和 MapReduce 并行计算的优点。

HBase 实现了 TableInputFormatBase 类,该类提供了对表数据的大部分操作,其子类 TableInputFormat 则提供了完整的实现,用于处理表数据并生成键值对。TableInputFormat 类将数据表按照 Region 分割成 split,即有多少个 Regions 就有多个 splits,然后将 Region 按行键分成 <key,value> 对,key 值对应与行键,value 值为该行所包含的数据。

HBase 实现了 MapReduce 计算框架对应的 TableMapper 类和 TableReducer 类。其中,TableMapper 类并没有具体的功能,只是将输入的 <key,value> 对的类型分别限定为 Result 和 ImmutableBytesWritable。IdentityTableMapper 类和 IdentityTableReducer 类则是上述两个类的具体实现,其和 Mapper 类和 Reducer 类一样,只是简单地将 <key,value> 对输出到下一个阶段。

HBase 实现的 TableOutputFormat 将输出的 <key,value> 对写到指定的 HBase 表中,该类不会对 WAL(Write-Ahead Log)进行操作,即如果服务器发生故障将面临丢失数据的风险。可以使用 MultipleTableOutputFormat 类解决这个问题,该类可以对是否写入 WAL 进行设置。

为了能使 Hadoop 集群上运行 HBase 程序,还需要把相关的类文件引入 Hadoop 集群上,不然会出现 ClassNotFoundException 错误。其具体方法是可在 hadoop 的环境配置文件 hadoop-env.sh 中引入 HBASE_HOME 和 HBase 的相关 jar 包,或者直接将 HBase 的 jar 包打包到应用程序文件中。

下面这个例子是将 MapReduce 和 HBase 结合起来的 WordCount 程序,它首先从指定文件中搜集数据,进行统计计算,最后将结果存储到 HBase 中:

 

package com.hbase.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HBaseWordCount {
    
    public static class hBaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");
            for (String w : words) {word.set(w);
                context.write(word, ONE);
            }
        }
    }
    
    public static class hBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {sum += value.get();
            }
            
            // Put 实例化,每个词存一行
            Put put = new Put(key.getBytes());
            // 列族为 content, 列名为 count, 列值为单词的数目
            put.addColumn("content".getBytes(), "count".getBytes(), String.valueOf(sum).getBytes());
            
            context.write(NullWritable.get(), put);
        }
        
    }
    
    // 创建 HBase 数据表
    public static void createHBaseTable(String tableName) throws IOException {
        // 配置 HBse
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // 判断表是否存在
        if (hAdmin.tableExists(tableName)) {System.out.println("该数据表已存在,正在重新创建");
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
        }
        // 创建表描述
        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        // 在表描述里添加列族
        tableDesc.addFamily(new HColumnDescriptor("content"));
        // 创建表
        hAdmin.createTable(tableDesc);
        System.out.println("创建" + tableName + "表成功");
    }
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {if (args.length != 3) {System.out.println("args error");
            System.exit(0);
        }
        
        String input = args[0];
        String jobName = args[1];
        String tableName = args[2];
        
        // 创建数据表
        HBaseWordCount.createHBaseTable(tableName);
        
        // 配置 MapReduce(或者将 hadoop 和 hbase 的相关配置文件引入项目)
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "localhost:9000");
       conf.set("mapred.job.tracker", "localhost:9001");
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        
        // 配置任务
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(HBaseWordCount.class);
        job.setMapperClass(hBaseMapper.class);
        job.setReducerClass(hBaseReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(input));
        
        // 执行 MR 任务
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2016-08/134181p2.htm

6 HBase 的 Bulkload

HBase 可以让我们随机的、实时的访问大数据,但是怎样有效的将数据导入到 HBase 呢?HBase 有多种导入数据的方法,最直接的方法就是在 MapReduce 作业中使用 TableOutputFormat 作为输出,或者使用标准的客户端 API,但是这些都不是非常有效的方法。

如果 HDFS 中有海量数据要导入 HBase,可以先将这些数据生成 HFile 文件,然后批量导入 HBase 的数据表中,这样可以极大地提升数据导入 HBase 的效率。这就是 HBase 的 Bulkload,即利用 MapReduce 作业输出 HBase 内部数据格式的表数据,然后将生成的 StoreFiles 直接导入到集群中。与使用 HBase API 相比,使用 Bulkload 导入数据占用更少的 CPU 和网络资源。两个表之间的数据迁移也可以使用这种方法。下面给出具体示例:

 

package com.hbase.demo;

import java.io.IOException;

import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class HBaseBulk {
    
    public static class bulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 将输入数据用 tab 键分词
            String[] values = value.toString().split("\t");
            if (values.length == 2) {
                // 设置行键、列族、列名和值
                byte[] rowKey = Bytes.toBytes(values[0]);
                byte[] family = Bytes.toBytes("content");
                byte[] column = Bytes.toBytes("number");
                byte[] colValue = Bytes.toBytes(values[1]);
                // 将行键序列化作为 mapper 输出的 key
                ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
                // 将 put 对象作为 mapper 输出的 value
                Put put = new Put(rowKey);
                put.addColumn(family, column, colValue);
                context.write(rowKeyWritable, put);
            }
        }
    }
  
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {if (args.length != 3) {System.out.println("args error");
            System.exit(0);
        }
        
        String input = args[0];
        String output = args[1];
        String jobName = args[2];
        String tableName = args[3];
        
        // 配置 MapReduce(或者将 hadoop 的相关配置文件引入项目)
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.defaultFS", "localhost:9000");
        hadoopConf.set("mapred.job.tracker", "localhost:9001");
        Job job = Job.getInstance(hadoopConf, jobName);
        job.setJarByClass(HBaseBulk.class);
        job.setMapperClass(bulkMapper.class);
        job.setReducerClass(PutSortReducer.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        
        // 配置 HBase(或者将 hbase 的相关配置文件引入项目)
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", "localhost");
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181");
        
        // 生成 HFile
        Connection conn = ConnectionFactory.createConnection(hbaseConf);
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        
        // 执行任务
        job.waitForCompletion(true);
        
        // 将 HFile 文件导入 HBase
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf);
        loader.doBulkLoad(new Path(output), table);
    }
}

上述代码首先将 HDFS 中的数据文件通过 MapReduce 任务生成 HFile 文件,然后将 HFile 文件导入 HBase 数据表(该数据表已存在)。HDFS 中的数据文件和导入 HBase 后的数据表分别如下图所示:

 

Ubuntu 14.04 LTS 下 HBase 开发实例学习

 

Ubuntu 14.04 LTS 下 HBase 开发实例学习

HBase 的详细介绍 :请点这里
HBase 的下载地址 :请点这里

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2016-08/134181.htm

1 开发环境

在进行 Hbase 开发前,需要安装 JDK、Hadoop 和 HBase,选择一款合适的开发 IDE,具体安装方法就不介绍了,网上有很多参考资料,这里给出我的开发环境:

操作系统:Ubuntu 14.04 LTS

Java 版本:jdk1.7.0_79

Hadoop 版本:hadoop-2.6.0-cdh5.7.1

HBase 版本:hbase-1.2.0-cdh5.7.1

Ecipse 版本:Eclipse Java EE LunaRelease

Hadoop+HBase 搭建云存储总结 PDF http://www.linuxidc.com/Linux/2013-05/83844.htm

Ubuntu Server 14.04 下 Hbase 数据库安装  http://www.linuxidc.com/Linux/2016-05/131499.htm

HBase 结点之间时间不一致造成 regionserver 启动失败 http://www.linuxidc.com/Linux/2013-06/86655.htm

Hadoop+ZooKeeper+HBase 集群配置 http://www.linuxidc.com/Linux/2013-06/86347.htm

Hadoop 集群安装 &HBase 实验环境搭建 http://www.linuxidc.com/Linux/2013-04/83560.htm

基于 Hadoop 集群的 HBase 集群的配置 http://www.linuxidc.com/Linux/2013-03/80815.htm‘

Hadoop 安装部署笔记之 -HBase 完全分布模式安装 http://www.linuxidc.com/Linux/2012-12/76947.htm

单机版搭建 HBase 环境图文教程详解 http://www.linuxidc.com/Linux/2012-10/72959.htm

使用 Maven 构建项目,在 pom.xml 中添加 hbase 的依赖如下:

 

<repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-common</artifactId>  
            <version>2.6.0-cdh5.7.1</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-hdfs</artifactId>  
            <version>2.6.0-cdh5.7.1</version>  
        </dependency>
        <dependency>  
            <groupId>org.apache.hbase</groupId>  
            <artifactId>hbase-client</artifactId>  
            <version>1.2.0-cdh5.7.1</version>  
        </dependency>
		<dependency>  
            <groupId>org.apache.hbase</groupId>  
            <artifactId>hbase-server</artifactId>  
            <version>1.2.0-cdh5.7.1</version>  
        </dependency>
    </dependencies>

 

2 初始化配置

 

首先需要设置 HBase 的配置,如 ZooKeeper 的地址、端口号等等。可以通过 org.apache.hadoop.conf.Configuration.set 方法手工设置 HBase 的配置信息,也可以直接将 HBase 的 hbase-site.xml 配置文件引入项目即可。下面给出配置代码:

// 声明静态配置
    private static Configuration conf = null;
    static {conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

3 常见 API 的使用

HBase 的常用操作包括建表、插入表数据、删除表数据、获取一行数据、表扫描、删除列族、删除表等等,下面给出具体代码。

3.1 创建数据库表

// 创建数据库表
    public static void createTable(String tableName, String[] columnFamilys) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {System.out.println(tableName + "表已存在");
            conn.close();
            System.exit(0);
        } else {
            // 新建一个表描述
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            // 在表描述里添加列族
            for (String columnFamily : columnFamilys) {tableDesc.addFamily(new HColumnDescriptor(columnFamily));
            }
            // 根据配置好的表描述建表
            hAdmin.createTable(tableDesc);
            System.out.println("创建" + tableName + "表成功");
        }
        conn.close();}

3.2 添加一条数据

// 添加一条数据
    public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 put 对象
        Put put = new Put(Bytes.toBytes(rowKey));
        // 在 put 对象中设置列族、列、值
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 插入数据, 可通过 put(List<Put>) 批量插入
        table.put(put);
        // 关闭资源
        table.close();
        conn.close();}

3.3 获取一条数据

// 通过 rowkey 获取一条数据
    public static void getRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 get 对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 输出结果
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                    "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                    "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                    "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                    "时间戳:" + cell.getTimestamp());
        }
        // 关闭资源
        table.close();
        conn.close();}

3.4 全表扫描

// 全表扫描
    public static void scanTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 扫描全表输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                        "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
		results.close();
        table.close();
        conn.close();}

3.5 删除一条数据

// 删除一条数据
    public static void delRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除数据
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
        // 关闭资源
        table.close();
        conn.close();}

3.6 删除多条数据

// 删除多条数据
    public static void delRows(String tableName, String[] rows) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除多条数据
        List<Delete> list = new ArrayList<Delete>();
        for (String row : rows) {Delete delete = new Delete(Bytes.toBytes(row));
            list.add(delete);
        }
        table.delete(list);
        // 关闭资源
        table.close();
        conn.close();}

3.7 删除列族

// 删除列族
    public static void delColumnFamily(String tableName, String columnFamily) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // 删除一个表的指定列族
        hAdmin.deleteColumn(tableName, columnFamily);
        // 关闭资源
        conn.close();}

3.8 删除数据库表

// 删除数据库表
    public static void deleteTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {
            // 失效表
            hAdmin.disableTable(tableName);
            // 删除表
            hAdmin.deleteTable(tableName);
            System.out.println("删除" + tableName + "表成功");
            conn.close();} else {System.out.println("需要删除的" + tableName + "表不存在");
            conn.close();
            System.exit(0);
        }
    }

3.9 追加插入

// 追加插入 (将原有 value 的后面追加新的 value,如原有 value= a 追加 value=bc 则最后的 value=abc)
    public static void appendData(String tableName, String rowKey, String columnFamily, String column, String value) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 append 对象
        Append append = new Append(Bytes.toBytes(rowKey));
        // 在 append 对象中设置列族、列、值
        append.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 追加数据
        table.append(append);
        // 关闭资源
        table.close();
        conn.close();}

3.10 符合条件后添加数据

// 符合条件后添加数据 (只能针对某一个 rowkey 进行原子操作)
    public static boolean checkAndPut(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, String valueCheck, String columnFamily, String column, String value) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 设置需要添加的数据
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 当判断条件为真时添加数据
        boolean result = table.checkAndPut(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), 
                Bytes.toBytes(columnCheck), Bytes.toBytes(valueCheck), put);
        // 关闭资源
        table.close();
        conn.close();
        
        return result;
    }

3.11 符合条件后删除数据

// 符合条件后刪除数据 (只能针对某一个 rowkey 进行原子操作)
    public static boolean checkAndDelete(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, 
            String valueCheck, String columnFamily, String column) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 设置需要刪除的 delete 对象
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck));
        // 当判断条件为真时添加数据
        boolean result = table.checkAndDelete(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck), 
                Bytes.toBytes(valueCheck), delete);
        // 关闭资源
        table.close();
        conn.close();

        return result;
    }

3.12 计数器

// 计数器 (amount 为正数则计数器加,为负数则计数器减,为 0 则获取当前计数器的值)
    public static long incrementColumnValue(String tableName, String rowKey, String columnFamily, String column, long amount) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 计数器
        long result = table.incrementColumnValue(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamily), Bytes.toBytes(column), amount);
        // 关闭资源
        table.close();
        conn.close();
        
        return result;
    }

4 内置过滤器的使用

HBase 为筛选数据提供了一组过滤器,通过这个过滤器可以在 HBase 中数据的多个维度(行、列、数据版本)上进行对数据的筛选操作,也就是说过滤器最终能够筛选的数据能够细化到具体的一个存储单元格上(由行键、列名、时间戳定位)。通常来说,通过行键、值来筛选数据的应用场景较多。需要说明的是,过滤器会极大地影响查询效率。所以,在数据量较大的数据表中,应尽量避免使用过滤器。

下面介绍一些常用的 HBase 内置过滤器的用法:

1、RowFilter:筛选出匹配的所有的行。使用 BinaryComparator 可以筛选出具有某个行键的行,或者通过改变比较运算符(下面的例子中是 CompareFilter.CompareOp.EQUAL)来筛选出符合某一条件的多条数据,如下示例就是筛选出行键为 row1 的一行数据。

// 筛选出匹配的所有的行
Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));

2、PrefixFilter:筛选出具有特定前缀的行键的数据。这个过滤器所实现的功能其实也可以由 RowFilter 结合 RegexComparator 来实现,不过这里提供了一种简便的使用方法,如下示例就是筛选出行键以 row 为前缀的所有的行。

// 筛选匹配行键的前缀成功的行
Filter pf = new PrefixFilter(Bytes.toBytes("row"));

3、KeyOnlyFilter:这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用。

 

// 返回所有的行键,但值全是空
Filter kof = new KeyOnlyFilter();

 

4、RandomRowFilter:按照一定的几率(<= 0 会过滤掉所有的行,>= 1 会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个 RandomRowFilter 会返回不同的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器。

// 随机选出一部分的行
Filter rrf = new RandomRowFilter((float) 0.8);

5、InclusiveStopFilter:扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行。如果我们想要同时包含起始行和终止行,那么可以使用此过滤器。

// 包含了扫描的上限在结果之内
Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1"));

6、FirstKeyOnlyFilter:如果想要返回的结果集中只包含第一列的数据,那么这个过滤器能够满足要求。它在找到每行的第一列之后会停止扫描,从而使扫描的性能也得到了一定的提升。

// 筛选出每行的第一个单元格
Filter fkof = new FirstKeyOnlyFilter();

7、ColumnPrefixFilter:它按照列名的前缀来筛选单元格,如果我们想要对返回的列的前缀加以限制的话,可以使用这个过滤器。

// 筛选出前缀匹配的列
Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1"));

8、ValueFilter:按照具体的值来筛选单元格的过滤器,这会把一行中值不能满足的单元格过滤掉,如下面的构造器,对于每一行的一个列,如果其对应的值不包含 ROW2_QUAL1,那么这个列就不会返回给客户端。

// 筛选某个(值的条件满足的)特定的单元格
Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1"));

9、ColumnCountGetFilter:这个过滤器在遇到一行的列数超过我们所设置的限制值的时候,结束扫描操作。

// 如果突然发现一行中的列数超过设定的最大值时,整个扫描操作会停止
Filter ccf = new ColumnCountGetFilter(2);

10、SingleColumnValueFilter:用一列的值决定这一行的数据是否被过滤,可对它的对象调用 setFilterIfMissing 方法,默认的参数是 false。其作用是,对于咱们要使用作为条件的列,如果参数为 true,这样的行将会被过滤掉,如果参数为 false,这样的行会包含在结果集中。

// 将满足条件的列所在的行过滤掉
SingleColumnValueFilter scvf = new SingleColumnValueFilter(•          Bytes.toBytes("colfam1"),   
•          Bytes.toBytes("qual2"),   
•          CompareFilter.CompareOp.NOT_EQUAL,   
•          new SubstringComparator("BOGUS"));  
scvf.setFilterIfMissing(true);

11、SingleColumnValueExcludeFilter:这个过滤器与第 10 种过滤器唯一的区别就是,作为筛选条件的列,其行不会包含在返回的结果中。

12、SkipFilter:这是一种附加过滤器,其与 ValueFilter 结合使用,如果发现一行中的某一列不符合条件,那么整行就会被过滤掉。

// 发现某一行中的一列需要过滤时,整个行就会被过滤掉
Filter skf = new SkipFilter(vf);

13、WhileMatchFilter:使用这个过滤器,当遇到不符合设定条件的数据的时候,整个扫描结束。

// 当遇到不符合过滤器 rf 设置的条件时,整个扫描结束
Filter wmf = new WhileMatchFilter(rf);

14. FilterList:可以用于综合使用多个过滤器。其有两种关系:Operator.MUST_PASS_ONE 表示关系 AND,Operator.MUST_PASS_ALL 表示关系 OR,并且 FilterList 可以嵌套使用,使得我们能够表达更多的需求。

// 综合使用多个过滤器,AND 和 OR 两种关系
List<Filter> filters = new ArrayList<Filter>();  
filters.add(rf);  
filters.add(vf);  
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);

下面给出一个使用 RowFilter 过滤器的完整示例:

public class HBaseFilter {
    
    private static final String TABLE_NAME = "table1";

    public static void main(String[] args) throws IOException {
        // 设置配置
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(TABLE_NAME));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 创建一个 RowFilter 过滤器
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("abc")));
        // 将过滤器加入扫描对象
        scan.setFilter(filter);
        // 输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                        "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
        results.close();
        table.close();
        conn.close();}

}

5 HBase 与 MapReduce

我们知道,在伪分布式模式和完全分布式模式下的 HBase 是架构在 HDFS 之上的,因此完全可以将 MapReduce 编程框架和 HBase 结合起来使用。也就是说,将 HBase 作为底层存储结构,MapReduce 调用 HBase 进行特殊的处理,这样能够充分结合 HBase 分布式大型数据库和 MapReduce 并行计算的优点。

HBase 实现了 TableInputFormatBase 类,该类提供了对表数据的大部分操作,其子类 TableInputFormat 则提供了完整的实现,用于处理表数据并生成键值对。TableInputFormat 类将数据表按照 Region 分割成 split,即有多少个 Regions 就有多个 splits,然后将 Region 按行键分成 <key,value> 对,key 值对应与行键,value 值为该行所包含的数据。

HBase 实现了 MapReduce 计算框架对应的 TableMapper 类和 TableReducer 类。其中,TableMapper 类并没有具体的功能,只是将输入的 <key,value> 对的类型分别限定为 Result 和 ImmutableBytesWritable。IdentityTableMapper 类和 IdentityTableReducer 类则是上述两个类的具体实现,其和 Mapper 类和 Reducer 类一样,只是简单地将 <key,value> 对输出到下一个阶段。

HBase 实现的 TableOutputFormat 将输出的 <key,value> 对写到指定的 HBase 表中,该类不会对 WAL(Write-Ahead Log)进行操作,即如果服务器发生故障将面临丢失数据的风险。可以使用 MultipleTableOutputFormat 类解决这个问题,该类可以对是否写入 WAL 进行设置。

为了能使 Hadoop 集群上运行 HBase 程序,还需要把相关的类文件引入 Hadoop 集群上,不然会出现 ClassNotFoundException 错误。其具体方法是可在 hadoop 的环境配置文件 hadoop-env.sh 中引入 HBASE_HOME 和 HBase 的相关 jar 包,或者直接将 HBase 的 jar 包打包到应用程序文件中。

下面这个例子是将 MapReduce 和 HBase 结合起来的 WordCount 程序,它首先从指定文件中搜集数据,进行统计计算,最后将结果存储到 HBase 中:

 

package com.hbase.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HBaseWordCount {
    
    public static class hBaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");
            for (String w : words) {word.set(w);
                context.write(word, ONE);
            }
        }
    }
    
    public static class hBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {sum += value.get();
            }
            
            // Put 实例化,每个词存一行
            Put put = new Put(key.getBytes());
            // 列族为 content, 列名为 count, 列值为单词的数目
            put.addColumn("content".getBytes(), "count".getBytes(), String.valueOf(sum).getBytes());
            
            context.write(NullWritable.get(), put);
        }
        
    }
    
    // 创建 HBase 数据表
    public static void createHBaseTable(String tableName) throws IOException {
        // 配置 HBse
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // 判断表是否存在
        if (hAdmin.tableExists(tableName)) {System.out.println("该数据表已存在,正在重新创建");
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
        }
        // 创建表描述
        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        // 在表描述里添加列族
        tableDesc.addFamily(new HColumnDescriptor("content"));
        // 创建表
        hAdmin.createTable(tableDesc);
        System.out.println("创建" + tableName + "表成功");
    }
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {if (args.length != 3) {System.out.println("args error");
            System.exit(0);
        }
        
        String input = args[0];
        String jobName = args[1];
        String tableName = args[2];
        
        // 创建数据表
        HBaseWordCount.createHBaseTable(tableName);
        
        // 配置 MapReduce(或者将 hadoop 和 hbase 的相关配置文件引入项目)
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "localhost:9000");
       conf.set("mapred.job.tracker", "localhost:9001");
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        
        // 配置任务
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(HBaseWordCount.class);
        job.setMapperClass(hBaseMapper.class);
        job.setReducerClass(hBaseReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(input));
        
        // 执行 MR 任务
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2016-08/134181p2.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计37272字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19350
评论数
4
阅读量
7896361
文章搜索
热门文章
开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南

开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南

开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南 大家好,我是星哥。之前介绍了腾讯云的 Code...
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
150元打造低成本NAS小钢炮,捡一块3865U工控板

150元打造低成本NAS小钢炮,捡一块3865U工控板

150 元打造低成本 NAS 小钢炮,捡一块 3865U 工控板 一块二手的熊猫 B3 工控板 3865U,搭...
免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

  免费无广告!这款跨平台 AI RSS 阅读器,拯救你的信息焦虑 在算法推荐主导信息流的时代,我们...
如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的 Nano Banana Pro?附赠邪修的用法 前言 大家好,我是星哥,今天来介绍谷歌的 ...
免费领取huggingface的2核16G云服务器,超简单教程

免费领取huggingface的2核16G云服务器,超简单教程

免费领取 huggingface 的 2 核 16G 云服务器,超简单教程 前言 HuggingFace.co...
240 元左右!五盘位 NAS主机,7 代U硬解4K稳如狗,拓展性碾压同价位

240 元左右!五盘位 NAS主机,7 代U硬解4K稳如狗,拓展性碾压同价位

  240 元左右!五盘位 NAS 主机,7 代 U 硬解 4K 稳如狗,拓展性碾压同价位 在 NA...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
开发者福利:免费 .frii.site 子域名,一分钟申请即用

开发者福利:免费 .frii.site 子域名,一分钟申请即用

  开发者福利:免费 .frii.site 子域名,一分钟申请即用 前言 在学习 Web 开发、部署...
安装Black群晖DSM7.2系统安装教程(在Vmware虚拟机中、实体机均可)!

安装Black群晖DSM7.2系统安装教程(在Vmware虚拟机中、实体机均可)!

安装 Black 群晖 DSM7.2 系统安装教程(在 Vmware 虚拟机中、实体机均可)! 前言 大家好,...
星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛 NAS 硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话? 前言 在选择 NAS 用预...
支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare 也瘫了连监控都挂,根因藏在哪? 最近两天的互联网堪称“故障...
浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍 前言 在 AI 自动化快速发展的当下,浏览器早已不再只是...