首页 > 数据库技术 > 详细

【实战Apache-Flume采集DB数据到kafka】

时间:2016-07-07 02:16:48      阅读:1042      评论:0      收藏:0      [点我收藏+]

Flume是一个优秀的数据采集组件,有些重量级,其本质也是根据SQL语句的查询结果组装成opencsv格式的数据,默认的分隔符号是逗号(,),可以重写opencsv某些类进行修改

?

1、下载

[root@hadoop0 bigdata]# wget http://apache.fayea.com/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

?

2、解压缩

[root@hadoop0 bigdata]# tar -zxvf apache-flume-1.6.0-bin.tar.gz

[root@hadoop0 bigdata]# ls

apache-flume-1.6.0-bin         apache-hive-2.0.1-bin.tar.gz  hadoop272    hbase-1.1.5-bin.tar.gz  kafka        sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz  taokeeper-monitor.tar.gz  zookeeper

apache-flume-1.6.0-bin.tar.gz  apache-tomcat-7.0.69.zip      hbase-1.1.5  hive2.0                 sqoop-1.4.6  stomr096                                    tomcat7                   zookeeper.out

?

?

3、编译flume-ng-sql.jar

flume-ng-sql-source-develop_1.2.1 作者 :@author Luis Lázaro <lalazaro@keedio.com>

?

?<groupId>org.keedio.flume.flume-ng-sources</groupId>

?<artifactId>flume-ng-sql-source</artifactId>

?<version>1.2.1-SNAPSHOT</version>


bubuko.com,布布扣
?

?

4、配置数据源(两个作者的FlumeSink)

[root@hadoop0 apache-flume-1.6.0-bin]# vi conf/agent.conf

agent.sources = sql-source

agent.channels=c1

agent.sinks=r

?

agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

# URL to connect to database (currently only mysql is supported)

agent.sources.sql-source.connection.url = jdbc:mysql://192.168.1.100:3306/test

# Database connection properties

agent.sources.sql-source.user = root

agent.sources.sql-source.password = 123

agent.sources.sql-source.table = sdfs

agent.sources.sql-source.database = database

# Columns to import to kafka (default * import entire row)

agent.sources.sql-source.columns.to.select = *

# Increment column properties

agent.sources.sql-source.incremental.column.name = id

# Increment value is from you want to start taking data from tables (0 will import entire table)

agent.sources.sql-source.incremental.value = 0

# Query delay, each configured milisecond the query will be sent

agent.sources.sql-source.run.query.delay=10000

# Status file is used to save last readed row

agent.sources.sql-source.status.file.path = /tmp

agent.sources.sql-source.status.file.name = sql-source.status

# Custom query — NOTE(review): per the flume-ng-sql-source docs the incremental-value placeholder is "$@$"; the bare "@" below looks like it lost characters in transcription — verify against the plugin README

agent.sources.sql-source.custom.query = SELECT * FROM users WHERE 1=1 AND @

agent.sources.sql-source.batch.size = 1000

agent.sources.sql-source.max.rows = 10000

?

?

agent.channels.c1.type = memory

agent.channels.c1.capacity = 100

agent.channels.c1.transactionCapacity = 100

agent.channels.c1.byteCapacityBufferPercentage = 20

agent.channels.c1.byteCapacity = 800

?

#flume-ng-kafka-sink-1.6.0.jar

#agent.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink

#agent.sinks.r.brokerList=localhost:9092

#agent.sinks.r.batchSize=1

#agent.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

#agent.sinks.r.serializer.class=kafka.serializer.StringEncoder

#agent.sinks.r.requiredAcks=0

#agent.sinks.r.topic=test

?

?

#gitHub ?beyondj2ee ?flumeng-kafka-plugin.jar

agent.sinks.r.type = org.apache.flume.plugins.KafkaSink

agent.sinks.r.metadata.broker.list=localhost:9092

agent.sinks.r.partition.key=0

agent.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

agent.sinks.r.serializer.class=kafka.serializer.StringEncoder

agent.sinks.r.request.required.acks=0

agent.sinks.r.max.message.size=1000000

agent.sinks.r.producer.type=sync

agent.sinks.r.custom.encoding=UTF-8

