
Executing Kafka topic commands from JMeter

Published: 2021-01-18 16:39:56

Prerequisites

Kafka version: 2.2.1

JMeter version: 5.3

Plugin: ApacheJMeter_ssh-1.2.0.jar

1. Copy ApacheJMeter_ssh-1.2.0.jar into JMeter's /lib/ext directory.
2. Copy jsch-0.1.55.jar into JMeter's /lib directory.
3. Restart JMeter. Under Sampler you will now find SSH Command and SSH SFTP; once configured, they let JMeter run commands on a Linux host.
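As a sketch, the remote command you paste into an SSH Command sampler is just an ordinary one-line shell command. The values below (KAFKA_HOME path, broker address, topic name) are assumptions for illustration, not taken from a real environment:

```shell
# Hypothetical values; adjust to your environment.
KAFKA_HOME=/opt/kafka
BROKER=localhost:9092
TOPIC=test_by_cheliang

# The one-line command an SSH Command sampler would execute on the Kafka host.
CMD="$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $BROKER --topic $TOPIC --time -1"
echo "$CMD"
```

The sampler's response body is then the command's stdout, which later JMeter assertions or extractors can parse.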

Common Kafka commands:

  • View offsets

kafka-run-class.sh under the Kafka bin directory can report topic offsets. Its parameters are:

--broker-list <String: hostname:port,...,hostname:port>  REQUIRED: The list of hostname and port of the server to connect to.
--max-wait-ms <Integer: ms>  DEPRECATED AND IGNORED: The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count>  DEPRECATED AND IGNORED: number of offsets returned (default: 1)
--partitions <String: partition ids>  comma separated list of partition ids. If not specified, it will find offsets for all partitions (default:)
--time <Long: timestamp/-1(latest)/-2(earliest)>  timestamp of the offsets before that (default: -1)
--topic <String: topic>  REQUIRED: The topic to get offset from.


View a topic's offsets with --time=-1

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list service-kafka-admin-broker-0:9092 --topic test_by_cheliang --time -1
test_by_cheliang:0:0
test_by_cheliang:1:2
test_by_cheliang:2:0
test_by_cheliang:3:0
test_by_cheliang:4:0
test_by_cheliang:5:0
test_by_cheliang:6:0
test_by_cheliang:7:0
test_by_cheliang:8:0
test_by_cheliang:9:0

Returns the maximum (latest) offset of every partition in the topic.

View the offset of a specific partition with --time=-1

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list service-kafka-admin-broker-0:9092 --partitions 1 --topic test_by_cheliang --time -1

test_by_cheliang:1:2

Returns the maximum (latest) offset of the specified partition.

View a topic's offsets with --time=-2

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list service-kafka-admin-broker-0:9092 --topic test_by_cheliang --time -2
test_by_cheliang:0:0
test_by_cheliang:1:0
test_by_cheliang:2:0
test_by_cheliang:3:0
test_by_cheliang:4:0
test_by_cheliang:5:0
test_by_cheliang:6:0
test_by_cheliang:7:0
test_by_cheliang:8:0
test_by_cheliang:9:0

Returns the minimum (earliest) offset of every partition in the topic.

View a topic's offsets without the --time parameter

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list service-kafka-admin-broker-0:9092 --topic test_by_cheliang
test_by_cheliang:0:0
test_by_cheliang:1:2
test_by_cheliang:2:0
test_by_cheliang:3:0
test_by_cheliang:4:0
test_by_cheliang:5:0
test_by_cheliang:6:0
test_by_cheliang:7:0
test_by_cheliang:8:0
test_by_cheliang:9:0

Without --time, the command defaults to returning the maximum offset of every partition in the topic.
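Since --time -1 reports the latest offset and --time -2 the earliest, the number of messages currently retained per partition is their difference. A minimal sketch of that computation, run against sample topic:partition:offset lines (assumed values, not captured from a live broker):

```shell
# Sample GetOffsetShell output for --time -1 and --time -2 (assumed values).
latest="test_by_cheliang:0:5
test_by_cheliang:1:2"
earliest="test_by_cheliang:0:3
test_by_cheliang:1:0"

# For each latest line, look up the matching earliest offset and subtract.
counts=""
while IFS=: read -r topic part off; do
  eoff=$(echo "$earliest" | awk -F: -v t="$topic" -v p="$part" '$1==t && $2==p {print $3}')
  counts="$counts$topic:$part:$((off - eoff))
"
done <<EOF
$latest
EOF
echo "$counts"
```

In a JMeter test plan, the two GetOffsetShell calls would come from SSH Command samplers and this arithmetic from a post-processor or a follow-up shell command.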

Topic commands (kafka-topics.sh options)

--alter

Alter the number of partitions, replica assignment, and/or configuration for the topic.

--bootstrap-server <String: server to connect to> 

REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won't be required.

--command-config <String: command config property file> 

Property file containing configs to be passed to Admin Client. This is used only with the --bootstrap-server option for describing and altering broker configs.
--config <String: name=value> 

A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms, flush.messages, flush.ms, follower.replication.throttled.replicas, index.interval.bytes, leader.replication.throttled.replicas, max.message.bytes, message.downconversion.enable, message.format.version, message.timestamp.difference.max.ms, message.timestamp.type, min.cleanable.dirty.ratio, min.compaction.lag.ms, min.insync.replicas, preallocate, retention.bytes, retention.ms, segment.bytes, segment.index.bytes, segment.jitter.ms, segment.ms, unclean.leader.election.enable. See the Kafka documentation for full details on the topic configs. It is supported only in combination with --create if the --bootstrap-server option is used.

 --create Create a new topic.
 --delete  Delete a topic.
 --delete-config <String: name>  

A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.

 --describe  List details for the given topics.
 --disable-rack-aware  Disable rack aware replica assignment
 --exclude-internal  

exclude internal topics when running list or describe command. The internal topics will be listed by default

 --force Suppress console prompts 
 --help  Print usage information.
 --if-exists  

if set when altering or deleting or describing topics, the action will only execute if the topic exists. Not supported with the --bootstrap-server option.

 --if-not-exists  

if set when creating topics, the action will only execute if the topic does not already exist. Not supported with the --bootstrap-server option.

 --list  List all available topics.
 

--replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...>

A list of manual partition-to-broker assignments for the topic being created or altered.
 

--replication-factor <Integer: replication factor>

The replication factor for each partition in the topic being created.
 --topic <String: topic>  

The topic to create, alter, describe or delete. It also accepts a regular expression, except for the --create option. Put the topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic".

 --topics-with-overrides  

if set when describing topics, only show topics that have overridden configs

 --unavailable-partitions  

if set when describing topics, only show partitions whose leader is not available

 --under-replicated-partitions  

if set when describing topics, only show under replicated partitions

 --zookeeper <String: hosts>  

DEPRECATED. The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.

# List all topics currently in Kafka
bin/kafka-topics.sh --zookeeper localhost:2181 --list

# Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --replication-factor 3 --partitions 10 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

# Describe a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_topic

# Alter a topic (partition count, special configs such as the compact cleanup policy, retention time, etc.)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --config cleanup.policy=compact --topic test_topic

# Alter a topic (alternative, via kafka-configs.sh)
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name test_topic --entity-type topics --add-config cleanup.policy=compact
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name test_topic --entity-type topics --delete-config cleanup.policy
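When these commands run inside a JMeter SSH Command sampler, the --describe output usually has to be parsed for assertions. A sketch against a captured sample line (the tab-separated values are assumptions, not live output):

```shell
# A captured header line from `kafka-topics.sh --describe` (sample values).
describe_line="Topic:test_topic	PartitionCount:3	ReplicationFactor:1	Configs:cleanup.policy=compact"

# Pull out the partition count with grep/cut.
partitions=$(printf '%s' "$describe_line" | grep -o 'PartitionCount:[0-9]*' | cut -d: -f2)
echo "$partitions"
```

The same grep/cut pattern works for ReplicationFactor or any other key:value field in the line.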

Consumer-group commands

# View details for a consumer group (active consumers, lag per partition, etc.)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe

# List all consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Old (ZooKeeper-based) version
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group test_group --describe
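For load-test assertions, a useful derived metric is the group's total lag, i.e. the sum of the LAG column of the --describe output. A sketch over sample rows (column layout matches the describe output; the numbers are assumptions):

```shell
# Sample `kafka-consumer-groups.sh --describe` output (assumed values).
describe_output="TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test_topic 0 10 12 2 - - -
test_topic 1 5 5 0 - - -"

# Skip the header row and sum column 5 (LAG).
total_lag=$(printf '%s\n' "$describe_output" | awk 'NR>1 {sum += $5} END {print sum}')
echo "$total_lag"
```

A JMeter assertion could then fail the sample if total lag exceeds a threshold.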

Consumer commands

# Consume messages (starting from the latest offset)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic

# Consume messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

# Consume at most N messages, then exit automatically
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --max-messages 1

# Consume messages and also print the key
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --property print.key=true

# Old (ZooKeeper-based) version
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic

Producer commands

# Produce messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

# Produce messages with a key
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"
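With parse.key=true and key.separator=":", each input line is split at the first separator into key and value. A sketch of that split in plain shell, using an assumed message:

```shell
# One input line as the console producer would read it (assumed content).
line="user42:hello world"

# Split at the first ":" -- everything before is the key, the rest is the value.
key=${line%%:*}
value=${line#*:}
echo "$key"
echo "$value"
```

Lines without the separator would fail to parse on the producer side, so scripts feeding the producer should guarantee the separator is present.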

producer-golang

# Kafka client implemented in Go
https://github.com/Shopify/sarama/tree/master/tools

# Minimum invocation
kafka-console-producer -topic=test -value=value -brokers=kafka1:9092

# It will pick up a KAFKA_PEERS environment variable
export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092
kafka-console-producer -topic=test -value=value

# It will read the value from stdin by using pipes
echo "hello world" | kafka-console-producer -topic=test

# Specify a key:
echo "hello world" | kafka-console-producer -topic=test -key=key

# Partitioning: by default, kafka-console-producer will partition as follows:
# - manual partitioning if a -partition is provided
# - hash partitioning by key if a -key is provided
# - random partitioning otherwise.
#
# You can override this using the -partitioner argument:
echo "hello world" | kafka-console-producer -topic=test -key=key -partitioner=random

# Display all command line options
kafka-console-producer -help

 


Original post: https://www.cnblogs.com/hxf19930508/p/14272590.html
