[root@slavenode1 bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.160:2181 --replication-factor 3 --partitions 10 --topic test
Created topic test.
[root@slavenode1 bin]# ./kafka-topics.sh --zookeeper 192.168.1.160:2181 --list
test
[root@slavenode1 bin]# ./kafka-topics.sh --zookeeper 192.168.1.160:2181 --describe --topic test
Topic:test PartitionCount:10 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1
Topic: test Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1
Topic: test Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1
Topic: test Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 3,1
Topic: test Partition: 6 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test Partition: 7 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: test Partition: 8 Leader: 3 Replicas: 3,1,2 Isr: 3,1
Topic: test Partition: 9 Leader: 1 Replicas: 1,3,2 Isr: 1
[root@slavenode2 bin]# ./kafka-console-producer.sh --broker-list slavenode2:9092 --topic tzh1
>ni hao
[root@slavenode1 bin]# ./kafka-console-consumer.sh --bootstrap-server slavenode1:9092 --topic tzh1 --from-beginning
ni hao
[root@slavenode1 bin]#./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group group-test
[root@masternode1 bin]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 192.168.1.160:9092 --partitions 0
test:0:0
[root@masternode1 bin]# kafka-topics.sh --zookeeper masternode1:2181 --alter --topic test --partitions 11
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
# 查看当前一共有多少个分区,不出意外是11(原来的分区+新增的)。
[root@masternode1 bin]# ./kafka-topics.sh --zookeeper 192.168.1.160:2181 --describe --topic test
Topic:test PartitionCount:11 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: test Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,2,3
Topic: test Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 1,2,3
Topic: test Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 1,2,3
Topic: test Partition: 6 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 7 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: test Partition: 8 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: test Partition: 9 Leader: 1 Replicas: 1,3,2 Isr: 1,2,3
Topic: test Partition: 10 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
你可以通过命令:./bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic
此时你若想真正删除它,可以如下操作:
(1)登录zookeeper客户端:命令:./bin/zookeeper-client
(2)找到topic所在的目录:ls /brokers/topics
(3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。
[root@masternode1 bin]# ./kafka-topics.sh --delete --zookeeper masternode1:2181 --topic tzh
Topic tzh is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 查看所有组
[root@slavenode1 bin]# ./kafka-consumer-groups.sh --bootstrap-server masternode1:9092 --list
group-test
console-consumer-33594
console-consumer-51445
console-consumer-21313
console-consumer-72215
# 查看消费情况
[root@slavenode1 bin]# ./kafka-consumer-groups.sh --describe --bootstrap-server masternode1:9092 --group group-test
Consumer group ‘group-test‘ has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 6 0 0 0 - - -
topic-test 0 7 7 0 - - -
test 10 2 2 0 - - -
test 0 0 0 0 - - -
test 7 0 0 0 - - -
test 5 0 0 0 - - -
test 8 0 0 0 - - -
test 1 0 0 0 - - -
topic-test 1 7 7 0 - - -
tzh1 0 3 17 14 - - -
test 4 0 0 0 - - -
test 9 0 0 0 - - -
test 3 0 0 0 - - -
test 2 0 0 0 - - -
要点:
1、一个消费者只能属于一个消费者组
2、消费者组订阅的topic只能被其中的一个消费者消费
3、不同消费者组中的消费者可以消费同一个topic
4、消费者组中的消费者实例个数不能超过分区的数量
假设分区数量设置为1,消费者实例设置为2,这个时候会发现只有1个消费者在消费消息,另一个消费者闲置。
创建分区数量为2的topic
[root@masternode1 bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.160:2181 --replication-factor 1 --partitions 2 --topic topic-test
Created topic topic-test.
打开2个命令行窗口,执行同样的语句进行消息消费
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-test --group group-test
打开1个命令行窗口,进行消息发送
kafka-console-producer.sh --broker-list localhost:9092 --topic topic-test
原文:https://www.cnblogs.com/hsyw/p/14897928.html