
DataX Tuning

Tags (space-separated): ETL


I. DataX tuning directions

DataX tuning breaks down into several parts (note: "task machine" below refers to the machine that runs the DataX job).

1. Network bandwidth and other hardware factors;
2. DataX's own parameters;
3. The path from the source to the task machine;
4. The path from the task machine to the destination.
In other words, when DataX transfers seem slow, these are the four areas to start troubleshooting.

1. Tuning network bandwidth and other hardware factors

This part is mainly about understanding the network itself: what the actual bandwidth from source to destination is, how heavily it is normally used, and how busy it gets, so that you can judge whether this is what makes the transfer slow. A few ideas:

1. Use scp, a simple Python HTTP server, nethogs, etc. between source and destination to measure the actual network and NIC speed (see the sketch below);
2. Check monitoring for the time window in which the job runs to see how busy the network is overall, and decide whether the job should be moved off peak hours;
3. Watch the task machine's load, especially network and disk I/O, to see whether it has become the bottleneck.
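A minimal sketch of such checks, assuming SSH access between the hosts; the hostnames, test file name, and the eth0 interface name are placeholders:

# Rough end-to-end throughput via scp (watch the MB/s that scp reports)
scp /tmp/testfile.bin user@dest-host:/tmp/

# Or serve a test file over HTTP from the source host ...
python -m SimpleHTTPServer 8000    # Python 2; use `python3 -m http.server 8000` on Python 3
# ... and pull it from the destination host
wget http://source-host:8000/testfile.bin -O /dev/null

# Per-process network usage on the task machine (needs root)
nethogs eth0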

2. Tuning DataX's own parameters

Global

{
   "core":{
        "transport":{
            "channel":{
                "speed":{
                    "channel": 2, ## 此处为数据导入的并发度,建议根据服务器硬件进行调优
                    "record":-1, ##此处解除对读取行数的限制
                    "byte":-1, ##此处解除对字节的限制
                    "batchSize":2048 ##每次读取batch的大小
                }
            }
        }
    },
    "job":{
            ...
        }
    }

Per job

"setting": {
            "speed": {
                "channel": 2,
                "record":-1,
                "byte":-1,
                "batchSize":2048
            }
        }
    }
}

# When increasing channel, edit the datax.py file shipped with DataX to avoid OOM.
# As shown below, raise -Xms and -Xmx according to the task machine's actual resources.
# More channels are not always better: setting the value too high will instead hurt the host machine's performance.
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)

JVM tuning

python datax.py  --jvm="-Xms3G -Xmx3G" ../job/test.json

Tune this according to the server's configuration; do not set it too large, or the job will fail immediately with an exception.
All of the settings above can be tuned per job, i.e. for each individual JSON file (see the sketch below).
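A minimal sketch of per-job tuning, assuming two hypothetical job files; the heap sizes and the channel counts inside the JSON should match the task machine:

# A small table: the default 1G heap and a single channel are enough
python datax.py ../job/small_table.json

# A large table: more channels configured in the JSON, and a larger heap to match
python datax.py --jvm="-Xms3G -Xmx3G" ../job/big_table.json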

3. Functional and performance testing

Quick start: https://github.com/alibaba/DataX/blob/master/userGuid.md

3.1 Dynamic parameters

If there are many tables to import and they all share the same schema, the same JSON file can be reused. A simple example: python datax.py -p "-Dsdbname=test -Dstable=test" ../job/test.json

"column": ["*"],
"connection": [
    {
        "jdbcUrl": "jdbc:mysql://xxx:xx/${sdbname}?characterEncoding=utf-8",
        "table": ["${stable}"]
    }
],

On Linux, the example above can be driven from a shell script, as shown below.
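A minimal sketch of such a wrapper, assuming the tables share one schema; the database name and table list are placeholders:

#!/bin/bash
DB=test
for TABLE in tt_user ttt_user tttt_user; do
    python datax.py -p "-Dsdbname=${DB} -Dstable=${TABLE}" ../job/test.json
done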

3.2 mysql -> hdfs

Example 1: full load

# 1. View the configuration template
python datax.py -r mysqlreader -w hdfswriter

# 2. Create and edit the configuration file
vim custom/mysql2hdfs.json
{
    "job":{
        "setting":{
            "speed":{
                "channel":1
            }
        },
        "content":[
            {
                "reader":{
                    "name":"mysqlreader",
                    "parameter":{
                        "username":"xxx",
                        "password":"xxx",
                        "column":["id","name","age","birthday"],
                        "connection":[
                            {
                                "table":[
                                    "tt_user"
                                ],
                                "jdbcUrl":[
                                    "jdbc:mysql://192.168.1.96:3306/test"
                                ]
                            }
                        ]
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "defaultFS":"hdfs://flashHadoop",
                        "hadoopConfig": {
                        "dfs.nameservices": "flashHadoop",
                        "dfs.ha.namenodes.flashHadoop": "nn1,nn2",
                        "dfs.namenode.rpc-address.flashHadoop.nn1": "VECS01118:8020",
                        "dfs.namenode.rpc-address.flashHadoop.nn2": "VECS01119:8020",
                "dfs.client.failover.proxy.provider.flashHadoop":
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        }
                        "fileType":"text",
                        "path":"/tmp/test01",
                        "fileName":"tt_user",
                        "column":[
                            {"name":"id", "type":"INT"},
                            {"name":"name", "type":"VARCHAR"},
                            {"name":"age", "type":"INT"}
                            {"name":"birthday", "type":"date"}
                        ],
                        "writeMode":"append",
                        "fieldDelimiter":"\t",
                        "compress":"GZIP"
                    }
                }
            }
        ]
    }
}

# 3. Start the import job
python datax.py custom/mysql2hdfs.json

# 4. Log output
2018-11-23 14:37:58.056 [job-0] INFO  JobContainer - 
Job start time                  : 2018-11-23 14:37:45
Job end time                    : 2018-11-23 14:37:58
Total elapsed time              :                 12s
Average traffic                 :                9B/s
Record write speed              :              0rec/s
Records read                    :                   7
Read/write failures             :                   0
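One way to sanity-check the output on HDFS after the job finishes (a sketch; the path and file name come from the job above, and the hdfs client is assumed to be on the PATH):

# hdfswriter appends a random suffix to fileName, so list the directory first
hdfs dfs -ls /tmp/test01

# The files are gzip-compressed text; -text decompresses while printing
hdfs dfs -text /tmp/test01/tt_user* | head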

Example 2: incremental load (table splitting)

{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "admin",
                    "password": "qweasd123",
                    "column": [
                        "id",
                        "name",
                        "age",
                        "birthday"
                    ],
                    "splitPk": "id",
                    "where": "id<10",
                    "connection": [{
                        "table": [
                            "tt_user",
                            "ttt_user"
                        ],
                        "jdbcUrl": [
                            "jdbc:mysql://hadoop01:3306/test"
                        ]
                    }]
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                    "defaultFS": "hdfs://flashHadoop",
                    "hadoopConfig": {
                        "dfs.nameservices": "flashHadoop",
                        "dfs.ha.namenodes.minq-cluster": "nn1,nn2",
                        "dfs.namenode.rpc-address.flashHadoop.nn1": "VECS01118:8020",
                        "dfs.namenode.rpc-address.flashHadoop.nn2": "VECS01119:8020",
                        "dfs.client.failover.proxy.provider.flashHadoop": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                    },
                    "fileType": "text",
                    "path": "/tmp/test/user",
                    "fileName": "mysql_test_user",
                    "column": [{
                            "name": "id",
                            "type": "INT"
                        },
                        {
                            "name": "name",
                            "type": "VARCHAR"
                        },
                        {
                            "name": "age",
                            "type": "INT"
                        },
                        {
                            "name": "birthday",
                            "type": "date"
                        }
                    ],
                    "writeMode": "append",
                    "fieldDelimiter": "\t"
                }
            }
        }]
    }
}
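With splitPk set, DataX splits each table into ranges on the id column and reads them in parallel across the configured channels. The where condition above is hard-coded; for repeated incremental loads it can be parameterized with the -p mechanism from section 3.1. A sketch, where the variable names start_id/end_id and the file name custom/mysql2hdfs_incr.json are hypothetical:

# In the job JSON:
#     "where": "id >= ${start_id} and id < ${end_id}",

python datax.py -p "-Dstart_id=0 -Dend_id=10" custom/mysql2hdfs_incr.json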

Note: clients outside the cluster's network must reach the DataNodes via their external addresses; if hostname-based access is not configured, the connection will fail.
This can be solved by configuring hdfs-site.xml:

<property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
    <description>only config on clients</description>
 </property>

Or by configuring the Hadoop Configuration in a Java client:

// org.apache.hadoop.conf.Configuration, from hadoop-common
Configuration conf = new Configuration();
conf.set("dfs.client.use.datanode.hostname", "true");

Or via the hadoopConfig section of the DataX job configuration:

"hadoopConfig": {
        "dfs.client.use.datanode.hostname":"true",
        "dfs.nameservices": "flashHadoop",
        "dfs.ha.namenodes.minq-cluster": "nn1,nn2",
        "dfs.namenode.rpc-address.flashHadoop.nn1": "VECS00018:8020",
        "dfs.namenode.rpc-address.flashHadoop.nn2": "VECS00019:8020",
        "dfs.client.failover.proxy.provider.minq-cluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}

This corresponds to the following snippet in the DataX source, where every hadoopConfig entry is copied into the Hadoop Configuration:

        hadoopConf = new org.apache.hadoop.conf.Configuration();
        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
        if (null != hadoopSiteParams) {
            Set<String> paramKeys = hadoopSiteParams.getKeys();
            for (String each : paramKeys) {
                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
            }
        }
        hadoopConf.set(HDFS_DEFAULTFS_KEY, taskConfig.getString(Key.DEFAULT_FS));

Example 3: incremental load (SQL query)

mysql2hdfs-condition.json

{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxx",
                        "password": "xxx",
                        "connection": [
                            {
                                "querySql": [
                                    "select id,name,age,birthday from tt_user where id <= 5"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.96:3306/test"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter":{
                            "defaultFS": "hdfs://flashHadoop",
                            "hadoopConfig": {
                            "dfs.nameservices": "flashHadoop",
                            "dfs.ha.namenodes.flashHadoop": "nn1,nn2",
                            "dfs.namenode.rpc-address.flashHadoop.nn1": "VECS01118:8020",
                            "dfs.namenode.rpc-address.flashHadoop.nn2": "VECS01119:8020",
        "dfs.client.failover.proxy.provider.flashHadoop":
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        "fileType":"text",
                        "path":"/tmp/test01",
                        "fileName":"tt_user",
                        "column":[
                            {"name":"id", "type":"INT"},
                            {"name":"name", "type":"VARCHAR"},
                            {"name":"age", "type":"INT"}
                            {"name":"birthday", "type":"date"}
                        ],
                        "writeMode":"append",
                        "fieldDelimiter":"\t"
                    }
                }
            }
        ]
    }
}
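For a scheduled incremental load, the querySql can be parameterized the same way as in section 3.1. A sketch, where ${dt} is a hypothetical variable and the existing birthday column is used as the filter purely for illustration:

# In the job JSON:
#     "querySql": ["select id,name,age,birthday from tt_user where birthday = '${dt}'"]

python datax.py -p "-Ddt=2018-11-22" mysql2hdfs-condition.json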

3.3 hdfs -> mysql

# 1. View the configuration template
python datax.py -r hdfsreader -w mysqlwriter

# 2. Create and edit the configuration file
vim custom/hdfs2mysql.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [{
            "reader": {
                "name": "hdfsreader",
                "parameter": {
                    "column": [{
                            "index": "0",
                            "type": "long"
                        },
                        {
                            "index": "1",
                            "type": "string"
                        },
                        {
                            "index": "2",
                            "type": "long"
                        },
                        {
                            "index": "3",
                            "type": "date"
                        }
                    ],
                    "defaultFS": "hdfs://flashHadoop",
                    "hadoopConfig": {
                        "dfs.nameservices": "flashHadoop",
                        "dfs.ha.namenodes.flashHadoop": "nn1,nn2",
                        "dfs.namenode.rpc-address.flashHadoop.nn1": "VECS01118:8020",
                        "dfs.namenode.rpc-address.flashHadoop.nn2": "VECS01119:8020",
                        "dfs.client.failover.proxy.provider.flashHadoop": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                    },
                    "encoding": "UTF-8",
                    "fileType": "text",
                    "path": "/tmp/test/tt_user*",
                    "fieldDelimiter": "\t"
                }
            },
            "writer": {
                "name": "mysqlwriter",
                "parameter": {
                    "column": [
                        "id",
                        "name",
                        "age",
                        "birthday"
                    ],
                    "connection": [{
                        "jdbcUrl": "jdbc:mysql://192.168.1.96:3306/test",
                        "table": ["ttt_user"]
                    }],
                    "username": "zhangqingli",
                    "password": "xxx",
                    "preSql": [
                        "select * from ttt_user",
                        "select name from ttt_user"
                    ],
                    "session": [
                        "set session sql_mode='ANSI'"
                    ],
                    "writeMode": "insert"
                }
            }
        }]
    }
}

# 3. Start the import job
python datax.py custom/hdfs2mysql.json

# 4. Log output
Job start time                  : 2018-11-23 14:44:54
Job end time                    : 2018-11-23 14:45:06
Total elapsed time              :                 12s
Average traffic                 :                9B/s
Record write speed              :              0rec/s
Records read                    :                   7
Read/write failures             :                   0
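A quick check on the MySQL side after the load (a sketch; host, user, database and table come from the job above):

mysql -h192.168.1.96 -uzhangqingli -p -e "select count(*) from test.ttt_user"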

3.4 mongo -> hdfs

Example 1: full load

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [{
            "reader": {
                "name": "mongodbreader",
                "parameter": {
                    "address": ["192.168.1.96:27017"],
                    "userName": "xxxx",
                    "userPassword": "xxxx",
                    "dbName": "test",
                    "collectionName": "student",
                    "column": [
                        {"name": "_id", "type": "string"},
                        {"name": "name", "type": "string"},
                        {"name": "age", "type": "double"},
                        {"name": "clazz", "type": "double"},
                        {"name": "hobbies", "type": "Array"},
                        {"name": "ss", "type": "Array"}
                    ],
                    "splitter": ","
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter":{
                        "defaultFS": "hdfs://flashHadoop",
                    "hadoopConfig": {
                        "dfs.nameservices": "flashHadoop",
                        "dfs.ha.namenodes.flashHadoop": "nn1,nn2",
                        "dfs.namenode.rpc-address.flashHadoop.nn1": "VECS01118:8020",
                        "dfs.namenode.rpc-address.flashHadoop.nn2": "VECS01119:8020",
                        "dfs.client.failover.proxy.provider.flashHadoop": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                    },
                    "fileType":"text",
                    "path":"/tmp/test01",
                    "fileName":"mongo_student",
                    "column":[
                        {"name": "_id", "type": "string"},
                        {"name": "name", "type": "string"},
                        {"name": "age", "type": "double"},
                        {"name": "clazz", "type": "double"},
                        {"name": "hobbies", "type": "string"},
                        {"name": "ss", "type": "string"}
                    ],
                    "writeMode":"append",
                    "fieldDelimiter":"\u0001"
                }
            }
        }]
    }
}
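Since the DataX framework has no array type, the mongodbreader joins each array field into a single string using the configured splitter (here ",") before handing the record to the writer. The output on HDFS can be inspected as before (a sketch; the \u0001 field delimiter is invisible in most terminals, so cat -A is used to render it as ^A):

hdfs dfs -ls /tmp/test01
hdfs dfs -text /tmp/test01/mongo_student* | head | cat -A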

Example 2: incremental load from Mongo

{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [{
            "reader": {
                "name": "mongodbreader",
                "parameter": {
                    "address": ["地址"],
                    "userName": "用户名",
                    "userPassword": "密码",
                    "dbName": "库名",
                    "collectionName": "集合名",
                        "query":"{created:{ $gte: ISODate('1990-01-01T16:00:00.000Z'), $lte: ISODate('2010-01-01T16:00:00.000Z') }}",
                    "column": [
                        { "name": "_id", "type": "string" }, 
                        { "name": "owner", "type": "string" }, 
                        { "name": "contributor", "type": "string" }, 
                        { "name": "type", "type": "string" }, 
                        { "name": "amount", "type": "int" }, 
                        { "name": "divided", "type": "double" }, 
                        { "name": "orderId", "type": "string" }, 
                        { "name": "orderPrice", "type": "int" }, 
                        { "name": "created", "type": "date" }, 
                        { "name": "updated", "type": "date" },
                        { "name": "hobbies", "type": "Array"}
                    ]
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                       "defaultFS": "hdfs://flashHadoop",
                    "hadoopConfig": {
                        "dfs.nameservices": "flashHadoop",
                        "dfs.ha.namenodes.flashHadoop": "nn1,nn2",
                        "dfs.namenode.rpc-address.flashHadoop.nn1": "VECS01118:8020",
                        "dfs.namenode.rpc-address.flashHadoop.nn2": "VECS01119:8020",
                        "dfs.client.failover.proxy.provider.flashHadoop": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                    },
                    "fileType": "text",
                    "path": "/user/hive/warehouse/aries.db/ods_goldsystem_mdaccountitems/accounting_day=$dt",
                    "fileName": "filenamexxx",
                    "column": [
                        { "name": "_id", "type": "string" }, 
                        { "name": "owner", "type": "string" }, 
                        { "name": "contributor", "type": "string" }, 
                        { "name": "type", "type": "string" }, 
                        { "name": "amount", "type": "int" }, 
                        { "name": "divided", "type": "double" }, 
                        { "name": "orderId", "type": "string" }, 
                        { "name": "orderPrice", "type": "int" }, 
                        { "name": "created", "type": "date" }, 
                        { "name": "updated", "type": "date" },
                        { "name": "hobbies", "type": "string"}
                    ],
                    "writeMode": "append",
                    "fieldDelimiter": "\t"
                }
            }
        }]
    }

}
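The path above contains $dt, which has to be filled in before the job is launched. One common approach is to render the JSON from a template in the calling shell script; a sketch, where the template and output file names are hypothetical:

dt=$(date -d "1 day ago" +%Y-%m-%d)
sed "s/\$dt/${dt}/g" mongo2hdfs_incr.json.template > /tmp/mongo2hdfs_incr.json
python datax.py /tmp/mongo2hdfs_incr.json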

3.5 hdfs -> mongo

{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [{
            "reader": {
                "name": "hdfsreader",
                "parameter": {
                    "column": [
                        { "index": 0, "type": "String" },
                        { "index": 1, "type": "String" },
                        { "index": 2, "type": "Long" },
                        { "index": 3, "type": "Date" }
                    ],
                    "defaultFS": "hdfs://flashHadoop",
                    "hadoopConfig": {
                        "dfs.nameservices": "flashHadoop",
                        "dfs.ha.namenodes.flashHadoop": "nn1,nn2",
                        "dfs.namenode.rpc-address.flashHadoop.nn1": "VECS01118:8020",
                        "dfs.namenode.rpc-address.flashHadoop.nn2": "VECS01119:8020",
                        "dfs.client.failover.proxy.provider.flashHadoop": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                    },
                    "encoding": "UTF-8",
                    "fieldDelimiter": "\t",
                    "fileType": "text",
                    "path": "/tmp/test/mongo_student*"
                }
            },
            "writer": {
                "name": "mongodbwriter",
                "parameter": {
                    "address": [
                        "192.168.1.96:27017"
                    ],
                    "userName": "test",
                    "userPassword": "xxx",
                    "dbName": "test",
                    "collectionName": "student_from_hdfs",
                    "column": [
                        { "name": "_id", "type": "string" },
                        { "name": "name", "type": "string" },
                        { "name": "age", "type": "int" },
                        { "name": "birthday", "type": "date" }
                    ],
                    "splitter": ",",
                    "upsertInfo": {
                        "isUpsert": "true",
                        "upsertKey": "_id"
                    }
                }
            }
        }]
    }
}
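And a quick check on the MongoDB side (a sketch; the address, user and collection come from the job above, using the legacy mongo shell):

mongo 192.168.1.96:27017/test -u test -p 'xxx' --eval 'db.student_from_hdfs.find().limit(3).forEach(printjson)'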


Original article: https://www.cnblogs.com/hit-zb/p/10940849.html
