1、logstash是什么
Logstash 是有管道输送能力的开源数据收集引擎。它可以动态地从分散的数据源收集数据,并且标准化数据输送到你选择的目的地。它是一款日志而不仅限于日志的搜集处理框架,将分散多样的数据搜集自定义处理并输出到指定位置。
2、logstash 原理
logstash 原理非常简单,就是将数据收集起来,经过filter操作,然后通过output转发出去。
3、实例
problems:最近接到一个需求,要求从一个集群的kafka中读取原始日志到另一个集群的kafka队列中,其实就是做kafka集群的数据迁移。
solve:传统的方式其实也很好做,就是写一个程序,将这个集群的每个kafka topic中的数据读到,写入到另一个集群的kafka队列中。但是这种做法太古老,事情不是很急,想想有啥开源的工具可用。(从hive、pig封装MapReduce,thriftserver封装spark JOB受到启示),果然还真有工具这就是logstash。
看了下官网的入门文档,非常简单。支持文件、ES、kafka、redis、hdfs很多输入和输出。
步骤:
1)、首先下载从官网下载工具,https://www.elastic.co/downloads/logstash,我下载的是zip包,版本 logstash-6.2.2.zip。
2)、用unzip命令解压缩,进入config目录。解压之后包含目录如下:
3)、创建配置文件kafka_input.conf,名称自己指定。
input{ kafka{ bootstrap_servers => ["10.67.1.150:9092,10.67.1.151:9092,10.67.1.152:9092"] client_id => "tam_normal_1" group_id => "tam_normal_1" auto_offset_reset => "latest" consumer_threads => 5 decorate_events => true topics => ["bsatam_normal"] session_timeout_ms => "60000" codec => plain { format => "%{message}" } } } output{ kafka{ topic_id => ["bsatam_normal"] bootstrap_servers => ["10.67.1.205:9092,10.67.1.206:9092,10.67.1.207:9092"] codec => plain { format => "%{message}" } } }
kafka的配置都很熟,重点说下codec 这里的配置,这里是配置输出的格式,我是接送格式,所以直接获取了message。
4)、在config目录直接输入命令./../bin/logstash -f kafka_input.conf --path.data=/home/bsauser/logstash-6.2.2/data/kafka_input ,启动转发程序。
5)、在output配置的kafka集群中就可以读到数据了。
启动成功截图:
需要注意的问题:
1、 需要修改你发送集群的host主机,否则kafka发数据的时候会报超时,当初这个问题也困扰了我很久。
比如我在10.67.1.205集群上运行,我需要在205集群节点的主机10.67.1.205上配置kafka_input中配置的主机信息,比如在205主机上加上:
10.67.1.150 bsa150
10.67.1.151 bsa151
10.67.1.152 bsa152
2、这个工具会将数据格式的每个字段都转化成type类型,input和output中间可以加上filter配置,用于对原始日志的过滤操作。
3、path.data 参数一定要指定,之前没指定也运行成功过,但是后续报错了。同时运行多个实例的时候,最好是指定下,里面存的是临时文件信息。
原文:https://www.cnblogs.com/lihao7/p/9409289.html