Kafka Study Notes 013 --- Kafka Consumer API

Date: 2020-06-28 16:10:47

Creating a consumer

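// Builds a String/String consumer that joins the given consumer group.
// BOOTSTARP_SERVER_URL is a constant defined outside this snippet.
public static Consumer<String, String> createConsume2(String groupName) {
    Properties properties = new Properties();
    // Broker address list used for the initial connection to the cluster
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTARP_SERVER_URL);
    // Consumers that share a group id divide the topic's partitions among themselves
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    // Record keys and values are both deserialized as strings
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new KafkaConsumer<>(properties);
}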
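
For reference, the ConsumerConfig constants used above resolve to plain string keys, so the same configuration can be written without them. The following is only a sketch: the class name ConsumerPropsSketch, the method build, and the address localhost:9092 are assumptions made for illustration, and the auto.offset.reset entry is an optional extra that is not part of the original method.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original KafkaTestUtil.
final class ConsumerPropsSketch {

    // Builds the same settings as createConsume2, using literal config keys.
    static Properties build(String bootstrapServers, String groupName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupName);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Optional addition: a group with no committed offset starts from the
        // beginning of each partition instead of the default "latest".
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumed broker address.
        Properties props = build("localhost:9092", "group2");
        System.out.println(props.getProperty("group.id"));
    }
}
```

The resulting Properties object could be passed to new KafkaConsumer<>(props) in the same way as in createConsume2. Without auto.offset.reset set to earliest, a brand-new group only sees records produced after it subscribes.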

Data consumption flow

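import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyConsumer1 {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer1.class);

    public static void main(String[] args) {
        // Join consumer group "group2" and subscribe to a single topic
        Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2");
        consumer.subscribe(Collections.singletonList("topic1"));

        // Poll forever; each call waits up to one second for new records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                LOGGER.error("consumer1: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
            }
        }
    }
}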
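
The while (true) loop above never exits, so the consumer is never closed. Below is a minimal sketch of a loop that can be stopped from another thread. It deliberately avoids the Kafka classes: StoppablePollLoop and the poller parameter are hypothetical stand-ins, with poller playing the role of () -> consumer.poll(Duration.ofSeconds(1)). With a real KafkaConsumer, the documented way to interrupt a blocking poll from another thread is consumer.wakeup(), which makes poll throw a WakeupException; that part is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of a poll loop controlled by a stop flag.
final class StoppablePollLoop {

    private final AtomicBoolean running = new AtomicBoolean(true);

    // Safe to call from another thread, for example a JVM shutdown hook.
    void stop() {
        running.set(false);
    }

    // Polls until stop() has been called; returns every value seen, in order.
    List<String> run(Supplier<List<String>> poller) {
        List<String> seen = new ArrayList<>();
        while (running.get()) {
            // One "poll": may return zero or more values
            for (String value : poller.get()) {
                seen.add(value);
            }
        }
        // A real consumer would be closed at this point (consumer.close()).
        return seen;
    }
}
```

The flag is only checked between polls, so the loop finishes processing the batch it already fetched before it exits.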

Run result

[Screenshot of the console output; image not preserved]

Original: https://www.cnblogs.com/sniffs/p/13202923.html
