首页 > 其他 > 详细

kafka相关操作

时间:2020-06-13 12:16:28      阅读:50      评论:0      收藏:0      [点我收藏+]

kafka操作命令

bin目录

/usr/hdp/3.1.4.0-315/kafka/bin

查看topic列表

./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list

消费topic

./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --from-beginning  --topic console-consumer-6651
/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test

检查消费者位置

有时查看消费者的位置很有用。我们有一个工具,可以显示所有使用者在使用者组中的位置以及他们在日志末尾之后的位置。要在名为my-group的使用者组上运行此工具,并使用名为my-topic的主题,则该过程将如下所示:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC             PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG    CONSUMER-ID                    HOST              CLIENT-ID
my-topic            0     2        4        2     consumer-1-029af89c-873c-4751-a720-cefd41a669d6  /127.0.0.1           consumer-1 
my-topic            1     2        3        1     consumer-1-029af89c-873c-4751-a720-cefd41a669d6  /127.0.0.1           consumer-1
my-topic            2     2        3        1     consumer-2-42c1abd4-e3

有许多附加的“描述”选项可用于提供有关消费者组的更详细的信息:

  • --members:此选项提供使用者组中所有活动成员的列表。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
  • --members --verbose:除了上述“ --members”选项报告的信息之外,此选项还提供分配给每个成员的分区。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose

要手动删除一个或多个使用者组,可以使用“ --delete”选项:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

要重置使用者组的偏移量,可以使用“ --reset-offsets”选项。此选项当时支持一个消费者组。它需要定义以下范围:--all-topics或--topic。除非您使用“ --from-file”方案,否则必须选择一个范围。另外,首先请确保使用者实例处于非活动状态。有关更多详细信息,请参见 KIP-122

它具有3个执行选项:

  • (默认)以显示要重置的偏移量。
  • --execute:执行--reset-offsets过程。
  • --export:将结果导出为CSV格式。

--reset-offsets还具有以下场景可供选择(必须选择至少一个场景):

  • --to-datetime <String:datetime>:将偏移量重置为与datetime的偏移量。格式:“ YYYY-MM-DDTHH:mm:SS.sss”
  • --to-最早:将偏移量重置为最早的偏移量。
  • --to-latest:将偏移量重置为最新偏移量。
  • --shift-by <长:偏移数>:重置偏移量,将当前偏移量偏移“ n”,其中“ n”可以为正或负。
  • --from-file:将偏移量重置为CSV文件中定义的值。
  • --to-current:将偏移量重置为当前偏移量。
  • --by-duration <String:duration>:将偏移量重置为当前时间戳记的持续时间偏移量。格式:“ PnDTnHnMnS”
  • --to-offset:将偏移量重置为特定偏移量。

请注意,超出范围的偏移量将调整为可用的偏移量结束。例如,如果偏移量结束为10,偏移量请求为15,则实际上将选择偏移量为10。

例如,要将使用者组的偏移量重置为最新的偏移量:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC             PARTITION NEW-OFFSET
topic1             0     0

如果您正在使用旧的高级使用者并将组元数据存储在ZooKeeper中(即offsets.storage=zookeeper),请通过 --zookeeper而不是bootstrap-server

> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

扩展集群

将服务器添加到Kafka集群很容易,只需为其分配唯一的代理ID,然后在新服务器上启动Kafka。但是,不会为这些新服务器自动分配任何数据分区,因此,除非将分区移至它们,否则在创建新主题之前它们将不会做任何工作。因此,通常在将计算机添加到群集时,您将需要将一些现有数据迁移到这些计算机。

数据迁移过程是手动启动的,但是是完全自动化的。在幕后,Kafka会将新服务器添加为要迁移的分区的追随者,并允许其完全复制该分区中的现有数据。新服务器完全复制该分区的内容并加入同步副本后,现有副本之一将删除其分区的数据。

分区重新分配工具可用于在代理之间移动分区。理想的分区分配将确保所有代理之间的数据负载和分区大小均匀。分区重新分配工具没有能力自动研究Kafka群集中的数据分布,并四处移动分区以实现均匀的负载分布。因此,管理员必须弄清楚应该移动哪些主题或分区。

分区重新分配工具可以在3种互斥模式下运行:

  • --generate:在此模式下,给定主题列表和代理列表,该工具会生成候选重新分配,以将指定主题的所有分区移至新的代理。给定主题和目标代理的列表,此选项仅提供一种方便的方法来生成分区重新分配计划。
  • --execute:在这种模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。(使用--reassignment-json-file选项)。这可以是管理员手工制作的自定义重新分配计划,也可以使用--generate选项提供
  • --verify:在此模式下,该工具会验证上一次--execute期间列出的所有分区的重新分配状态。状态可以是成功完成,失败或进行中
自动将数据迁移到新计算机
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

查看有哪些topic

kafka-topics.sh --describe --zookeeper localhost:2281

Topic:KafkaTestTopic    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0

在kafka中每个分区都有一个编号,从0开始;

在kafka中如果有多个副本的话,就会存在leader与follower的关系;Leader表示领导,Leader:0表示当前这个副本为leader所在的broker是哪一个。

(2)只看主题名称

kafka-topics.sh --list --zookeeper localhost:2281
KafkaTestTopic

(3)查看指定主题的信息

