阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

开源DataX集成可视化项目Datax-Web的使用

322次阅读
没有评论

共计 6013 个字符,预计需要花费 16 分钟才能阅读完成。

上一篇文章我们已经搭建好了 Datax-Web 后台,这篇文章我们具体讲一下如何通过 Datax-Web 来配置,同步 MySQL 数据库。

目标

开源 DataX 集成可视化项目 Datax-Web 的使用

MySql 数据库全量同步

1. 执行器配置

开源 DataX 集成可视化项目 Datax-Web 的使用

1、” 调度中心 OnLine:” 右侧显示在线的 ” 调度中心 ” 列表, 任务执行结束后, 将会以 failover 的模式进行回调调度中心通知执行结果, 避免回调的单点风险;

2、“执行器列表”中显示在线的执行器列表, 可通过 ”OnLine 机器 ” 查看对应执行器的集群机器;

开源 DataX 集成可视化项目 Datax-Web 的使用

1、AppName:(与 datax-executor 中 application.yml 的 datax.job.executor.appname 保持一致)
每个执行器集群的唯一标示 AppName, 执行器会周期性以 AppName 为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;

2、名称: 执行器的名称, 因为 AppName 限制字母数字等组成, 可读性不强, 名称为了提高执行器的可读性;

3、排序: 执行器的排序, 系统中需要执行器的地方, 如任务新增, 将会按照该排序读取可用的执行器列表;

4、注册方式:调度中心获取执行器地址的方式;

自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;

手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;

5、机器地址:” 注册方式 ” 为 ” 手动录入 ” 时有效,支持人工维护执行器的地址信息;

2. 创建数据源

数据源管理—> 添加

开源 DataX 集成可视化项目 Datax-Web 的使用

如图填写 MySQL 的账号信息,点击测试连接,无误之后确认。

第四步使用

3. 创建任务模版

开源 DataX 集成可视化项目 Datax-Web 的使用

第四步使用

4. 构建 JSON 脚本

1. 任务批量构建

步骤一,步骤二,选择第二步中创建的数据源,JSON 构建目前支持的数据源有 hive,mysql,oracle,postgresql,sqlserver,hbase,mongodb,clickhouse 其它数据源的 JSON 构建正在开发中, 暂时需要手动编写。

任务管理—> 任务批量构建—> 选择数据库源

开源 DataX 集成可视化项目 Datax-Web 的使用

2. 字段映射

3. 批量创建任务

手动执行一次

4. 启动任务

开源 DataX 集成可视化项目 Datax-Web 的使用

查看日志

报错

2023-03-28 16:41:14 [JobThread.run-130] <br>----------- datax-web job execute start -----------<br>----------- Param:
2023-03-28 16:41:14 [BuildCommand.buildDataXParam-100] ------------------Command parameters:
2023-03-28 16:41:14 [ExecutorJobHandler.execute-57] ------------------DataX process id: 29802
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53]   File "/data/datax/bin/datax.py", line 114
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53]     print readerRef
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53]           ^
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53] SyntaxError: Missing parentheses in call to 'print'. Did you mean print(readerRef)?
2023-03-28 16:41:14 [JobThread.run-165] <br>----------- datax-web job execute end(finish) -----------<br>----------- ReturnT:ReturnT [code=500, msg=command exit value(1) is failed, content=null]
2023-03-28 16:41:14 [ProcessCallbackThread.callbackLog-186] <br>----------- datax-web job callback finish.
2023-03-28 16:41:14 [TriggerCallbackThread.callbackLog-186] <br>----------- datax-web job callback finish.

经过查询是本机装了多版本的 python

[root@node3 bin]#  whereis python
python: /usr/bin/python /usr/bin/python2.7 /usr/bin/python3.6 /usr/bin/python3.6m /usr/lib/python2.7 /usr/lib/python3.6 /usr/lib64/python2.7 /usr/lib64/python3.6 /etc/python /usr/include/python2.7 /usr/include/python3.6m /root/anaconda3/bin/python /root/anaconda3/bin/python3.9 /root/anaconda3/bin/python3.9-config /usr/share/man/man1/python.1.gz

[root@node3 bin]# python -V
Python 3.9.13
[root@node3 bin]# /usr/bin/python -V
Python 2.7.5

经过修复使 Python 改为 2.7 再执行任务

[root@node3 ~]# python -V
Python 2.7.5

还有一种修复方式是

Python (2.x) (支持 Python3 需要修改替换 datax/bin 下面的三个 python 文件,替换文件在 doc/datax-web/datax-python3 下) 必选,主要用于调度执行底层 DataX 的启动脚本,默认的方式是以 Java 子进程方式执行 DataX,用户可以选择以 Python 方式来做自定义的改造

5. 查看任务

查看日志:

开源 DataX 集成可视化项目 Datax-Web 的使用

再用 Navicat 查看目标库中数据是否一致。

开源 DataX 集成可视化项目 Datax-Web 的使用

DataX-Web 增量配置说明

一、根据日期进行增量数据抽取

1. 页面任务配置

打开菜单任务管理页面,选择添加任务

按下图中 5 个步骤进行配置

开源 DataX 集成可视化项目 Datax-Web 的使用

  • 1. 任务类型选 DataX 任务
  • 2. 辅助参数选择时间自增
  • 3. 增量开始时间选择,即 sql 中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第一次同步完成后,该时间被更新为上一次的任务触发时间,任务失败不更新。
  • 4. 增量时间字段,-DlastTime=’%s’-DcurrentTime=’%s’先来解析下这段字符串
