首页 > 其他 > 详细

kafka消费者全部数据

时间:2020-10-11 09:00:49      阅读:66      评论:0      收藏:0      [点我收藏+]

1.目的

  每次消费数据时从最开始消费,主要是做数据预览。

2.设置offset.reset

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());

return new KafkaConsumer<>(properties);

如果只设置AUTO_OFFSET_RESET_CONFIG是无效的,必须加上AUTO_OFFSET_RESET_CONFIG

3.每次消费完重新seek offset到开始位置

  由于seek是针对offset,所以必须是消费过数据后才能执行seek。

  consumer.beginningOffsets(consumer.listTopics().get("demo").stream().map(o->new TopicPartition(o.topic(),o.partition())).collect(Collectors.toList()));

kafka消费者全部数据

原文:https://www.cnblogs.com/yangyang12138/p/13796103.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!