./kafka-topics.sh --describe --zookeeper host01.com:6667 --topic --list
 ./kafka-consumer-groups.sh --new-consumer --bootstrap-server  host01.com:6667 --group suricata_parser --describe

(4)查看指定的topic是否存在

./kafka-topics.sh --list --zookeeper host01.com:6667 --topic suricata_parser

# 或
kafka-topics.sh --list --zookeeper localhost:2281 | grep KafkaTestTopic

# 或
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning

(5) 修改主题的分区数

kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --partitions 3


WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic

Topic:KafkaTestTopic    PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 2    Leader: 0   Replicas: 0 Isr: 0

(6)修改配置项

kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --config flush.messages=1

WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".

(7)删除配置项

kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --delete-config flush.messages

WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".

kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic

Topic:KafkaTestTopic    PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 2    Leader: 0   Replicas: 0 Isr: 0

(8)删除主题

kafka-topics.sh --zookeeper localhost:2281  --delete  --topic KafkaTestTopic

注意:删除只是标记删除;当服务器重启就会删除已标记的topic,这个和kafka的版本有关。
kafka-topics.sh --bootstrap-server host01.com:6667 --topic suricata_parser --describe
./kafka-consumer-groups.sh --new-consumer --bootstrap-server host01.com:6667 --group suricata_parser --describe
./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list
./kafka-consumer-groups.sh --new-consumer --bootstrap-server host01.com:6667 --group suricata_parser --describe

./kafka-consumer-groups.sh --new-consumer --bootstrap-server host01.com:6667 --list
suricata_parser

./kafka-consumer-groups.sh --new-consumer --zookeeper host01.com:6667 --group suricata_parser --describe

查看所有topic

./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list

查看指定topic

./kafka-topics.sh --describe --zookeeper host01.com:6667 --topic console-consumer-11067
./kafka-topics.sh --describe --bootstrap-server host01.com:6667 --topic console-consumer-6651

启动消费者

./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --topic console-consumer-6651 --from-beginning

发布新的topic

./kafka-console-producer.sh --broker-list localhost:9092 --topic console-consumer-11067

消费指定topic

./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --from-beginning --topic console-consumer-11067
./kafka-topics.sh --describe --zookeeper host01.com:6667 --topic console-consumer-11067
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --from-beginning --topic console-consumer-11067
./kafka-topics.sh --list --bootstrap-server host01.com:6667 --topic suricata_parser
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --topic suricata_parser --from-beginning

查看topic列表(官网的,不可用)

./kafka-topics.sh --list --bootstrap-server host01.com:6667

查看消费者组topic列表

./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list

查看所有消费者组

./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --list

 ./kafka-consumer-groups.sh --bootstrap-server host01.com:6667 --describe --group indexing-ra
消费指定topic
./kafka-console-consumer.sh --bootstrap-server host01.com:6667 --topic suricata 
查看指定topic
./kafka-topics.sh --describe --bootstrap-server host01.com:6667 --topic my-replicated-topic
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-11067
列出所有主题
./kafka-topics.sh --zookeeper host01.com:6667/kafka --list
列出所有主题的详细信息
kafka-topics.bat --zookeeper localhost:2181/kafka --describe
创建主题 主题名 my-topic,1副本,8分区
kafka-topics.bat --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 8 --topic my-topic
增加分区,注意:分区无法被删除
kafka-topics.bat --zookeeper localhost:2181/kafka --alter --topic my-topic --partitions 16
删除主题
kafka-topics.bat --zookeeper localhost:2181/kafka --delete --topic my-topic
列出消费者群组(仅Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092/kafka --list
列出消费者群组详细信息(仅Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092/kafka --describe --group 群组名
./kafka-console-consumer.sh --bootstrap-server=10.3.70.109:9092 --topic test --from-beginning --consumer-property group.id=test-group1

消费前

console-consumer-76439
console-consumer-92659
pcap
console-consumer-99389
yaf_parser
console-consumer-70947
indexing-ra
dlp_parser
console-consumer-1254
console-consumer-63288
console-consumer-88669
console-consumer-11067
console-consumer-39848
console-consumer-18803
console-consumer-60604
bro_parser
profiler
console-consumer-46600
snort_parser
console-consumer-49531
console-consumer-32598
console-consumer-76597
enrichments
console-consumer-57984
console-consumer-99946
indexing-batch
suricata_parser
zeek_parser
console-consumer-37238
console-consumer-29317
console-consumer-86748
console-consumer-64312
wazuh_parser
console-consumer-43352
console-consumer-7657
squid_parser

消费后

suricata_parser
zeek_parser
console-consumer-37238
console-consumer-29317
console-consumer-86748
console-consumer-79237
console-consumer-64312
wazuh_parser
console-consumer-43352
console-consumer-7657
squid_parser
console-consumer-92659
console-consumer-3010
console-consumer-70947
indexing-ra
console-consumer-63288
console-consumer-88669
console-consumer-76439
console-consumer-99389
pcap
yaf_parser
console-consumer-89010
dlp_parser
console-consumer-1254
console-consumer-11067
console-consumer-39848
console-consumer-18803
console-consumer-60604
profiler
bro_parser
console-consumer-46600
console-consumer-49531
console-consumer-32598
console-consumer-7519
console-consumer-76597
enrichments
console-consumer-57984
console-consumer-99946
indexing-batch

kafka相关操作

原文:https://www.cnblogs.com/liuhuan086/p/13112088.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!