首页 > 其他 > 详细

kafka(二)

时间:2020-07-30 09:55:21      阅读:83      评论:0      收藏:0      [点我收藏+]

1.消费者

  

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {

        String topic = "ot";
        String groupId = "og";

        Properties properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092");
        properties.put("group.id",groupId);
        properties.put("enable.auto.commit","true");
        properties.put("auto.commit.interval.ms","1000");
        properties.put("auto.offset.reset","earliest");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");


        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));

        try{
            while(true){
                ConsumerRecords<String,String> records = consumer.poll(1000);
                for(ConsumerRecord<String,String> record : records){
                    System.out.printf("offset=%d, key=%s,value= %s%n",record.offset(),record.key(),record.value());
                }
            }
        }finally {
            consumer.close();
        }
    }
}

2.处理过程

  首先通过链接zk找到borker的位置,然后通过相互发消息找到负载最小的一个点,然后发送join请求,收到broker回复后然后执行分区选择操作,然后将分区选择结果发送给broker,然后从服务端拉取offset,从offset开始消费数据。每次消费完一部分数据后要向服务端报告节点的offset,有新的消费者接入时消费者客户端通过监听事件立刻保存offset然后执行rebalance操作重新分配分区。

  1).NetWorkClient通过发送请求到服务端来寻找负载最小的broker,

  2).执行joinGroup操作,发送joinGroupRequet到broker,然后执行rebalance操作,来选择当前客户端的消费分区。

  3).定时的心跳任务,向broker确认消费者的状态

  4).定时的offset确认任务

  5).消费数据

以上操作均在ConsumerCoordinator中完成。

kafka(二)

原文:https://www.cnblogs.com/yangyang12138/p/13401637.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!