kafka操作命令
bin目录
/usr/hdp/3.1.4.0-315/kafka/bin
查看topic列表
./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list
消费topic
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --from-beginning --topic console-consumer-6651
/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
有时查看消费者的位置很有用。我们有一个工具,可以显示所有使用者在使用者组中的位置以及他们在日志末尾之后的位置。要在名为my-group的使用者组上运行此工具,并使用名为my-topic的主题,则该过程将如下所示:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 2 2 3 1 consumer-2-42c1abd4-e3
有许多附加的“描述”选项可用于提供有关消费者组的更详细的信息:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
要手动删除一个或多个使用者组,可以使用“ --delete”选项:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
要重置使用者组的偏移量,可以使用“ --reset-offsets”选项。此选项当时支持一个消费者组。它需要定义以下范围:--all-topics或--topic。除非您使用“ --from-file”方案,否则必须选择一个范围。另外,首先请确保使用者实例处于非活动状态。有关更多详细信息,请参见 KIP-122。
它具有3个执行选项:
--reset-offsets还具有以下场景可供选择(必须选择至少一个场景):
请注意,超出范围的偏移量将调整为可用的偏移量结束。例如,如果偏移量结束为10,偏移量请求为15,则实际上将选择偏移量为10。
例如,要将使用者组的偏移量重置为最新的偏移量:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0 0
如果您正在使用旧的高级使用者并将组元数据存储在ZooKeeper中(即offsets.storage=zookeeper
),请通过 --zookeeper
而不是bootstrap-server
:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
将服务器添加到Kafka集群很容易,只需为其分配唯一的代理ID,然后在新服务器上启动Kafka。但是,不会为这些新服务器自动分配任何数据分区,因此,除非将分区移至它们,否则在创建新主题之前它们将不会做任何工作。因此,通常在将计算机添加到群集时,您将需要将一些现有数据迁移到这些计算机。
数据迁移过程是手动启动的,但是是完全自动化的。在幕后,Kafka会将新服务器添加为要迁移的分区的追随者,并允许其完全复制该分区中的现有数据。新服务器完全复制该分区的内容并加入同步副本后,现有副本之一将删除其分区的数据。
分区重新分配工具可用于在代理之间移动分区。理想的分区分配将确保所有代理之间的数据负载和分区大小均匀。分区重新分配工具没有能力自动研究Kafka群集中的数据分布,并四处移动分区以实现均匀的负载分布。因此,管理员必须弄清楚应该移动哪些主题或分区。
分区重新分配工具可以在3种互斥模式下运行:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
查看有哪些topic
kafka-topics.sh --describe --zookeeper localhost:2281
Topic:KafkaTestTopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
在kafka中每个分区都有一个编号,从0开始;
在kafka中如果有多个副本的话,就会存在leader与follower的关系;Leader表示领导,Leader:0表示当前这个副本为leader所在的broker是哪一个。
(2)只看主题名称
kafka-topics.sh --list --zookeeper localhost:2281
KafkaTestTopic
(3)查看指定主题的信息
./kafka-topics.sh --describe --zookeeper host01.com:6667 --topic --list
./kafka-consumer-groups.sh --new-consumer --bootstrap-server host01.com:6667 --group suricata_parser --describe
(4)查看指定的topic是否存在
./kafka-topics.sh --list --zookeeper host01.com:6667 --topic suricata_parser
# 或
kafka-topics.sh --list --zookeeper localhost:2281 | grep KafkaTestTopic
# 或
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning
(5) 修改主题的分区数
kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
(6)修改配置项
kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --config flush.messages=1
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".
(7)删除配置项
kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --delete-config flush.messages
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".
kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
(8)删除主题
kafka-topics.sh --zookeeper localhost:2281 --delete --topic KafkaTestTopic
注意:删除只是标记删除;当服务器重启就会删除已标记的topic,这个和kafka的版本有关。
kafka-topics.sh --bootstrap-server host01.com:6667 --topic suricata_parser --describe
./kafka-consumer-groups.sh --new-consumer --bootstrap-server host01.com:6667 --group suricata_parser --describe
./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list
./kafka-consumer-groups.sh --new-consumer --bootstrap-server host01.com:6667 --group suricata_parser --describe
./kafka-consumer-groups.sh --new-consumer --bootstrap-server host01.com:6667 --list
suricata_parser
./kafka-consumer-groups.sh --new-consumer --zookeeper host01.com:6667 --group suricata_parser --describe
查看所有topic
./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list
查看指定topic
./kafka-topics.sh --describe --zookeeper host01.com:6667 --topic console-consumer-11067
./kafka-topics.sh --describe --bootstrap-server host01.com:6667 --topic console-consumer-6651
启动消费者
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --topic console-consumer-6651 --from-beginning
发布新的topic
./kafka-console-producer.sh --broker-list localhost:9092 --topic console-consumer-11067
消费指定topic
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --from-beginning --topic console-consumer-11067
./kafka-topics.sh --describe --zookeeper host01.com:6667 --topic console-consumer-11067
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --from-beginning --topic console-consumer-11067
./kafka-topics.sh --list --bootstrap-server host01.com:6667 --topic suricata_parser
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --topic suricata_parser --from-beginning
查看topic列表(官网的,不可用)
./kafka-topics.sh --list --bootstrap-server host01.com:6667
查看消费者组topic列表
./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list
查看所有消费者组
./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list
./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --describe --group indexing-ra
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --topic suricata
./kafka-topics.sh --describe --bootstrap-server host01.com:6667 --topic my-replicated-topic
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-11067
./kafka-topics.sh --zookeeper host01.com:6667/kafka --list
kafka-topics.bat --zookeeper localhost:2181/kafka --describe
kafka-topics.bat --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 8 --topic my-topic
kafka-topics.bat --zookeeper localhost:2181/kafka --alter --topic my-topic --partitions 16
kafka-topics.bat --zookeeper localhost:2181/kafka --delete --topic my-topic
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092/kafka --list
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092/kafka --describe --group 群组名
./kafka-console-consumer.sh --bootstrap-server=10.3.70.109:9092 --topic test --from-beginning --consumer-property group.id=test-group1
消费前
console-consumer-76439
console-consumer-92659
pcap
console-consumer-99389
yaf_parser
console-consumer-70947
indexing-ra
dlp_parser
console-consumer-1254
console-consumer-63288
console-consumer-88669
console-consumer-11067
console-consumer-39848
console-consumer-18803
console-consumer-60604
bro_parser
profiler
console-consumer-46600
snort_parser
console-consumer-49531
console-consumer-32598
console-consumer-76597
enrichments
console-consumer-57984
console-consumer-99946
indexing-batch
suricata_parser
zeek_parser
console-consumer-37238
console-consumer-29317
console-consumer-86748
console-consumer-64312
wazuh_parser
console-consumer-43352
console-consumer-7657
squid_parser
消费后
suricata_parser
zeek_parser
console-consumer-37238
console-consumer-29317
console-consumer-86748
console-consumer-79237
console-consumer-64312
wazuh_parser
console-consumer-43352
console-consumer-7657
squid_parser
console-consumer-92659
console-consumer-3010
console-consumer-70947
indexing-ra
console-consumer-63288
console-consumer-88669
console-consumer-76439
console-consumer-99389
pcap
yaf_parser
console-consumer-89010
dlp_parser
console-consumer-1254
console-consumer-11067
console-consumer-39848
console-consumer-18803
console-consumer-60604
profiler
bro_parser
console-consumer-46600
console-consumer-49531
console-consumer-32598
console-consumer-7519
console-consumer-76597
enrichments
console-consumer-57984
console-consumer-99946
indexing-batch
原文:https://www.cnblogs.com/liuhuan086/p/13112088.html