首页 > 其他 > 详细

Datax与hadoop2.x兼容部署与实际项目应用工作记录分享

时间:2014-08-15 18:06:49      阅读:728      评论:0      收藏:0      [点我收藏+]

一、概述

    Hadoop的版本更新挺快的,已经到了2.4,但是其周边工具的更新速度还是比较慢的,一些旧的周边工具版本对hadoop2.x的兼容性做得还不完善,特别是sqoop。最近,在为hadoop2.2.0找适合的sqoop版本时遇到了很多问题。尝试了多个sqoop1.4.x版本的直接简单粗暴的报版本不兼容问题,其中测了sqoop-1.4.4.bin__hadoop-0.23这个版本,在该版本中直接用sqoop的脚本export HDFS的数据是没有问题的,但是一旦调用JAVA API来进行对HDFS的数据的export的时候就各种不兼容问题,原因是这个版本的API也是基于hadoop1.x来写的。另外还尝试了使用sqoop2(之前blog写过关于sqoop2的部署和使用情况:http://zengzhaozheng.blog.51cto.com/8219051/1431882 ),这个版本取消了sqoop1的脚本执行方式,可以采取交互式、api或者rest的方式工作,但是我在使用的过程中还是存在的一些问题:sqoop2(我用的是1.99.3)无法指定列的分隔符、对\N等字符的处理有问题、对列值的类型判断存在问题等(其详细问题所在请看,sqoop1.99.3源代码的org.apache.sqoop.job.io.Data类)。

    这个礼拜终于找到了一个比较好的方案来取代sqoop作为HDFS到mysql的数据export模块,那就是大淘宝开源的datax。虽然datax采用的是单机方式的作业方式,但是经过试验我对比了一下其和sqoop性能上的差异,在数据量不是特别大的情况下datax和sqoop的性能相差不是很明显的,在少量数据的情况下datax的性能稍微好点。

    这篇blog将简单介绍一下这个datax这个框架以及它的用法,特别地说说如果修改datax才能使得datax运行在hadoop2.x上(datax是基于hadoop1.x进行开发的)。另外,主要和大家分享一下我在自己项目中如何使用datax,如何通过自己编写的shell脚本将datax、mysql和项目粘合起来。


二、datax简介和datax在hadoop2.x上的兼容部署

1、datax简介

    DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。Datax框架中我最欣赏的就是基于插件的模式,你在部署的时候可以只安装那些用到的Reader/Writer插件rpm包,没有用的可以不用安装。同时,你也可以根据自己的特殊需求很快的写出Reader、Writer。Datax采用Framework + plugin架构构建,Framework处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问。Datax的运行方式采用stand-alone方式,在数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有IPC通信。下面是一个来自大淘宝开源官网的datax架构图:

bubuko.com,布布扣

各个组件的作用:

  • Job: 一道数据同步作业

  • Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.

  • Sub-job: 数据同步作业切分后的小任务

  • Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入DataX

  • Storage: Reader和Writer通过Storage交换数据

  • Writer(Dumper): 数据写出模块,负责将数据从DataX导入至目的数据地


Datax内置插件:
    DataX框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为Reader和Writer两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。比如想要从oracle导出数据到mysql,那么需要做的就是开发出OracleReader和MysqlWriter插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。更大的惊喜是我们已经开发了如下插件:


Reader插件

  • hdfsreader : 支持从hdfs文件系统获取数据。

  • mysqlreader: 支持从mysql数据库获取数据。

  • sqlserverreader: 支持从sqlserver数据库获取数据。

  • oraclereader : 支持从oracle数据库获取数据。

  • streamreader: 支持从stream流获取数据(常用于测试)

  • httpreader : 支持从http URL获取数据。


Writer插件

  • hdfswriter:支持向hdbf写入数据。

  • mysqlwriter:支持向mysql写入数据。

  • oraclewriter:支持向oracle写入数据。

  • streamwriter:支持向stream流写入数据。(常用于测试)


2、datax在hadoop2.x上的兼容部署

    Datax是基于Hadoop1.x开发的,因此要想基于HADOOP2.x使用hdfsreader和hdfswriter插件,那么必须对这些插件的本地库以及一些jar包替换掉,同时要增加Hadoop2.x所需的依赖包,下面以hdfsreader为例说明:

bubuko.com,布布扣

进入到plugins目录找到hdfsreader,将hadoop-0.19.2-core.jar删除,将本地库替换为$HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加Hadoop2.x的依赖包,如下图:

bubuko.com,布布扣

另外,Datax需要hadoop1.x的hadoop-core.xml配置文件,但是hadoop2.x中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有datax使用,这个配置文件在datax的job xml文件由参数hadoop-conf配上。到现在为止,datax与hadoop2.x的兼容性修改已经完成了。

    还要做其他环境的调整,确保java版本>=1.6,python的版本>=2.6(对于python的版本选择上,个人推荐2.6或者2.7,如果pytyon版本上到3.x的话会有错误,个人经验)。最后修改一下各个插件的rpm包的build路径:

bubuko.com,布布扣

下面以t_dp_datax_engine.spec为例子:

bubuko.com,布布扣

上面红色方框的地方是指build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。

下面以t_dp_datax_engine.spec为例子,看看怎么build rpm 插件:

具体执行过程如下:

1、请先check out一份DataX源码,并cd切换到DataX源码中的rpm目录

2、编译打包DataX engine包,使用rpmbuild --ba t_dp_datax_engine.spec(请确保有root权限),打包生成的rpm后如下图所示

 bubuko.com,布布扣

Rpm制作完成后,即可分发、安装,例如使用

rpm -ivh  t_dp_datax_engine.rpm

即可安装DataX engine 包,需要注意的是engine的rpm地址源自于上图的截图中信息。

如下图:

 bubuko.com,布布扣

安装完成后,在/home/taobao/datax/目录下会存在如下文件:

bubuko.com,布布扣


其他的插件按照这种方式按照好就ok了。


三、datax的实际应用记录分享

    在blog的这部分主要分享一下我对datax使用的一个小案例,希望能够给初用datax的同学一点点参照。

  • 具体业务场景:

    需要将存储在HDFS上的一些表export到mysql中,不希望datax对每一个表的export操作都产生一个job xml文件,希望对不同的表动态使用同一个 job xml文件(这个用datax配置文件动态参数结合shell实现)。同时,根据公司业务的需求当不同的HDFS 表export到mysql的前后还需要做一些基于mysql的DML操作(这个可以通过datax 配置文件中的pre以及post参数进行配置,但是我为了方便流程的控制用shell取代了)。

  • 实现步骤:

步骤1:

执行$DATAX_HOME/bin/datax.py -e命令,选择data source来源,这里我们选择7:

bubuko.com,布布扣

接着选择export的目标源,这个我们选择0:

bubuko.com,布布扣

步骤2:

根据自己的业务需求和HADOOP的相应环境配置产生的job xml,进入到$DATAX_HOME/jobs,编辑job配置文件,我的配置如下(里边的一些动态参数有下面我自己写的Shell中进行控制):

<?xml version="1.0" encoding="UTF-8"?>

<jobs>
  <job id="hdfsreader_to_mysqlwriter_job">
    <reader>
      <plugin>hdfsreader</plugin>
      <!--
description:HDFS login account, e.g. ‘username, groupname(groupname...),#password
mandatory:true
name:ugi
-->
      <param key="hadoop.job.ugi" value="hadoop,supergroup#jpkjcluster"/>
      <!--
description:hadoop-site.xml path
mandatory:false
name:hadoop_conf
-->
      <param key="hadoop_conf" value="/data/hadoop/hadoop-2.2.0/etc/hadoop/datax_hadoop_conf.xml"/>
      <!--
description:hdfs path, format like: hdfs://ip:port/path, or file:///home/taobao/
mandatory:true
name:dir
-->
      <param key="dir" value="hdfs://172.16.8.1:8020/user/hive/warehouse/jl.db/${hdfs_table}/day=${export_day}"/>
      <!--
default:\t
description:how to sperate a line
mandatory:false
name:fieldSplit
-->
      <param key="field_split" value=","/>
      <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:hdfs encode
mandatory:false
name:encoding
-->
      <param key="encoding" value="UTF-8"/>
      <!--
default:4096
range:[1024-4194304]
description:how large the buffer
mandatory:false
name:bufferSize
-->
      <param key="buffer_size" value="4096"/>
      <!--
default:\N
range:
description:replace the nullstring to null
mandatory:false
name:nullString
-->
      <param key="nullstring" value="\N"/>
      <!--
default:true
range:true|false
description:ingore key
mandatory:false
name:ignoreKey
-->
      <param key="ignore_key" value="true"/>
      <!--
default:
range:
description:how to filter column
mandatory:false
name:colFilter
      <param key="col_filter" value="?"/>
-->
      <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
name:concurrency
-->
      <param key="concurrency" value="${reader_concurrency}"/>
    </reader>
    <writer>
      <plugin>mysqlwriter</plugin>
      <!--
description:Mysql database ip address
mandatory:true
name:ip
-->
      <param key="ip" value="jl-master"/>
      <!--
default:3306
description:Mysql database port
mandatory:true
name:port
-->
      <param key="port" value="3306"/>
      <!--
description:Mysql database name
mandatory:true
name:dbname
-->
      <param key="dbname" value="newidigg_jilin"/>
      <!--
description:Mysql database login username
mandatory:true
name:username
-->
<param key="username" value="hadoop"/>
      <!--
description:Mysql database login password
mandatory:true
name:password
-->
      <param key="password" value="jpkjcluster"/>
      <!--
default:
range:
description:table to be dumped data into
mandatory:true
name:table
-->
      <param key="table" value="${mysql_table}"/>
      <!--
range:
description:order of columns
mandatory:false
name:colorder
      <param key="colorder" value="?"/>
-->
      <!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:
mandatory:false
name:encoding
-->
      <param key="encoding" value="UTF-8"/>
      <!--
description:execute sql before dumping data
mandatory:false
name:pre
      <param key="pre" value="${preSql}"/>
-->
   <!--
description:execute sql after dumping data
mandatory:false
name:post
      <param key="post" value="${postSql}"/>
-->
      <!--
default:0
range:[0-65535]
description:error limit
mandatory:false
name:limit
-->
      <param key="limit" value="0"/>
      <!--
mandatory:false
name:set
      <param key="set" value="?"/>
-->
      <!--
default:false
range:[true/false]
mandatory:false
name:replace
-->
      <param key="replace" value="false"/>
      <!--
range:params1|params2|...
description:mysql driver params
mandatory:false
name:mysql.params
      <param key="mysql.params" value="?"/>
-->
      <!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
-->
      <param key="concurrency" value="${writer_concurrency}"/>
    </writer>
  </job>
</jobs>


步骤3:

    编写Shell脚本export_hdfs2mysql.sh对整个Datax作业根据业务需求进行控制:

#!/bin/bash
#author:曾昭正
#create time:2014-08-14
workspace=`dirname $0`
dataxHome=‘/data/hadoop/datax‘

export_day=$1
reader_concurrency=1
writer_concurrency=1
mysqlUser=‘hadoop‘
mysqlPassword=‘jpkjcluster‘
mysqlServerHost=‘jl-master‘
currentDatabase=‘newidigg_jilin‘
preSql=‘‘
postSql=‘‘

importTable=(‘tb_userview_domain_noMdn‘ ‘tb_fact_app_v2‘ ‘tb_fact_domain‘ ‘tb_fact_tag‘ ‘tb_fact_top5_www‘ ‘tb_fact_upwww_time‘ ‘tb_fact_search‘ ‘tb_userview_domain‘ ‘tb_userview_kpi_order‘ ‘tb_userview_search‘ ‘tb_userview_time‘ ‘tb_userview_tag‘);

#function which is used to DDL or DML msyql
function mysqlController(){
    #这里注意一下:这里的$1不同于整个脚本的参数$1,这里是指函数的第一个参数
    local sqlString=$*
    echo `date +%Y-%m-%d" "%H:%M:%S` "执行:${sqlString}"
    mysql -u ${mysqlUser} --password=${mysqlPassword} -h ${mysqlServerHost} -e "
        use ${currentDatabase};
        ${sqlString};
    "
}

#通用表导入模块
function commonImport(){
	local current_table=$1
	#create temporary table before importing data into mysql.
	echo `date +%Y-%m-%d" "%H:%M:%S` "......进入处理${current_table}表入mysql库环节......"
	echo `date +%Y-%m-%d" "%H:%M:%S` "入库前创建临时表"
	preSql="drop table if exists ${current_table}_${export_day};create table ${current_table}_${export_day} like ${current_table}"
	mysqlController ${preSql}
	
	#import data from hdfs into msyql.
	echo `date +%Y-%m-%d" "%H:%M:%S` "将hdfs的${current_table}表导入mysql...."
	#调整mysql的导入线程数
	writer_concurrency=2
	#进行导入
    python ${dataxHome}/bin/datax.py ${dataxHome}/jobs/hdfsreader_to_mysqlwriter_1407525566122.xml -p"-Dhdfs_table=${current_table} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${current_table}_${export_day}"
	
	#Updata Data Relationship after importing data into mysql.
	if [ ${current_table} == "tb_userview_search" ]
		then
	      postSql="drop table if exists ${current_table};      	  rename table ${current_table}_${export_day} to ${current_table}; 	 	  CREATE INDEX mdn_index ON ${current_table}(mdn);
		  "
	else
		 postSql="drop table if exists ${current_table};      	  rename table ${current_table}_${export_day} to ${current_table}; 	 	  Alter table ${current_table} add primary key(mdn);
		  "
	fi
	mysqlController ${postSql}
	echo `date +%Y-%m-%d" "%H:%M:%S` "......完成处理${current_table}表入mysql操作......"
}

for tableItem in ${importTable[*]}
do

if [ ${tableItem} == "tb_userview_domain" -o ${tableItem} == "tb_userview_kpi_order" -o ${tableItem} == "tb_userview_search" -o ${tableItem} == "tb_userview_time" -o ${tableItem} == "tb_userview_tag" ]
   then
	commonImport ${tableItem}
else
    #delete dirty data
	preSql="delete from ${tableItem} where day_id=${export_day};"
    mysqlController ${preSql}
    python ${dataxHome}/bin/datax.py ${dataxHome}/jobs/hdfsreader_to_mysqlwriter_1407525566122.xml -p"-Dhdfs_table=${tableItem} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${tableItem}"

# >> ${workspace}/../logs/exportData.log

fi
done

简单说说我这个shell脚本的用途,主要是对datax中的job配置文件的动态参数进行控制。另外,根据公司业务的不同需求,这十几个需要导入mysql的表其中有些表在导入之前和导入之后需要做不同的完善工作,这个通过这shell来控制。对于这个Shell脚本我是花了点时间进行重构的,功能点还是比较清晰、简洁的。

步骤4:

    执行脚本:nohup ./export_hdfs2mysql.sh 20140815 >> ./../idigg_task/logs/export.log &  大功告成。


三、总结

    本blog主要介绍了datax框架、对它的部署、与hadoop2.x的兼容性修改和结合我的个人开发案例说了下datax的实际使用。整个Datax的部署和使用过程还是比较方便的,其效率也是相当不错,而且性能是可控的(通过job配置文件配置读、写线程数)。在大多数情况下,datax和sqoop的性能上可以作为互补,是一个相当不错的产品。另外,说说Shell。Shell是我个人最喜欢的一种威武工具,它不仅具有天然的操作系统原生优势,同时它具有强大的粘合作用,可以将各种技术非常完美的粘合在一个项目之中。熟练的掌握Shell的编写,可以使一个开发者的战斗力上升几个等级,这个是我在实际工作中总结出来的绝对的真理。


转载请标明出处:http://zengzhaozheng.blog.51cto.com/8219051/1540679 



本文出自 “一只风骚的蚂蚁” 博客,谢绝转载!

Datax与hadoop2.x兼容部署与实际项目应用工作记录分享,布布扣,bubuko.com

Datax与hadoop2.x兼容部署与实际项目应用工作记录分享

原文:http://zengzhaozheng.blog.51cto.com/8219051/1540679

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!