上一节我们整合了flume+kafka,是将kafka的生产者作为flume的sink,本节将反其道行之,将kafka的消费者作为flume的source。
#定义各个模块
a1.sources = kafka
a1.sinks = log
a1.channels = c1
#配置kafka source
#source的类型为kafkaSource
a1.sources.kafka.type = org.apache.flume.source.kafka.KafkaSource
#消费者连接的zk集群地址
a1.sources.r1.kafka.bootstrap.servers = 192.168.100.200:9092
a1.sources.kafka.zookeeperConnect = 192.168.100.200:2181
#消费者消费的topic,只能是一个。
a1.sources.kafka.topic = test
#kafka的组id
#a1.sources.kafka.groupId = flume
#kafka的消费者连接超时时间单位毫秒
a1.sources.kafka.kafka.consumer.timeout.ms = 3000
# 配置logger sink
a1.sinks.log.type = logger
# 配置 memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 绑定三种组件的关系
a1.sources.kafka.channels = c1
a1.sinks.log.channel = c1
cd /root/kafka_2.12-0.11.0.2/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
cd /root/apache-flume-1.8.0-bin/bin
./flume-ng agent -c ../conf -f ../conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
原文:https://www.cnblogs.com/alichengxuyuan/p/12576873.html