agent.sinks.r.custom.topic.name=test

?

?

?

5、准备好数据库


bubuko.com,布布扣
?

6、启动zookeeper

[root@hadoop0 ~]# cd /opt/bigdata/

[root@hadoop0 bigdata]# ls

apache-flume-1.6.0-bin         apache-hive-2.0.1-bin.tar.gz  hadoop272    hbase-1.1.5-bin.tar.gz  kafka        sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz  taokeeper-monitor.tar.gz  zookeeper

apache-flume-1.6.0-bin.tar.gz  apache-tomcat-7.0.69.zip      hbase-1.1.5  hive2.0                 sqoop-1.4.6  stomr096                                    tomcat7                   zookeeper.out

[root@hadoop0 bigdata]# cd zookeeper/bin/

[root@hadoop0 bin]# ./zkServer.sh start

JMX enabled by default

Using config: /opt/bigdata/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

?

?

7、启动kafka

[root@hadoop0 bin]# cd ../../kafka/bin/

[root@hadoop0 bin]# ./kafka-server-start.sh ../config/server.properties &

[1] 32613

[root@hadoop0 bin]# [1999-05-25 12:34:44,651] INFO KafkaConfig values:

? ? ? ? request.timeout.ms = 30000

? ? ? ? log.roll.hours = 168

? ? ? ? inter.broker.protocol.version = 0.9.0.X

? ? ? ? log.preallocate = false

? ? ? ? security.inter.broker.protocol = PLAINTEXT

? ? ? ? controller.socket.timeout.ms = 30000

? ? ? ? broker.id.generation.enable = true

? ? ? ? ssl.keymanager.algorithm = SunX509

? ? ? ? ssl.key.password = null

? ? ? ? log.cleaner.enable = true

? ? ? ? ssl.provider = null

[root@hadoop0 bin]# ./kafka-topics.sh --zookeeper localhost --list

test

?

?

8、启动Flume

[root@hadoop0 apache-flume-1.6.0-bin]# rm -rf /tmp/sql-source.status

[root@hadoop0 apache-flume-1.6.0-bin]# ./bin/flume-ng agent -n agent -c conf -f conf/agent.conf -Dflume.root.logger=INFO,console

Info: Including Hadoop libraries found via (/opt/bigdata/hadoop272/bin/hadoop) for HDFS access

Info: Excluding /opt/bigdata/hadoop272/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath

Info: Excluding /opt/bigdata/hadoop272/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath

Info: Including HBASE libraries found via (/opt/bigdata/hbase-1.1.5/bin/hbase) for HBASE access

Info: Excluding /opt/bigdata/hbase-1.1.5/lib/slf4j-api-1.7.7.jar from classpath

Info: Excluding /opt/bigdata/hbase-1.1.5/lib/slf4j-log4j12-1.7.5.jar from classpath

Info: Excluding /opt/bigdata/hadoop272/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath

Info: Excluding /opt/bigdata/hadoop272/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath

Info: Including Hive libraries found via (/opt/bigdata/hive2.0) for Hive access


bubuko.com,布布扣
?

?

9、准备消费者消费数据

[root@hadoop0 bin]# ./kafka-console-consumer.sh --zookeeper localhost --topic test --from-beginning

test-message

gaojs

杞欢璁捐

1

2

gaojs

nihao

tesdhdhsdhgf

vdxgdgsdg

dfhfdhd

gaojs

gaojingsong

2015-09-02342

535435353

"1","zhangsan","12","17-May-2016 20:06:38"

"3","444","23","17-May-2016 20:06:38"

"4","wan-flume","23","17-May-2016 20:06:38"

"5","gaojs-flume","23","17-May-2016 20:06:38"

"1","zhangsan","12","17-May-2016 20:06:38"

"3","444","23","17-May-2016 20:06:38"

"4","wan-flume","23","17-May-2016 20:06:38"

"5","gaojs-flume","23","17-May-2016 20:06:38"

?

10、结果验证


bubuko.com,布布扣
?

?

?

启动Flume消费过程中日志

?


bubuko.com,布布扣
?

?

【实战Apache-Flume采集DB数据到kafka】

原文:http://gaojingsong.iteye.com/blog/2308728

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!