? 关于Kafka 简介可以查看 https://www.cnblogs.com/lentoo/p/7785004.html ,里面介绍比较详细。
从官网下载,本次演示使用的是最新的 3.7.0的版本
zookeeper 配置
(1)下载解压后,在conf文件夹下,将默认配置文件 zoo_sample.cfg,复制后重命名一份为 200.cfg 。
(2)配置 zoo.cfg ,注意端口不要太大,否则后面启动会报错
tickTime=2000
initLimit=10
syncLimit=5
# snapshot持久化
dataDir=../data
dataLogDir=../logs
clientPort=2181
#集群配置 server.A = B:C:D
#A 是一个数字,表示第几号服务器
#B 服务器IP地址
#C 端口号,当前服务器与集群的 Leader 服务器交换信息的接口
#D Leader 挂掉时用来进行选举 leader所用的端口
server.1=127.0.0.1:2387:1387
server.2=127.0.0.1:2388:1388
server.3=127.0.0.1:2389:1389
复制 zookeeper 文件夹,重命名为 -2 -3
zookeeper-3.7.0 修改zoo.cfg 配置文件
clientPort=2181
zookeeper-3.7.0-2 修改zoo.cfg 配置文件
clientPort=2182
zookeeper-3.7.0-3 修改zoo.cfg 配置文件
clientPort=2183
创建 ServerID
在配置的dataDir目录下面新建一个 myid 文件,文件内容就是对应的id号,
比如:
zookeeper-3.7.0 myid 文件的内容 为 1
zookeeper-3.7.0-2 myid 文件的内容 为 2
zookeeper-3.7.0-3 myid 文件的内容 为 3
启动 zookeeper
在对应的bin目录下启动 cmd命令,输入 zkServer (需要将zookeeper配置到系统环境变量)
(1) 从官网下载,注意下载二进制文件,本次演示使用版本 kafka_2.12-2.8.0
(2) 配置文件
? 解压之后,在config文件夹下,打开 server.properties配置文件进行配置
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
port=9092
#配置副本数量
default.replication.factor=2
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=../kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
(3) 集群配置
复制两份解压后的文件,命名如下
kafka_2.12-2.8.0-2 kafka_2.12-2.8.0-3
修改部分配置信息
在对应的server.properties中修改
#**唯一标识
broker.id=0
broker.id=1
broker.id=2
#**监听端口
port=9092
port=9093
port=9094
(4) 启动 Kafka
进入到bin/windows目录下 启动kafka并指定配置文件
kafka-server-start.bat ../../config/server.properties
(1)创建一个 topic
kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
(2)查看是否创建成功
kafka-topics.bat --list --zookeeper localhost:2181
(3)发送消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test
helloworld
(4)接收消息
? 分别在kafka_2.12-2.8.0\bin\windows 目录下执行
.\kafka-console-consumer.bat --bootstrap-server
localhost:9092 --topic test
.\kafka-console-consumer.bat --bootstrap-server localhost:9093 --topic test
.\kafka-console-consumer.bat --bootstrap-server localhost:9094 --topic test
原文:https://www.cnblogs.com/louiszh/p/14814529.html