用kafka构建数据管道
把kafka看着是一个数据的端点,怎么把kafka数据移到mysql,elasticSearchs 这里面介绍kafka connect API怎么样帮忙我们把数据移到我们想要的位置。
构建数据管道时需要考虑的问题
1.及时性,kafka作为一个基于流的数据平台,提供了可靠的伸缩性,多集群高可用的方案。这一点可以保证及时性。如果因为网络延迟,那么数据也不会丢失。
2.可靠性,我们要避免单点故障,和根据自己的业务情况来设置生产者和消费者的一些必要的参数。如生产者的提交方法,不完全选举,等等
3.高吞吐量和动态吞吐量。kafka使用多线程来拆分任务,最大限度利用了cpu,还支持压缩数据传输数据。
数据格式
本身支持apache Avro 序列化数据,可以是json格式 自定义序列化方式。
或者转成Parquet 写入HDFS,或转成CSV写入S3
转换,
kafka在这方面只支持类型转换,这方面还是其他etl 解决方案支持比较好
安全性
kafka支持加密传输和认证授权,所以不用担心安全问题、
故障处理能力
kafka会把数据持久化在磁盘上,一般保存7天,所以不要担心数据丢失问题。
如何在connect API和客户端API之间做出选择
如果是消息生产的话那么选择客户端,来源数据一般来源用户,connect只能连接一个数据端点,但不可能是用户数据收集的入口。
如果用于传输数据,如mysql 到elasticsearch 这样选用connect API 是个不错的选择
运行connect
connect 不需要安装,本身和kafka一起安装了,所以我们只需要配置我们需要的参数即可,启动和kafka类似
bin/connect-distributed.sh config/connect-distibuted.properties
connect进程有几个重要的配置参数
bootstrap.servers 可以集群模式或者单点
group.id 具有相同group.id的worker属于同一个集群。
key.converter 和value.converter connect可以处理存储在kafka里的不用格式的数据。
有些key.converter.schema.enable 设置成true或者false来指定JSON消息是否包含schema
对应的key.converter.schema.registry.url 指定schema registry的位置 value同样。
启动之后如果需要使用mysql 那么下载jdbc和mysql 驱动包 即可使用。elasticsearch也是同样。
都是操作api。
深入理解connect
连接器插件实现了connector API 负责3件事情
1.决定需要运行多少个任务
2,按照任务来拆分数据复制
3.从worker进程获取任务配置并将其传递下去。
任务
负责将数据移入或移除kafka。
worker进程
负责处理http请求,还负责保存连接器的配置,启动连接器和连接器任务,并把配置消息传递给任务。如果一个worker崩溃了,集群其他worker进程会感知到,并将原本属于这个worker的任务分配给其他进程。
connect之外的选择
Hadoop的flume elasticsearch fluentd
kafka学习(六)
原文:https://www.cnblogs.com/Seeasunnyday/p/9241276.html