首先安装一个kafka集群,但是zookeeper使用单节点,可以让kafka快速跑起来,后续再研究kafka和zookeeper的集群
下面的步骤三个节点都要做
[root@node1 src]# wget http://mirror.rise.ph/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
[root@node1 src]# tar -xf kafka_2.11-2.4.0.tgz
[root@node1 src]# mv kafka_2.11-2.4.0 /usr/local/kafka
[root@node1 src]# cd /usr/local/kafka
就使用node1作为zookeeper节点
[root@node1 kafka]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
三个节点的kafka配置文件
[root@node1 kafka]# grep -Ev "^$|[;#]" config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.132.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.132.131:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@node2 kafka]# grep -Ev "^$|[;#]" config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.132.132:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.132.131:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@node3 kafka]# grep -Ev "^$|[;#]" config/server.properties
broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.132.133:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.132.131:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
都启动kafka
[root@node1 kafka]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
[root@node2 kafka]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
[root@node3 kafka]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
查看端口
查看其他端口
使用一下
[root@node1 kafka]# ./bin/zookeeper-shell.sh 192.168.132.131:2181
连接到zookeeper
ls /brokers
[ids, seqid, topics]
ls /brokers/ids
[0, 1, 2]
create /test "hello"
Created /test
get /test
hello
创建一个topic
[root@node1 kafka]# ./bin/kafka-topics.sh --create --zookeeper 192.168.132.131:2181 --replication-factor 3 --partitions 3 --topic kafkatest
Created topic kafkatest.
[root@node1 kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.132.131:2181
kafkatest
[root@node1 kafka]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.132.131:2181 --topic kafkatest
Topic: kafkatest    PartitionCount: 3    ReplicationFactor: 3    Configs:
    Topic: kafkatest    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: kafkatest    Partition: 1    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: kafkatest    Partition: 2    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
测试发送消息
[root@node1 kafka]# ./bin/kafka-topics.sh --create --zookeeper 192.168.132.131:2181 --replication-factor 3 --partitions 3 --topic messagetest
Created topic messagetest.
[root@node1 kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.132.131:9092,192.168.132.132:9092,192.168.132.133:9092 --topic messagetest
这里可以输入消息
>helloworld
>hello
node2和node3查看
[root@node2 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.132.131:9092 --topic messagetest --from-beginning
helloworld
hello
[root@node3 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.132.131:9092 --topic messagetest --from-beginning
helloworld
hello
配置filebeat文件
filebeat.inputs:
#####################################################
## Nginx log
#####################################################
- type: log
  enabled: true
  paths:
    - /usr/local/nginx/logs/access.log
  # 注意:正确的选项名是 json.keys_under_root(带 s);如果误写成 json.key_under_root,
  # 该选项可能不会生效,解析出的字段会留在 json 字段下面(文末Kibana中的结果即是这种情况)
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]
- type: log
  enabled: true
  paths:
    - /usr/local/nginx/logs/error.log
  tags: ["error"]
output.kafka:
  hosts: ["192.168.132.131:9092","192.168.132.132:9092","192.168.132.133:9092"]
  topic: "elklog"
[root@node4 ~]# systemctl restart filebeat
修改logstash
input {
  kafka {
    bootstrap_servers => "192.168.132.131:9092"
    topics => ["elklog"]
    group_id => "logstash"
    codec => "json"
  }
}
filter {
  mutate {
    convert => ["upstream_time","float"]
    convert => ["request_time","float"]
  }
}
output {
  stdout {}
  if "access" in [tags] {
    elasticsearch {
      hosts => "192.168.132.131:9200"
      manage_template => false
      index => "nginx_access-%{+yyyy.MM.dd}"
    }
  }
  if "error" in [tags] {
    elasticsearch {
      hosts => "192.168.132.131:9200"
      manage_template => false
      index => "nginx_error-%{+yyyy.MM.dd}"
    }
  }
}
启动logstash
[root@node4 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logsatsh.conf
[INFO ] 2020-01-21 10:15:57.734 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2020-01-21 10:15:57.931 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2020-01-21 10:15:58.630 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
访问生成数据
查看topic
[root@node1 kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.132.131:2181
__consumer_offsets
elklog #已经有定义好的topic
kafkatest
messagetest
查看索引
查看kibana
{ "_index": "nginx_access-2020.01.21", "_type": "_doc", "_id": "F1O0yG8BOF7DoSFd9EoQ", "_version": 1, "_score": null, "_source": { "ecs": { "version": "1.1.0" }, "@timestamp": "2020-01-21T15:24:58.441Z", "agent": { "type": "filebeat", "version": "7.4.2", "hostname": "node4", "id": "bb3818f9-66e2-4eb2-8f0c-3f35b543e025", "ephemeral_id": "af9e9e07-03a6-4869-a5ff-9476ccf00dae" }, "json": { "Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36", "upstreamtime": "-", "upstreamhost": "-", "responsetime": 0, "xff": "-", "domain": "192.168.132.134", "@timestamp": "2020-01-21T10:24:57-05:00", "http_host": "192.168.132.134", "size": 555, "status": "404", "referer": "-", "url": "/tcp", "host": "192.168.132.134", "clientip": "192.168.132.1" }, "tags": [ "access" ], "input": { "type": "log" }, "@version": "1", "log": { "offset": 24522559, "file": { "path": "/usr/local/nginx/logs/access.log" } }, "host": { "name": "node4" } }, "fields": { "json.@timestamp": [ "2020-01-21T15:24:57.000Z" ], "@timestamp": [ "2020-01-21T15:24:58.441Z" ] }, "sort": [ 1579620298441 ] }
实验完成
原文:https://www.cnblogs.com/zyxnhr/p/12227362.html