
Kafka Basics


1. Kafka Configuration Parameters

  

① Configuration-file approach
spring:
  kafka:
    bootstrap-servers: kafkaserver:port
    # custom application property holding the topic name (not a standard Spring Boot key)
    topic: kafkatopic
    listener:
      type: batch
    properties:
      # Broker connection parameters
      security.protocol: SSL
      # Schema Registry connection parameters
      schema.registry.url: kafkaSchemaRegisterWebSite
      specific.avro.reader: true
    consumer:
      group-id: groupId
      auto-offset-reset: earliest
      # wrap the real deserializers in ErrorHandlingDeserializer2 so a record that
      # fails to deserialize does not kill the listener container
      #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      #value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
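
When ErrorHandlingDeserializer2 catches a failure, it returns a null value and records the DeserializationException in a record header instead of letting the consumer loop forever on the bad record. A minimal sketch of pairing it with a container error handler that skips such records, assuming spring-kafka 2.3+ (the bean name and retry counts are illustrative):

    @Bean
    public SeekToCurrentErrorHandler errorHandler() {
        // retry a failed record twice with no backoff, then skip it
        return new SeekToCurrentErrorHandler(new FixedBackOff(0L, 2L));
    }

Register it on the listener container factory with factory.setErrorHandler(errorHandler()).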

② Java class approach
consumer
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put("security.protocol", "SSL");
        // note: acks, retries and compression.type are producer-side settings;
        // putting them in consumer config only triggers "unknown config" warnings

producer
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersMap.get(env));
        props.put("schema.registry.url", schemaRegistryMap.get(env));
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put("security.protocol", "SSL");
        props.put("max.request.size", "10000000");   // allow single messages up to ~10 MB
        props.put("compression.type", "snappy");     // compress batches on the wire
        props.put("acks", "all");                    // wait for all in-sync replicas
        props.put("retries", 0);                     // do not retry on transient errors
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

  

1.1 bootstrap.servers: the list of broker addresses the client uses to establish its initial connection to the Kafka cluster

1.2 (De)serializers

  key-deserializer / value-deserializer: the producer turns Java objects into bytes via key-serializer/value-serializer before sending them to the broker, and the consumer turns those bytes back into Java objects via key-deserializer/value-deserializer. Because every Kafka message consists of a key and a value, there is a separate (de)serializer for each.

  key-deserializer: for keys of basic types use org.apache.kafka.common.serialization.StringDeserializer; implementations for ByteArray, ByteBuffer, Bytes, Double, Integer and Long are provided as well.

  value-deserializer: the same built-in deserializers cover basic value types; for values with a specific schema use io.confluent.kafka.serializers.KafkaAvroDeserializer.

    (De)serialization frameworks: Avro, Thrift, Protobuf...

    Avro: reading a message requires the schema it was written with, but shipping the full schema inside every message would be wasteful, so the consumer has to locate the schema some other way. The common design pattern for this is a schema registry, which is an open-source project: https://github.com/confluentinc/schema-registry#quickstart-api-usage-examples

  Schema registry: when the producer sends data of a specific schema type and the consumer receives it, the schema registry becomes essential. To let Kafka support Avro serialization, the registry manages Avro schemas per topic.

        The serializer handles registering the schema, and the deserializer handles fetching it. If the schema fetched by the deserializer is incompatible with the registered one, an exception is thrown and must be handled explicitly.

  schema.registry.url points to the location of the schema registry.
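
A minimal sketch of the producer side of this pattern using a GenericRecord (the schema, topic name and registry URL are illustrative assumptions):

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaserver:port");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://schema-registry:8081");

        // the serializer registers this schema under the subject "kafkatopic-value" on first use
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "42");

        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("kafkatopic", "key", record));
        }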


1.3 auto.offset.reset (consumer)

  Controls where a consumer starts reading when a partition has no committed offset (or the committed offset is invalid):

  earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning of the partition.
  latest: if a partition has a committed offset, consume from it; otherwise consume only records produced after the consumer starts.
  none: if every partition has a committed offset, consume from those offsets; if any partition lacks one, throw an exception.
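
A minimal sketch of the none behavior, assuming config holds the consumer settings shown in ② above: with no committed offset, poll() fails fast instead of silently picking a starting point.

        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList("kafkatopic"));
            try {
                consumer.poll(Duration.ofSeconds(1));
            } catch (NoOffsetForPartitionException e) {
                // at least one assigned partition has no committed offset
            }
        }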

1.4 max.request.size (producer)

     The maximum size of a single message the producer can send.

1.5 acks (producer)

  acks specifies how many partition replicas must receive a message before the producer considers the write successful.

  acks=0: the producer does not wait for any response from the server; throughput is highest, but data can be lost.

  acks=1: the producer receives a success response as soon as the leader replica has the message.

  acks=all: the producer receives a success response only after all in-sync replicas have the message.
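
A minimal sketch of observing the acknowledgment, assuming a producer configured with acks=all and String serializers: send() returns a Future that completes only once the broker has acknowledged according to the acks setting.

        ProducerRecord<String, String> record = new ProducerRecord<>("kafkatopic", "key", "value");
        RecordMetadata meta = producer.send(record).get(); // blocks until acknowledged, throws on failure
        System.out.printf("written to partition %d at offset %d%n", meta.partition(), meta.offset());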

 

1.6 compression.type (producer)

  Enabling compression improves Kafka message throughput during transmission.
  Supported codecs: gzip, snappy, lz4, zstd

1.7 retries (producer)

  How many times the producer retries sending a message after receiving a retriable error from the server.

1.8 fetch.min.bytes (consumer)

  The minimum amount of data the consumer accepts from the broker per fetch; if less than this is available, the broker waits until enough data accumulates before responding.

1.9 fetch.max.wait.ms (consumer)

  The maximum time the broker waits for fetch.min.bytes of data to accumulate before answering a fetch request, so the consumer is never blocked indefinitely. The two settings work together, as the sketch below shows.
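
A minimal sketch of tuning the two together (the values are illustrative): the broker replies as soon as 1 MB is available, or after 500 ms, whichever comes first.

        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // wait for at least 1 MB of data
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);       // but answer within 500 ms regardless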

1.10 enable.auto.commit (consumer)

  Whether the consumer commits offsets automatically; the default is true.
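
A minimal sketch of taking control of commits (process() is a hypothetical business handler): disable auto-commit and commit only after the records are processed, so a crash replays records instead of losing them.

        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList("kafkatopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> r : records) {
                    process(r); // hypothetical business handler
                }
                consumer.commitSync(); // commit only after processing succeeded
            }
        }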

 

 

2. Consuming Different Data Types from Kafka

   Create a separate containerFactory for each data type you want to consume.

 Creating a consumer listener for String-typed messages

    @KafkaListener(topics = "#{'${spring.kafka.topic}'}", groupId = "#{'${spring.kafka.group-id}'}", containerFactory = "kafkaListenerContainerFactory", errorHandler = "validationErrorHandler")
    public void stringBatchListener(ConsumerRecord<?, ?> data) {
        // guard before dereferencing: a record that failed deserialization can carry a null value
        if (null == data.value()) {
            return;
        }
        log.info("-----string message consume start:offset:{},partition:{},value.size:{}-----", data.offset(), data.partition(), data.value().toString().length());
        log.info(MEASUREMENT_MARKER, "{\"keyword\":\"KafkaString\",\"partition\":\"" + data.partition() + "\",\"offset\":\"" + data.offset() + "\",\"size\":\"" + data.value().toString().length() + "\"}");
        kafkaService.valueProcess(data.value().toString());
    }


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.info("------ipvrawdata groupid:{}", groupId);
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put("security.protocol", "SSL");
        // acks, retries, compression.type and max.request.size are producer-side
        // settings and have no effect on a consumer, so they are omitted here
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
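
The YAML above sets spring.kafka.listener.type: batch, but a manually defined container factory does not pick that property up; call factory.setBatchListener(true) on the factory bean instead. A minimal sketch of the batch variant under that assumption:

    @KafkaListener(topics = "#{'${spring.kafka.topic}'}", containerFactory = "kafkaListenerContainerFactory")
    public void stringBatchListener(List<ConsumerRecord<String, String>> records) {
        // one poll() worth of records arrives as a single list
        for (ConsumerRecord<String, String> data : records) {
            if (null != data.value()) {
                kafkaService.valueProcess(data.value());
            }
        }
    }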

 Creating a consumer listener for a specific schema type

    @Bean
    public ConsumerFactory<String, DataTable> consumerFactory() {
        log.info("------ipvrawdata groupid:{}", groupId);
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, ""); // set the schema registry URL here
        // return DataTable instances instead of GenericRecord
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put("security.protocol", "SSL");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DataTable> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, DataTable> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
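
A listener to pair with the Avro container factory might look like this minimal sketch (the topic property and the tableProcess call are illustrative assumptions):

    @KafkaListener(topics = "#{'${spring.kafka.topic}'}", containerFactory = "kafkaListenerContainerFactory")
    public void avroListener(ConsumerRecord<String, DataTable> data) {
        // value() is already a DataTable deserialized against the registered schema
        if (null != data.value()) {
            kafkaService.tableProcess(data.value()); // hypothetical processing method
        }
    }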

 


Source: https://www.cnblogs.com/enhance/p/14876137.html