1.- D 是 DataX 参数的标识符,必配
2.- D 后面的 lastTime 和 currentTime 是 DataX json 中 where 条件的时间字段标识符,必须和 json 中的变量名称保持一致
3.='%s' 是项目用来去替换时间的占位符,比配并且格式要完全一致
4. 注意 -DlastTime='%s' 和 -DcurrentTime='%s' 中间有一个空格,空格必须保留并且是一个空格
  • 5. 时间格式,可以选择自己数据库中时间的格式,也可以通过 json 中配置 sql 时间转换函数来处理

注意,注意,注意: 配置一定要仔细看文档(后面我们也会对这块配置进行优化,避免大家犯错)

2.JSON 配置

datax.json

{
  "job": {
    "setting": {
      "speed": {"channel": 16}
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "splitPk": "id",
            "username": "root",
            "password": "root",
            "column": ["*"],
            "connection": [
              {
                
                "jdbcUrl": ["jdbc:mysql://localhost:3306/test?characterEncoding=utf8"],
				"querySql": ["select * from test_list where operationDate >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})"
                                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
           
            "username": "root",
            "password": "123456",
            "column": ["*"],
            "batchSize": "4096",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3307/test?characterEncoding=utf8",
                "table": ["test_list"]
              }
            ]
          }
        }
      }
    ]
  }
}

querySql 解析

select * from test_list where operationDate >= ${lastTime} and operationDate < ${currentTime}
  • 1. 此处的关键点在 {lastTime},{currentTime},${} 是 DataX 动态参数的固定格式,lastTime,currentTime 就是我们页面配置中 -DlastTime=’%s’-DcurrentTime=’%s’中的 lastTime,currentTime,注意字段一定要一致。
  • 2. 如果任务配置页面,时间类型选择为时间戳但是数据库时间格式不是时间戳,例如是:2019-11-26 11:40:57 此时可以用 FROM_UNIXTIME(${lastTime})进行转换。
select * from test_list where operationDate >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})

二、根据自增 Id 进行增量数据抽取

1. 页面任务配置

打开菜单任务管理页面,选择添加任务

按下图中 4 个步骤进行配置

  • 1. 任务类型选 DataX 任务
  • 2. 辅助参数选择主键自增
  • 3. 增量主键开始 ID 选择,即 sql 中查询 ID 的开始 ID,用户使用此选项方便第一次的全量同步。第一次同步完成后,该 ID 被更新为上一次的任务触发时最大的 ID,任务失败不更新。
  • 4. 增量时间字段,-DstartId=’%s’-DendId=’%s’先来解析下这段字符串
1.- D 是 DataX 参数的标识符,必配
2.- D 后面的 startId 和 endId 是 DataX json 中 where 条件的 id 字段标识符,必须和 json 中的变量名称保持一致,endId 是任务在每次执行时获取当前表 maxId,也是下一次任务的 startId
3.='%s' 是项目用来去替换时间的占位符,比配并且格式要完全一致
4. 注意 -DstartId='%s' 和 -DendId='%s' 中间有一个空格,空格必须保留并且是一个空格
5.reader 数据源,选择任务同步的读数据源
6. 配置 reader 数据源中需要同步数据的表名及该表的主键

注意,注意,注意: 一定要仔细看文档(后续会对这块配置进行优化,避免大家犯错)

2.JSON 配置

datax.json

{
   "job": {
     "setting": {
       "speed": {
         "channel": 3,
         "byte": 1048576
       },
       "errorLimit": {
         "record": 0,
         "percentage": 0.02
       }
     },
     "content": [
       {
         "reader": {
           "name": "mysqlreader",
           "parameter": {
             "username": "yRjwDFuoPKlqya9h9H2Amg==",
             "password": "yRjwDFuoPKlqya9h9H2Amg==",
             "splitPk": "","connection": [
               {
                 "querySql": ["select * from job_log where id>= ${startId} and id< ${endId}"
                 ],
                 "jdbcUrl": ["jdbc:mysql://localhost:3306/datax_web"]
               }
             ]
           }
         },
         "writer": {
           "name": "mysqlwriter",
           "parameter": {
             "username": "mCFD+p1IMsa0rHicbQohcA==",
             "password": "PhYxJmA/nuBJD1OxKTRzZH8sxuRddOv83hdqDOVR+i0=",
             "column": [
               "`id`",
               "`job_group`",
               "`job_id`",
               "`job_desc`",
               "`executor_address`",
               "`executor_handler`",
               "`executor_param`",
               "`executor_sharding_param`",
               "`executor_fail_retry_count`",
               "`trigger_time`",
               "`trigger_code`",
               "`trigger_msg`",
               "`handle_time`",
               "`handle_code`",
               "`handle_msg`",
               "`alarm_status`",
               "`process_id`",
               "`max_id`"
             ],
             "connection": [
               {
                 "table": ["job_log"],
                 "jdbcUrl": "jdbc:mysql://47.98.125.243:3306/datax_web"
               }
             ]
           }
         }
       }
     ]
   }
 }

querySql 解析

select * from job_log where id>= ${startId} and id< ${endId}
  • 1. 此处的关键点在 {startId},{endId},${} 是 DataX 动态参数的固定格式,startId,endId 就是我们页面配置中 -DstartId=’%s’-DendId=’%s’中的 startId,endId,注意字段一定要一致。

三、JVM 启动参数配置

此选择为非必选,可以配置 DataX 启动时 JVM 的参数,具体配置不做详解。

JVM 启动参数拼接结果为:-j "-Xms2G -Xmx2G"

参考

https://github.com/WeiYe-Jing/datax-web

https://github.com/WeiYe-Jing/datax-web/blob/master/doc/datax-web/increment-desc.md

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2023-04-01发表,共计6013字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中