通过group.id参数配置
(1)配置客户端参数并创建消费组实例
(2)订阅主题
(3)拉取消息并消费
(4)提交消费位移
(5)关闭消费组实例
group.id默认值为""
client.id默认值为"",之后KafkaConsumer会自动生成“consumer-”+数字
一个消费者可以订阅一到多个主题:
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener); //ConsumerRebalanceListener用于设置再均衡监听器
public void subscribe(Collection<String> topics);
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener); //正则表达式,可以订阅多个主题,用于和其他系统之间进行数据复制。
public void subscribe(Pattern pattern);
订阅指定分区:
public void assign(Collection<TopicPartition> partitions);
public class TopicPartition{
int partition;
String topic;
}
如果不知道分区个数,可以通过KafkaConsumer.partitionsFor(String topic);获取。
public List<PartitionInfo> partitionsFor(Collection<TopicPartition> partitions);
//获取分区信息的列表
public class PartitionInfo {
String topic;
int partition;
Node leader;
...
}
三种不同的订阅状态:
AUTO_TOPICS---->subcribe(Collection)
AUTO_PATTERN---->subcribe(Pattern)
USER_ASSIGNED---->assign(Collection)
拉取消息
public ConsumerRecords<K, V> poll (final Duration timeout);
timeout用来控制poll的阻塞时间,如果没有可用数据会发生阻塞。需要多长时间将控制权交给轮询的应用程序。
设置为0,不管是否拉取到消息,直接返回。
如果应用线程唯一的工作就是拉取并消费消息,则可设置为最大值(Long.MAX_VALUE)
消费消息
指定分区消费
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
for (ConsumerRecord<String, String> record : records.records(tp)) {
System.out.println(record.value);
}
}
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
consumer.subscribe(ArraysList.asList(topic));
try {
while(running.get()) {
// consumer.poll($%##);
// commit offset
}
} catch (WakeupException e) {
// ignore the error
} catch (Exception e) {
// some process handle error
} finally {
consumer.close();
}
退出循环可以使用running.set(false)或者
原文:https://www.cnblogs.com/suyeSean/p/11241901.html