阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Datax与Hadoop2.x兼容部署与实际项目应用工作记录分享

144次阅读
没有评论

共计 15055 个字符,预计需要花费 38 分钟才能阅读完成。

一、概述

Hadoop 的版本更新挺快的,已经到了 2.4,但是其周边工具的更新速度还是比较慢的,一些旧的周边工具版本对 hadoop2.x 的兼容性做得还不完善,特别是 sqoop。最近,在为 hadoop2.2.0 找适合的 sqoop 版本时遇到了很多问题。尝试了多个 sqoop1.4.x 版本的直接简单粗暴的报版本不兼容问题,其中测了 sqoop-1.4.4.bin__hadoop-0.23 这个版本,在该版本中直接用 sqoop 的脚本 export HDFS 的数据是没有问题的,但是一旦调用 JAVA API 来进行对 HDFS 的数据的 export 的时候就各种不兼容问题,原因是这个版本的 API 也是基于 hadoop1.x 来写的。另外还尝试了使用 sqoop2(之前 blog 写过关于 sqoop2 的部署和使用情况:http://www.linuxidc.com/Linux/2014-10/107935.htm),这个版本取消了 sqoop1 的脚本执行方式,可以采取交互式、api 或者 rest 的方式工作,但是我在使用的过程中还是存在的一些问题:sqoop2(我用的是 1.99.3)无法指定列的分隔符、对 \N 等字符的处理有问题、对列值的类型判断存在问题等(其详细问题所在请看,sqoop1.99.3 源代码的 org.apache.sqoop.job.io.Data 类)。

这个礼拜终于找到了一个比较好的方案来取代 sqoop 作为 HDFS 到 mysql 的数据 export 模块,那就是大淘宝开源的 datax。虽然 datax 采用的是单机方式的作业方式,但是经过试验我对比了一下其和 sqoop 性能上的差异,在数据量不是特别大的情况下 datax 和 sqoop 的性能相差不是很明显的,在少量数据的情况下 datax 的性能稍微好点。

这篇 blog 将简单介绍一下这个 datax 这个框架以及它的用法,特别地说说如果修改 datax 才能使得 datax 运行在 hadoop2.x 上(datax 是基于 hadoop1.x 进行开发的)。另外,主要和大家分享一下我在自己项目中如何使用 datax,如何通过自己编写的 shell 脚本将 datax、mysql 和项目粘合起来。

二、datax 简介和 datax 在 hadoop2.x 上的兼容部署

1、datax 简介

DataX 是一个在异构的数据库 / 文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。Datax 框架中我最欣赏的就是基于插件的模式,你在部署的时候可以只安装那些用到的 Reader/Writer 插件 rpm 包,没有用的可以不用安装。同时,你也可以根据自己的特殊需求很快的写出 Reader、Writer。Datax 采用 Framework + plugin 架构构建,Framework 处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问。Datax 的运行方式采用 stand-alone 方式,在数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有 IPC 通信。下面是一个来自大淘宝开源官网的 datax 架构图:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

各个组件的作用:

 

  • Job: 一道数据同步作业

  • Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.

  • Sub-job:数据同步作业切分后的小任务

  • Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入 DataX

  • Storage: Reader 和 Writer 通过 Storage 交换数据

  • Writer(Dumper): 数据写出模块,负责将数据从 DataX 导入至目的数据地 

Datax 内置插件:
    DataX 框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为 Reader 和 Writer 两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从 Oracle 导出数据到 mysql,那么需要做的就是开发出 OracleReader 和 MysqlWriter 插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。更大的惊喜是我们已经开发了如下插件:

Reader 插件

  • hdfsreader : 支持从 hdfs 文件系统获取数据。

  • mysqlreader: 支持从 mysql 数据库获取数据。

  • sqlserverreader: 支持从 sqlserver 数据库获取数据。

  • oraclereader : 支持从 oracle 数据库获取数据。

  • streamreader: 支持从 stream 流获取数据(常用于测试)

  • httpreader : 支持从 http URL 获取数据。

Writer 插件

  • hdfswriter:支持向 hdbf 写入数据。

  • mysqlwriter:支持向 mysql 写入数据。

  • oraclewriter:支持向 oracle 写入数据。

  • streamwriter:支持向 stream 流写入数据。(常用于测试)

     

 

————————————– 分割线 ————————————–

 

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

 

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

 

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

 

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

 

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

 

Hadoop 中 HDFS 和 MapReduce 节点基本简介 http://www.linuxidc.com/Linux/2013-09/89653.htm

 

《Hadoop 实战》中文版 + 英文文字版 + 源码【PDF】http://www.linuxidc.com/Linux/2012-10/71901.htm

 

Hadoop: The Definitive Guide【PDF 版】http://www.linuxidc.com/Linux/2012-01/51182.htm

 

————————————– 分割线 ————————————–

 

2、datax 在 hadoop2.x 上的兼容部署

    Datax 是基于 Hadoop1.x 开发的,因此要想基于 HADOOP2.x 使用 hdfsreader 和 hdfswriter 插件,那么必须对这些插件的本地库以及一些 jar 包替换掉,同时要增加 Hadoop2.x 所需的依赖包,下面以 hdfsreader 为例说明:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

进入到 plugins 目录找到 hdfsreader,将 hadoop-0.19.2-core.jar 删除,将本地库替换为 $HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加 Hadoop2.x 的依赖包,如下图:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

另外,Datax 需要 hadoop1.x 的 hadoop-core.xml 配置文件,但是 hadoop2.x 中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有 datax 使用,这个配置文件在 datax 的 job xml 文件由参数 hadoop-conf 配上。到现在为止,datax 与 hadoop2.x 的兼容性修改已经完成了。

    还要做其他环境的调整,确保 java 版本 >=1.6,python 的版本 >=2.6(对于 python 的版本选择上,个人推荐 2.6 或者 2.7,如果 pytyon 版本上到 3.x 的话会有错误,个人经验)。最后修改一下各个插件的 rpm 包的 build 路径:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

下面以 t_dp_datax_engine.spec 为例子:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

上面红色方框的地方是指 build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。

下面以 t_dp_datax_engine.spec 为例子,看看���么 build rpm 插件:

具体执行过程如下:

1、请先 check out 一份 DataX 源码,并 cd 切换到 DataX 源码中的 rpm 目录

2、编译打包 DataX engine 包,使用 rpmbuild –ba t_dp_datax_engine.spec(请确保有 root 权限),打包生成的 rpm 后如下图所示

 Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

Rpm 制作完成后,即可分发、安装,例如使用

rpm -ivh  t_dp_datax_engine.rpm

即可安装 DataX engine 包,需要注意的是 engine 的 rpm 地址源自于上图的截图中信息。

如下图:

 Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

安装完成后,在 /home/taobao/datax/ 目录下会存在如下文件:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

其他的插件按照这种方式按照好就 ok 了。 

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2014-10/107936p2.htm

三、datax 的实际应用记录分享

    在 blog 的这部分主要分享一下我对 datax 使用的一个小案例,希望能够给初用 datax 的同学一点点参照。

  • 具体业务场景:

    需要将存储在 HDFS 上的一些表 export 到 mysql 中,不希望 datax 对每一个表的 export 操作都产生一个 job xml 文件,希望对不同的表动态使用同一个 job xml 文件(这个用 datax 配置文件动态参数结合 shell 实现)。同时,根据公司业务的需求当不同的 HDFS 表 export 到 mysql 的前后还需要做一些基于 mysql 的 DML 操作(这个可以通过 datax 配置文件中的 pre 以及 post 参数进行配置,但是我为了方便流程的控制用 shell 取代了)。

  • 实现步骤:

     

步骤 1:

执行 $DATAX_HOME/bin/datax.py - e 命令,选择 data source 来源,这里我们选择 7:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

接着选择 export 的目标源, 这个我们选择 0:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

步骤 2:

根据自己的业务需求和 Hadoop 的相应环境配置产生的 job xml,进入到$DATAX_HOME/jobs,编辑 job 配置文件,我的配置如下(里边的一些动态参数有下面我自己写的 Shell 中进行控制):

<?xml version=”1.0″ encoding=”UTF-8″?>
 
<jobs>
  <job id=”hdfsreader_to_mysqlwriter_job”>
    <reader>
      <plugin>hdfsreader</plugin>
      <!–
description:HDFS login account, e.g. ‘username, groupname(groupname…),#password
mandatory:true
name:ugi
–>
      <param key=”hadoop.job.ugi” value=”hadoop,supergroup#jpkjcluster”/>
      <!–
description:hadoop-site.xml path
mandatory:false
name:hadoop_conf
–>
      <param key=”hadoop_conf” value=”/data/hadoop/hadoop-2.2.0/etc/hadoop/datax_hadoop_conf.xml”/>
      <!–
description:hdfs path, format like: hdfs://ip:port/path, or file:///home/taobao/
mandatory:true
name:dir
–>
      <param key=”dir” value=”hdfs://172.16.8.1:8020/user/hive/warehouse/jl.db/${hdfs_table}/day=${export_day}”/>
      <!–
default:\t
description:how to sperate a line
mandatory:false
name:fieldSplit
–>
      <param key=”field_split” value=”,”/>
      <!–
default:UTF-8
range:UTF-8|GBK|GB2312
description:hdfs encode
mandatory:false
name:encoding
–>
      <param key=”encoding” value=”UTF-8″/>
      <!–
default:4096
range:[1024-4194304]
description:how large the buffer
mandatory:false
name:bufferSize
–>
      <param key=”buffer_size” value=”4096″/>
      <!–
default:\N
range:
description:replace the nullstring to null
mandatory:false
name:nullString
–>
      <param key=”nullstring” value=”\N”/>
      <!–
default:true
range:true|false
description:ingore key
mandatory:false
name:ignoreKey
–>
      <param key=”ignore_key” value=”true”/>
      <!–
default:
range:
description:how to filter column
mandatory:false
name:colFilter
      <param key=”col_filter” value=”?”/>
–>
      <!–
default:1
range:1-100
description:concurrency of the job
mandatory:false
name:concurrency
–>
      <param key=”concurrency” value=”${reader_concurrency}”/>
    </reader>
    <writer>
      <plugin>mysqlwriter</plugin>
      <!–
description:Mysql database ip address
mandatory:true
name:ip
–>
      <param key=”ip” value=”jl-master”/>
      <!–
default:3306
description:Mysql database port
mandatory:true
name:port
–>
      <param key=”port” value=”3306″/>
      <!–
description:Mysql database name
mandatory:true
name:dbname
–>
      <param key=”dbname” value=”newidigg_jilin”/>
      <!–
description:Mysql database login username
mandatory:true
name:username
–>
<param key=”username” value=”hadoop”/>
      <!–
description:Mysql database login password
mandatory:true
name:password
–>
      <param key=”password” value=”jpkjcluster”/>
      <!–
default:
range:
description:table to be dumped data into
mandatory:true
name:table
–>
      <param key=”table” value=”${mysql_table}”/>
      <!–
range:
description:order of columns
mandatory:false
name:colorder
      <param key=”colorder” value=”?”/>
–>
      <!–
default:UTF-8
range:UTF-8|GBK|GB2312
description:
mandatory:false
name:encoding
–>
      <param key=”encoding” value=”UTF-8″/>
      <!–
description:execute sql before dumping data
mandatory:false
name:pre
      <param key=”pre” value=”${preSql}”/>
–>
  <!–
description:execute sql after dumping data
mandatory:false
name:post
      <param key=”post” value=”${postSql}”/>
–>
      <!–
default:0
range:[0-65535]
description:error limit
mandatory:false
name:limit
–>
      <param key=”limit” value=”0″/>
      <!–
mandatory:false
name:set
      <param key=”set” value=”?”/>
–>
      <!–
default:false
range:[true/false]
mandatory:false
name:replace
–>
      <param key=”replace” value=”false”/>
      <!–
range:params1|params2|…
description:mysql driver params
mandatory:false
name:mysql.params
      <param key=”mysql.params” value=”?”/>
–>
      <!–
default:1
range:1-100
description:concurrency of the job
mandatory:false
–>
      <param key=”concurrency” value=”${writer_concurrency}”/>
    </writer>
  </job>
</jobs>

步骤 3:

编写 Shell 脚本 export_hdfs2mysql.sh 对整个 Datax 作业根据业务需求进行控制:

#!/bin/bash
#author: 曾昭正
#create time:2014-08-14
workspace=`dirname $0`
dataxHome=’/data/hadoop/datax’
 
export_day=$1
reader_concurrency=1
writer_concurrency=1
mysqlUser=’hadoop’
mysqlPassword=’jpkjcluster’
mysqlServerHost=’jl-master’
currentDatabase=’newidigg_jilin’
preSql=”
postSql=”
 
importTable=(‘tb_userview_domain_noMdn’ ‘tb_fact_app_v2’ ‘tb_fact_domain’ ‘tb_fact_tag’ ‘tb_fact_top5_www’ ‘tb_fact_upwww_time’ \
‘tb_fact_search’ ‘tb_userview_domain’ ‘tb_userview_kpi_order’ ‘tb_userview_search’ ‘tb_userview_time’ ‘tb_userview_tag’);
 
#function which is used to DDL or DML msyql
function mysqlController(){
    #这里注意一下: 这里的 $1 不同于整个脚本的参数 $1,这里是指函数的第一个参数
    local sqlString=$*
    echo `date +%Y-%m-%d” “%H:%M:%S` “ 执行:${sqlString}”
    mysql -u ${mysqlUser} –password=${mysqlPassword} -h ${mysqlServerHost} -e “
        use ${currentDatabase};
        ${sqlString};
    “
}
 
# 通用表导入模块
function commonImport(){
    local current_table=$1
    #create temporary table before importing data into mysql.
    echo `date +%Y-%m-%d” “%H:%M:%S` “…… 进入处理 ${current_table} 表入 mysql 库环节 ……”
    echo `date +%Y-%m-%d” “%H:%M:%S` “ 入库前创建临时表 ”
    preSql=”drop table if exists ${current_table}_${export_day};create table ${current_table}_${export_day} like ${current_table}”
    mysqlController ${preSql}
   
    #import data from hdfs into msyql.
    echo `date +%Y-%m-%d” “%H:%M:%S` “ 将 hdfs 的 ${current_table}表导入 mysql….”
    #调整 mysql 的导入线程数
    writer_concurrency=2
    #调用 Datax 将 hdfs 文件导入 mysql
    python ${dataxHome}/bin/datax.py ${dataxHome}/jobs/hdfsreader_to_mysqlwriter_1407525566122.xml -p”-Dhdfs_table=${current_table} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${current_table}_${export_day}”
   
    #Updata Data Relationship after importing data into mysql.
    if [${current_table} == “tb_userview_search” ]
        then
          postSql=”drop table if exists ${current_table}; \
        rename table ${current_table}_${export_day} to ${current_table}; \
        CREATE INDEX mdn_index ON ${current_table}(mdn);
          “
    else
        postSql=”drop table if exists ${current_table}; \
        rename table ${current_table}_${export_day} to ${current_table}; \
        Alter table ${current_table} add primary key(mdn);
          “
    fi
    mysqlController ${postSql}
    echo `date +%Y-%m-%d” “%H:%M:%S` “…… 完成处理 ${current_table} 表入 mysql 操作 ……”
}
 
for tableItem in ${importTable[*]}
do
 
if [${tableItem} == “tb_userview_domain” -o ${tableItem} == “tb_userview_kpi_order” -o ${tableItem} == “tb_userview_search” -o ${tableItem} == “tb_userview_time” -o ${tableItem} == “tb_userview_tag” ]
  then
    commonImport ${tableItem}
else
    #delete dirty data
    preSql=”delete from ${tableItem} where day_id=${export_day};”
    mysqlController ${preSql}
    #调用 Datax 将 hdfs 文件导入 mysql
    python ${dataxHome}/bin/datax.py ${dataxHome}/jobs/hdfsreader_to_mysqlwriter_1407525566122.xml -p”-Dhdfs_table=${tableItem} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${tableItem}”
 
# >> ${workspace}/../logs/exportData.log
 
fi
done

简单说说我这个 shell 脚本的用途,主要是对 datax 中的 job 配置文件的动态参数进行控制。另外,根据公司业务的不同需求,这十几个需要导入 mysql 的表其中有些表在导入之前和导入之后需要做不同的完善工作,这个通过这 shell 来控制。对于这个 Shell 脚本我是花了点时间进行重构的,功能点还是比较清晰、简洁的。

步骤 4:

执行 Datax,Hadoop2.x 脚本:nohup ./export_hdfs2mysql.sh 20140815 >> ./../idigg_task/logs/export.log &  大功告成。

三、总结

本 blog 主要介绍了 datax 框架、对它的部署、与 hadoop2.x 的兼容性修改和结合我的个人开发案例说了下 datax 的实际使用。整个 Datax 的部署和使用过程还是比较方便的,其效率也是相当不错,而且性能是可控的(通过 job 配置文件配置读、写线程数)。在大多数情况下,datax 和 sqoop 的性能上可以作为互补,是一个相当不错的产品。另外,说说 Shell。Shell 是我个人最喜欢的一种威武工具,它不仅具有天然的操作系统原生优势,同时它具有强大的粘合作用,可以将各种技术非常完美的粘合在一个项目之中。熟练的掌握 Shell 的编写,可以使一个开发者的战斗力上升几个等级,这个是我在实际工作中总结出来的绝对的真理。

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

一、概述

Hadoop 的版本更新挺快的,已经到了 2.4,但是其周边工具的更新速度还是比较慢的,一些旧的周边工具版本对 hadoop2.x 的兼容性做得还不完善,特别是 sqoop。最近,在为 hadoop2.2.0 找适合的 sqoop 版本时遇到了很多问题。尝试了多个 sqoop1.4.x 版本的直接简单粗暴的报版本不兼容问题,其中测了 sqoop-1.4.4.bin__hadoop-0.23 这个版本,在该版本中直接用 sqoop 的脚本 export HDFS 的数据是没有问题的,但是一旦调用 JAVA API 来进行对 HDFS 的数据的 export 的时候就各种不兼容问题,原因是这个版本的 API 也是基于 hadoop1.x 来写的。另外还尝试了使用 sqoop2(之前 blog 写过关于 sqoop2 的部署和使用情况:http://www.linuxidc.com/Linux/2014-10/107935.htm),这个版本取消了 sqoop1 的脚本执行方式,可以采取交互式、api 或者 rest 的方式工作,但是我在使用的过程中还是存在的一些问题:sqoop2(我用的是 1.99.3)无法指定列的分隔符、对 \N 等字符的处理有问题、对列值的类型判断存在问题等(其详细问题所在请看,sqoop1.99.3 源代码的 org.apache.sqoop.job.io.Data 类)。

这个礼拜终于找到了一个比较好的方案来取代 sqoop 作为 HDFS 到 mysql 的数据 export 模块,那就是大淘宝开源的 datax。虽然 datax 采用的是单机方式的作业方式,但是经过试验我对比了一下其和 sqoop 性能上的差异,在数据量不是特别大的情况下 datax 和 sqoop 的性能相差不是很明显的,在少量数据的情况下 datax 的性能稍微好点。

这篇 blog 将简单介绍一下这个 datax 这个框架以及它的用法,特别地说说如果修改 datax 才能使得 datax 运行在 hadoop2.x 上(datax 是基于 hadoop1.x 进行开发的)。另外,主要和大家分享一下我在自己项目中如何使用 datax,如何通过自己编写的 shell 脚本将 datax、mysql 和项目粘合起来。

二、datax 简介和 datax 在 hadoop2.x 上的兼容部署

1、datax 简介

DataX 是一个在异构的数据库 / 文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。Datax 框架中我最欣赏的就是基于插件的模式,你在部署的时候可以只安装那些用到的 Reader/Writer 插件 rpm 包,没有用的可以不用安装。同时,你也可以根据自己的特殊需求很快的写出 Reader、Writer。Datax 采用 Framework + plugin 架构构建,Framework 处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问。Datax 的运行方式采用 stand-alone 方式,在数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有 IPC 通信。下面是一个来自大淘宝开源官网的 datax 架构图:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

各个组件的作用:

 

  • Job: 一道数据同步作业

  • Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.

  • Sub-job:数据同步作业切分后的小任务

  • Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入 DataX

  • Storage: Reader 和 Writer 通过 Storage 交换数据

  • Writer(Dumper): 数据写出模块,负责将数据从 DataX 导入至目的数据地 

Datax 内置插件:
    DataX 框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为 Reader 和 Writer 两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从 Oracle 导出数据到 mysql,那么需要做的就是开发出 OracleReader 和 MysqlWriter 插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。更大的惊喜是我们已经开发了如下插件:

Reader 插件

  • hdfsreader : 支持从 hdfs 文件系统获取数据。

  • mysqlreader: 支持从 mysql 数据库获取数据。

  • sqlserverreader: 支持从 sqlserver 数据库获取数据。

  • oraclereader : 支持从 oracle 数据库获取数据。

  • streamreader: 支持从 stream 流获取数据(常用于测试)

  • httpreader : 支持从 http URL 获取数据。

Writer 插件

  • hdfswriter:支持向 hdbf 写入数据。

  • mysqlwriter:支持向 mysql 写入数据。

  • oraclewriter:支持向 oracle 写入数据。

  • streamwriter:支持向 stream 流写入数据。(常用于测试)

     

 

————————————– 分割线 ————————————–

 

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

 

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

 

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

 

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

 

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

 

Hadoop 中 HDFS 和 MapReduce 节点基本简介 http://www.linuxidc.com/Linux/2013-09/89653.htm

 

《Hadoop 实战》中文版 + 英文文字版 + 源码【PDF】http://www.linuxidc.com/Linux/2012-10/71901.htm

 

Hadoop: The Definitive Guide【PDF 版】http://www.linuxidc.com/Linux/2012-01/51182.htm

 

————————————– 分割线 ————————————–

 

2、datax 在 hadoop2.x 上的兼容部署

    Datax 是基于 Hadoop1.x 开发的,因此要想基于 HADOOP2.x 使用 hdfsreader 和 hdfswriter 插件,那么必须对这些插件的本地库以及一些 jar 包替换掉,同时要增加 Hadoop2.x 所需的依赖包,下面以 hdfsreader 为例说明:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

进入到 plugins 目录找到 hdfsreader,将 hadoop-0.19.2-core.jar 删除,将本地库替换为 $HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加 Hadoop2.x 的依赖包,如下图:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

另外,Datax 需要 hadoop1.x 的 hadoop-core.xml 配置文件,但是 hadoop2.x 中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有 datax 使用,这个配置文件在 datax 的 job xml 文件由参数 hadoop-conf 配上。到现在为止,datax 与 hadoop2.x 的兼容性修改已经完成了。

    还要做其他环境的调整,确保 java 版本 >=1.6,python 的版本 >=2.6(对于 python 的版本选择上,个人推荐 2.6 或者 2.7,如果 pytyon 版本上到 3.x 的话会有错误,个人经验)。最后修改一下各个插件的 rpm 包的 build 路径:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

下面以 t_dp_datax_engine.spec 为例子:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

上面红色方框的地方是指 build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。

下面以 t_dp_datax_engine.spec 为例子,看看���么 build rpm 插件:

具体执行过程如下:

1、请先 check out 一份 DataX 源码,并 cd 切换到 DataX 源码中的 rpm 目录

2、编译打包 DataX engine 包,使用 rpmbuild –ba t_dp_datax_engine.spec(请确保有 root 权限),打包生成的 rpm 后如下图所示

 Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

Rpm 制作完成后,即可分发、安装,例如使用

rpm -ivh  t_dp_datax_engine.rpm

即可安装 DataX engine 包,需要注意的是 engine 的 rpm 地址源自于上图的截图中信息。

如下图:

 Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

安装完成后,在 /home/taobao/datax/ 目录下会存在如下文件:

Datax 与 Hadoop2.x 兼容部署与实际项目应用工作记录分享

其他的插件按照这种方式按照好就 ok 了。 

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2014-10/107936p2.htm

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-20发表,共计15055字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中