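Prerequisites
Kafka version: 2.2.1
JMeter version: 5.3
Plugin: ApacheJMeter_ssh-1.2.0.jar
1. Copy ApacheJMeter_ssh-1.2.0.jar into the lib/ext directory of the JMeter installation
2. Copy jsch-0.1.55.jar into the lib directory of the JMeter installation
3. After starting JMeter, find SSH Command and SSH SFTP under Sampler; once configured, they let JMeter interact with a Linux host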
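The two copy steps above can be scripted. The following is only a minimal sketch: the function name install_ssh_plugin and both of its arguments are hypothetical, and it assumes the two jars have already been downloaded into one directory.

```shell
# Copy the SSH sampler plugin and its JSch dependency into a JMeter install.
# $1 = JMeter home directory, $2 = directory holding the downloaded jars
# (both arguments are assumptions made for this sketch).
install_ssh_plugin() {
  # The sampler plugin jar goes under lib/ext
  cp "$2/ApacheJMeter_ssh-1.2.0.jar" "$1/lib/ext/" || return 1
  # The JSch library it depends on goes under lib
  cp "$2/jsch-0.1.55.jar" "$1/lib/" || return 1
}

# Example call (paths are placeholders):
# install_ssh_plugin /opt/apache-jmeter-5.3 "$HOME/Downloads"
```

JMeter only scans these directories at startup, so restart it after copying the jars.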
Common Kafka commands:
kafka-run-class.sh under Kafka's bin directory can run kafka.tools.GetOffsetShell to show topic offsets. It takes the following parameters:
--broker-list | <String: hostname:port,...,hostname:port> REQUIRED: The list of hostname and port of the server to connect to. |
--max-wait-ms | <Integer: ms> DEPRECATED AND IGNORED: The max amount of time each fetch request waits. (default: 1000) |
--offsets | <Integer: count> DEPRECATED AND IGNORED: number of offsets returned (default: 1) |
--partitions | <String: partition ids> comma separated list of partition ids. If not specified, it will find offsets for all partitions (default:) |
--time | <Long: timestamp/-1(latest)/-2(earliest)> timestamp of the offsets before that (default: -1) |
--topic | <String: topic> REQUIRED: The topic to get offset from. |
View the offsets of a given topic: --time -1
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list service-kafka-admin-broker-0:9092 --topic test_by_cheliang --time -1
test_by_cheliang:0:0
test_by_cheliang:1:2
test_by_cheliang:2:0
test_by_cheliang:3:0
test_by_cheliang:4:0
test_by_cheliang:5:0
test_by_cheliang:6:0
test_by_cheliang:7:0
test_by_cheliang:8:0
test_by_cheliang:9:0
Returns the latest (maximum) offset of every partition of the topic.
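Each output line has the form topic:partition:offset, so the per-partition values can be added up with awk. The helper below is a sketch, and sum_offsets is a hypothetical name. One possible use in a JMeter test is to run it from an SSH Command sampler before and after the load phase and compare the two totals.

```shell
# Sum the offset field (third colon-separated field) of every
# "topic:partition:offset" line read from stdin.
sum_offsets() {
  awk -F: 'NF == 3 { total += $3 } END { print total + 0 }'
}

# Usage (needs a reachable broker):
# ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
#   --broker-list service-kafka-admin-broker-0:9092 \
#   --topic test_by_cheliang --time -1 | sum_offsets
```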
View the offsets of a specific partition of a topic: --time -1
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list service-kafka-admin-broker-0:9092 --partitions 1 --topic test_by_cheliang --time -1
test_by_cheliang:1:2
Returns the latest (maximum) offset of the specified partition.
View topic offsets: --time -2
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list service-kafka-admin-broker-0:9092 --topic test_by_cheliang --time -2
test_by_cheliang:0:0
test_by_cheliang:1:0
test_by_cheliang:2:0
test_by_cheliang:3:0
test_by_cheliang:4:0
test_by_cheliang:5:0
test_by_cheliang:6:0
test_by_cheliang:7:0
test_by_cheliang:8:0
test_by_cheliang:9:0
Returns the earliest (minimum) offset of every partition of the topic.
View topic offsets: --time not specified
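Subtracting the earliest offset from the latest offset gives the number of offsets currently retained in each partition. The sketch below assumes the --time -2 and --time -1 outputs were saved to two files. The function name partition_counts and the file names are hypothetical.

```shell
# Print "partition count" per partition, where count = latest - earliest.
# $1 = file with the --time -2 (earliest) output
# $2 = file with the --time -1 (latest) output
partition_counts() {
  awk -F: '
    FILENAME == ARGV[1] { earliest[$2] = $3; next }  # first file: remember earliest offsets
    ($2 in earliest)    { print $2, $3 - earliest[$2] }
  ' "$1" "$2"
}

# Usage, after saving the two command outputs:
# partition_counts earliest.txt latest.txt
```

The result reflects what is still retained on the broker, not everything ever produced, because retention or compaction may already have removed older records.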
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list service-kafka-admin-broker-0:9092 --topic test_by_cheliang
test_by_cheliang:0:0
test_by_cheliang:1:2
test_by_cheliang:2:0
test_by_cheliang:3:0
test_by_cheliang:4:0
test_by_cheliang:5:0
test_by_cheliang:6:0
test_by_cheliang:7:0
test_by_cheliang:8:0
test_by_cheliang:9:0
When --time is not specified, it defaults to -1 and the latest offset of every partition of the topic is returned.
kafka-topics.sh accepts the following options:
--alter | Alter the number of partitions, replica assignment, and/or configuration for the topic. |
--bootstrap-server <String: server to connect to> | REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won't be required. |
--command-config <String: command config property file> | Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs. |
--config <String: name=value> | A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy compression.type delete.retention.ms |
--create | Create a new topic. |
--delete | Delete a topic. |
--delete-config <String: name> | A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option. |
--describe | List details for the given topics. |
--disable-rack-aware | Disable rack aware replica assignment |
--exclude-internal | exclude internal topics when running list or describe command. The internal topics will be listed by default |
--force | Suppress console prompts |
--help | Print usage information. |
--if-exists | if set when altering or deleting or describing topics, the action will only execute if the topic exists. Not supported with the --bootstrap-server option. |
--if-not-exists | if set when creating topics, the action will only execute if the topic does not already exist. Not supported with the --bootstrap-server option. |
--list | List all available topics. |
--replica-assignment <String: | A list of manual partition-to-broker assignments for the topic being created or altered. |
--replication-factor <Integer: | The replication factor for each partition in the topic being |
--topic <String: topic> | The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic". |
--topics-with-overrides | if set when describing topics, only show topics that have overridden configs |
--unavailable-partitions | if set when describing topics, only show partitions whose leader is not available |
--under-replicated-partitions | if set when describing topics, only show under replicated partitions |
--zookeeper <String: hosts> | DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over. |
# List all topics currently in Kafka
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --replication-factor 3 --partitions 10 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
# Describe a specific topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_topic
# Alter a topic (partition count, special settings such as the compact cleanup policy, data retention time, etc.)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --config cleanup.policy=compact --topic test_topic
# Alter a topic's configuration (kafka-configs.sh can be used as well)
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name test_topic --entity-type topics --add-config cleanup.policy=compact
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name test_topic --entity-type topics --delete-config cleanup.policy
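The option list marks --zookeeper as deprecated and names --bootstrap-server as the alternative. The wrapper below is a sketch of calling kafka-topics.sh through a broker address instead. The function name kafka_topics and the variables KAFKA_HOME, BOOTSTRAP and DRY_RUN are assumptions introduced here, not part of Kafka. Which actions work over --bootstrap-server depends on the Kafka version, so check the output of --help first.

```shell
# Run kafka-topics.sh against a broker rather than ZooKeeper.
# KAFKA_HOME, BOOTSTRAP and DRY_RUN are hypothetical variables for this sketch.
kafka_topics() {
  if [ -n "${DRY_RUN:-}" ]; then
    # Only print the command line that would be executed
    printf '%s\n' "${KAFKA_HOME:-.}/bin/kafka-topics.sh --bootstrap-server ${BOOTSTRAP:-localhost:9092} $*"
  else
    "${KAFKA_HOME:-.}/bin/kafka-topics.sh" --bootstrap-server "${BOOTSTRAP:-localhost:9092}" "$@"
  fi
}

# Examples (need a running broker unless DRY_RUN is set):
# kafka_topics --list
# kafka_topics --describe --topic test_topic
```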
# Describe how a consumer group is consuming (active consumers, lag, etc.)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe
# List all current consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Older versions (ZooKeeper-based consumers, before Kafka 2.0)
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group test_group --describe
# Consume data (starting from the latest offset)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic
# Consume data (starting from the beginning)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning
# Consume data (exit automatically after consuming at most the given number of messages)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --max-messages 1
# Consume data (also print the key)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --property print.key=true
# Older versions (ZooKeeper-based consumers, before Kafka 2.0)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic
# Produce data
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
# Produce data (write messages that carry a key)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"
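With parse.key=true and key.separator=:, each input line is split at the separator into key and value. A small generator, sketched below under the hypothetical name gen_keyed_messages, can feed test data to the producer through a pipe. The key-N and value-N naming is only an illustration.

```shell
# Print $1 lines of the form "key-<i>:value-<i>", ready to pipe into a
# console producer started with parse.key=true and key.separator=:
gen_keyed_messages() {
  i=1
  while [ "$i" -le "$1" ]; do
    printf 'key-%d:value-%d\n' "$i" "$i"
    i=$((i + 1))
  done
}

# Usage (needs a running broker):
# gen_keyed_messages 100 | bin/kafka-console-producer.sh --broker-list localhost:9092 \
#   --topic test_topic --property "parse.key=true" --property "key.separator=:"
```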
# Kafka client tools implemented in Go (sarama)
https://github.com/Shopify/sarama/tree/master/tools
# Minimum invocation
kafka-console-producer -topic=test -value=value -brokers=kafka1:9092
# It will pick up a KAFKA_PEERS environment variable
export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092
kafka-console-producer -topic=test -value=value
# It will read the value from stdin by using pipes
echo "hello world" | kafka-console-producer -topic=test
# Specify a key:
echo "hello world" | kafka-console-producer -topic=test -key=key
# Partitioning: by default, kafka-console-producer will partition as follows:
# - manual partitioning if a -partition is provided
# - hash partitioning by key if a -key is provided
# - random partitioning otherwise.
#
# You can override this using the -partitioner argument:
echo "hello world" | kafka-console-producer -topic=test -key=key -partitioner=random
# Display all command line options
kafka-console-producer -help
Original article: https://www.cnblogs.com/hxf19930508/p/14272590.html