kafka是一个高吞吐量的分布式消息系统,整体设计为典型的发布与订阅模式。kafka内部采用zookeeper来解决配置管理、通知/协调、集群管理、master选举等问题,所以建议先对zookeeper有个基本的了解,可以看下我的zookeeper笔记(一)。
?
这次对于kafka只介绍一下命令行下的安装部署等操作,其它的,比如kafka在zookeeper中的存储结构、配置详解、java客户端代码等,将在后续的讲解中进行介绍。
?
1、首先我们下载kafka,这里选择最新版本0.9.0.1,下载并解压,同样采取与zookeeper笔记中类似的目录结构,使用单机模拟集群:
/Users/zk_chs/something/kafka/kafka-0.9.0.1-001 /Users/zk_chs/something/kafka/kafka-0.9.0.1-002 /Users/zk_chs/something/kafka/kafka-0.9.0.1-003
?
?
2、进入每个Kafka的根目录,分别对其config/server.properties文件进行配置:
----config/server.properties broker.id=1 listeners=PLAINTEXT://:9091 port=9091 log.dirs=/Users/zk_chs/something/kafka/tmp/kafka01 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
broker.id:当前kafka服务的id
listeners、post:监听的客户端连接端口
log.dirs:kafka数据、索引存储位置
zookeeper.connect:kafka依赖的zookeeper服务地址
?
再对另外两个kafka服务进行同样的配置:
----config/server.properties broker.id=2 listeners=PLAINTEXT://:9092 port=9092 log.dirs=/Users/zk_chs/something/kafka/tmp/kafka02 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
?
----config/server.properties broker.id=3 listeners=PLAINTEXT://:9093 port=9093 log.dirs=/Users/zk_chs/something/kafka/tmp/kafka03 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
?
因为我们模拟集群,为了方便管理,对于kafka的log.dir最好也统一一下路径:
/Users/zk_chs/something/kafka/tmp/kafka01 /Users/zk_chs/something/kafka/tmp/kafka02 /Users/zk_chs/something/kafka/tmp/kafka03
?
3、配置完成后便能进行启动了,不过在此之前,先将zookeeper服务进行启动。如果你的zookeeper服务已经启动,那么便能启动kafka了,首先还是进入kafka根目录,然后进行如下启动操作:
bin/kafka-server-start.sh config/server.properties &
随后,切换至不同的kafka目录,再次执行:
cd ../kafka-0.9.0.1-002 bin/kafka-server-start.sh config/server.properties & cd ../kafka-0.9.0.1-003 bin/kafka-server-start.sh config/server.properties
?
所有的kafka服务启动完成后,执行jps,可以看到:
1248 QuorumPeerMain
2055 Kafka
1224 QuorumPeerMain
2058 Jps
1212 QuorumPeerMain
2044 Kafka
2047 Kafka
以上代表我们启动了3个kafka服务,3个zookeeper服务。
?
4、接着创建一个topic,依然是在kafka根目录下执行:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my_test
--create:创建topic
--zookeeper:连接的zookeeper节点地址
--replication-factor:数据副本数量
--partitions:对创建的topic进行分片
--topic:要创建的topic名称
?
5、向topic发送消息、从topic消费消息,在kafka根目录下执行:
bin/kafka-console-producer.sh --broker-list localhost:9091 --topic my-replicated-topic
--broker-list:此处不是zookeeper的地址,而是kafka客户端的地址
--topic:向哪个topic发送消息
?
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
--zookeeper:这里为zookeeper地址,不需要kafka客户端地址
--from-beginning:kafka消息存储在文件内,能够重复消费,这里代表偏移量(offsets)
--topic:从哪个topic消费消息
?
如果你同时开启producer、consumer两个bash窗口,当从producer向指定的topic发送消息时,便能看到consumer对其进行了消费,使用ctrl c即可退出发送消息或消费消息进程。
?
?
总结:
我们这次讲解了kafka的安装、基本配置、启动部署,以及一些kafka基本的操作,比如create topic,producer、consumer操作。
?
随后的笔记会对kafka的java客户端代码、kafka服务参数、zookeeper中节点数据进行讲解,欢迎继续学习。
?
?
原文:http://zk-chs.iteye.com/blog/2287907