特点:
1、同时为发布和订阅提供搞吞吐量。Kafka的设计目标是以时间复杂度为O(1)的方式提供消息持久化能力的,即使对TB级别以上数据也能保证常数时间的访问性能,即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输(一般消息处理是百万级,使用Partition实现机器间的并行处理)
2、消息持久化。将消息持久化到磁盘,因此可用于批量消费,例如ETL以及实时应用程序。通过将数据持久化到磁盘以及复制可以防止数据丢失
3、分布式。支持服务器间的消息分区及分布式消费,同时保证每个Partition内的消息顺序传输。同时保证每个Partition内的消息顺序传输。其内部的Producer、Broker和Consumer都是分布式架构,这更易于向外扩展。
4、 消费消息采用Pull模式。消息被处理的状态是在Consumer端维护的,而不是由服务端维护,Broker无状态,Consumer自己保存offset。
5、支持Online和Offline场景,同时支持离线数据处理和实时数据处理。
基本概念:
Broker:Kafka集群中的一台或多台服务器
Topic:主题,发布到Kafka的每条消息都有一个类别,这个类别就被称为Topic(物理上,不同Topic的消息分开存储;逻辑上,虽然一个Topic的消息被保存在一个或多个Broker上,但用户只需要指定消息的Topic即可生产或消费数据,而不必关心数据存于何处)
Partition:物理上的Topic分区,一个Topic可以分为多个Partition,每个partition都是一个有序的队列。Partition中的每条消息都会被分配一个有序的ID(offset),它唯一地标识分区中的每个记录。每个Partition只能被一个消费组中的一个消费者消费。
Producer:消息和数据的生产者,可以理解为向Kafka发送消息的客户端
Consumer:消息和数据的消费者,可以理解为从Kafka取消息的客户端,通过与Kafka集群建立长连接的方式,不断的从集群中拉去消息
Consumer Group(消费组):每个消费者都属于一个特定的消费组(可为每个消费者指定组名,若不指定组名,则属于默认的组)。这是Kafka用来实现一个Topic的广播(发送给所有的消费者)和单播(发送给任意一个消费者)的手段。一个Topic可以由多个消费组。但对每个消费组,只会把消息发送给该组中的一个消费者。如果要实现广播,只要每个消费者都有一个独立的消费组就可以了;如果要实现单播,只要所有的消费者都在同一个消费者组中就行。
一个典型的Kafka集群中包含若干生产者、若干Broker(Kafka支持水平扩展,一般Broker数量越多集群吞吐量越大)、若干消费者组以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置、选举leader,以及当消费者组发生变化时进行Rebalance(再均衡)。生产者使用推模式将消息发布到Broker,消费者使用拉模式从Broker订阅并消费消息。
创建一个Topic时,可以指定分区数目,分区数越多,其吞吐量越大,但是需要的资源也越多,也会带来更高的不可用性。
生产者在向kafka集群发送消息的分区策略:
1、可以指定分区,则消息投递到指定的分区
2、如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
3、如果既没有指定分区,且消息的key也为空,则用轮询的方式选择一个分区
也就是一条消息只会发送到一个分区中。
对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区最多只能绑定到一个消费者上,只能被一个消费组中的一个消费者消费。而一个消费者可以消费多个分区。因此,若一个消费组中的消费者数量大于分区数量的话,多余的消费者将不会收到消息(没有分区可以消费)。
实例:
<!--kafka--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency>
消息生产者:
package com.yang.spbo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; /** * Kafka消息生产者 * 〈功能详细描述〉 * * @author 17090889 * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class ProducerSample { public static void main(String[] args) { Map<String, Object> props = new HashMap<>(); // Kafka集群,多台服务器地址之间用逗号隔开 props.put("bootstrap.servers", "localhost:9092"); // 消息的序列化类型 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 消息的反序列化类型 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // zookeeper集群地址,提供了基于Zookeeper的集群服务器自动感知功能,可以动态从Zookeeper中读取Kafka集群配置信息 props.put("zk.connect", "127.0.0.1:2181"); String topic = "test-topic"; Producer<String, String> producer = new KafkaProducer<String, String>(props); // 发送消息 producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "javaMesage1")); producer.close(); } }
ProducerRecord构造:
public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, (Long)null, key, value, (Iterable)null); } public ProducerRecord(String topic, K key, V value) { this(topic, (Integer)null, (Long)null, key, value, (Iterable)null); } public ProducerRecord(String topic, V value) { this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null); }
topic和value是必填的,如果指定了partition,那么消息会被发送至指定的partition;如果没有指定partition但指定了key,那么消息会按照hash(key)发送至指定的partition;如果既没有指定partition,也没有指定key,那么消息会按照round-robin模式发送至每一个partition。
消息消费者:
package com.yang.spbo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * Kafka消息消费者 * 〈功能详细描述〉 * * @author 17090889 * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class ConsumerSample { public static void main(String[] args) { String topic = "test-topic"; Properties props = new Properties(); // Kafka集群,多台服务器地址之间用逗号隔开 props.put("bootstrap.servers", "localhost:9092"); // 消费组ID props.put("group.id", "test_group1"); // Consumer的offset是否自动提交 props.put("enable.auto.commit", "true"); // 自动提交offset到zk的时间间隔,时间单位是毫秒 props.put("auto.commit.interval.ms", "1000"); // 消息的反序列化类型 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 订阅的话题 consumer.subscribe(Arrays.asList(topic)); // Consumer调用poll方法来轮询Kafka集群的消息,一直等到Kafka集群中没有消息或者达到超时时间100ms为止 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.partition() + record.offset()); System.out.println(record.key()); System.out.println(record.value()); } } } }
Kafka应用:
1、用户行为数据采集
2、基于Kafka的日志收集:
集群方式部署的应用,日志文件有多个存放地址,需要快速定位日志问题就比较繁琐,那么就需要一个统一的日志平台来管理项目中产生的日志文件。
各个应用系统在输出日志时利用拥有高吞吐量的Kafka作为数据缓冲平台,将日志统一输出到Kafka,再通过Kafka以统一接口服务的方式开放给消费者。
现在很多公司做的统一日志平台就是收集重要的系统日志几种到Kafka中,然后再导入Elasticsearch、HDFS、Storm等具体日志数据的消费者中,用于进行实时搜索分析、离线统计、数据备份、大数据分析等。引入log4j和Kafka的集成包kafka-log4j-appender,在日志配置文件中配置kafka信息即可
3、基于Kafka的流量削峰
比如秒杀带来的流量高峰的场景,为了保证系统高可用,加入消息队列作为信息流的缓冲,从而缓解短时间内产生的高流量带来的压垮整个应用的问题,这就叫流量削峰。
比如:秒杀场景,将商品的基本信息和库存使用缓存预热保存在Redis中,然后从Redis中读取,系统后台收到秒杀下单请求先从Redis中预减库存,如果库存
不足,返回秒杀失败;如果库存充足,则将请求的业务数据放入消息队列Kafka中排队,之后请求立即返回页面。消息队列的消费者在收到消息后取得业务数据,
执行后续的生成订单、扣减数据库和写消息操作。
Kafka分区:
在使用Kafka作为消息队列时,不管是发布还是订阅都需要指定主题topic,在这里的主题是一个逻辑上的概念,实际上Kafka的基本存储单元是分区Partition,在一个Topic中会有一个或多个Partition,不同的Partition可位于不同的服务器节点上,物理上一个Partition对应一个文件夹。(分区是Topic私有的,所有的Topic之间不共享分区)
站在生产者和Broker的角度,对不同Partition的写操作时完全并行的,但对消费者而言,其并发数则取决于Partition的数量。
所以在实际的项目中需要配置合适的Partition数量,而这个数值需要根据所设计的系统吞吐量来推算。假设p是生产者写入单个Partition的最大吞吐量,c表示消费者从单个Partitin消费的最大吞吐量,系统需要的目标吞吐量为t,那么Partition的数量应取t/p和t/c之间的大者。而且Partition的值要大于或等于消费组中消费者的数量。
Kafka集群复制(1.0保证消息不丢失的策略)
Kafka使用了zookeeper实现了去中心化的集群功能,简单地讲,其运行机制是利用zookeeper维护集群成员的信息,每个Broker实例都会被设置一个唯一的标识符,Broker在启动时会通过创建临时节点的方式把自己的唯一标识符注册到zookeeper中,Kafka中的其他组件会监视Zookeeper里的/broker/ids路径,所以当集群中有Broker加入或退出时其他组件就会收到通知。
虽然Kafka有集群功能,但是在0.8版本之前一直存在一个严重的问题,就是一旦某个Broker宕机,该Broker上的所有Partition数据就不能被消费了,生产者也不能把数据存放在这些Partition中了,显然不满足高可用设计。
为了让Kafka集群中某些节点不能继续提供服务的情况下,集群对外整体依然可用,即生产者可继续发送消息,消费者可继续消费消息,所以需要提供一种集群间数据的复制机制。在Kafka中是通过使用Zookeeper提供的leader选举方式来实现数据复制方案的,其基本原理是:首先在Kafka集群中的所有节点中选举出一个leader,其他副本作为follower,所有的写操作都先发给leader,然后再由leader把消息发给follower。
复制方案使Kafka集群可以在部分节点不可用的情况下还能保证Kafka的整体可用性。Kafka中的复制操作也是针对分区的。一个分区有多个副本,副本被保存在Broker上,每个Broker都可以保存上千个属于不同主题和分区的副本。副本有两种类型:leader副本(每个分区都会有)和follower副本(除了leader副本之外的其他副本)。为了保证一致性,所有的生产者和消费者的请求都会经过leader。而follower不处理客户端的请求,它的职责是从leader处复制消息数据,使自己和leader的状态保持一致,如果leader节点宕机,那么某个follower就会被选为leader继续对外提供服务。
Kafka保证消息不丢失的方案
一、消息发送
1、消息发送确认:消息数据是存储在分区中的,而分区又可能有多个副本,所以一条消息被发送到Broker之后何时算投递成功呢?Kafka提供了三种模式:
1):不等Broker确认,消息被发送出去就认为是成功的。这种方式延迟最小,但是不能保证消息已经被成功投递到Broker
2):由leader确认,当leader确认接收到消息就认为投递是成功的,然后由其他副本通过异步方式拉取
3):由所有的leader和follower都确认接收到消息才认为是成功的。采用这种方式投递的可靠性最高,但相对会损伤性能
// 生产者消息发送确认模式,0表示第一种,1表示第二种,all表示第三种 props.put("acks", "1");
2、消息重发:Kafka为了高可用性,生产者提供了自动重试机制。当从Broker接收到的是临时可恢复的异常时,生产者会向Broker重发消息,但不能无限
次重发,如果重发次数达到阀值,生产者将不再重试并返回错误。
// 消息发送重试次数 props.put("retries", "10"); // 重试间隔时间,默认100ms,设置时需要知道节点恢复所用的时间,要设置的比节点恢复所用时间长 props.put("retry.backoff.ms", "1000");
二、消息消费
从设计上来说,由于Kafka服务端并不保存消息的状态,所以在消费消息时就需要消费者自己去做很多事情,消费者每次调用poll方法时,该方法总是返回
由生产者写入Kafka中但还没有被消费者消费的消息。Kafka在设计上有一个不同于其他JMS队列的地方是生产者的消息并不需要消费者确认,而消息在分区中
又是顺序排列的,那么必然就可以通过一个偏移量offset来确定每一条消息的位置,偏移量在消息消费的过程中起着很重要的作用。
更新分区当前位置的操作叫做提交偏移量,Kafka中有个叫做_consumer_offset的特殊主题用来保存消息在每个分区的偏移量,消费者每次消费时都会往
这个主题中发送消息,消息包含每个分区的偏移量。如果消费者崩溃或者有新的消费者加入消费组从而触发再均衡操作,再均衡之后该分区的消费者若不是之前
那个,那么新的消费者如何得知该分区的消息已经被之前的消费者消费到哪个位置了呢?这种情况下,就提现了偏移量的用处。为了能继续之前的工作,新的消
费者需要读取每个分区最后一次提交的偏移量,然后再从偏移量开始继续往下消费消息。
偏移量提交方式:
1、自动提交
Kafka默认会定期自动提交偏移量,提交的默认时间间隔是5000ms,但可能存在提交不及时导致再均衡之后重复消费的情况
// Consumer的offset是否自动提交 props.put("enable.auto.commit", "true"); // 自动提交offset到zk的时间间隔,时间单位是毫秒 props.put("auto.commit.interval.ms", "1000");
2、手动提交
先关闭消费者的自动提交配置,然后使用commitSync方法提交偏移量。
// 关闭自动提交 props.put("enable.auto.commit", "false"); // Consumer调用poll方法来轮询Kafka集群的消息,一直等到Kafka集群中没有消息或者达到超时时间100ms为止 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.partition() + record.offset()); System.out.println(record.key()); System.out.println(record.value()); } // 手动提交最新的偏移量 consumer.commitSync(); }
commitSync方法会提交由poll返回的最新偏移量,所以在处理完记录后要确保调用了commitSync方法,否则还是会发生重复处理的问题。
3、异步提交
使用commitSync方法提交偏移量有一个不足之处,就是该方法在Broker对提交请求做出回应前是阻塞的,要等待回应。因此,采用这种方式每提交一次偏移量就
等待一次限制了消费端的吞吐量,因此Kafka提供了异步提交的方式【consumer.commitAsync();】,消费者只管发送提交请求,而不需要等待Broker的立即回应。
但commitSync方法在成功提交之前如碰到无法恢复的错误之前会一直重试,而commitAsync并不会,因为为了避免异步提交的偏移量被覆盖。
Kafka高吞吐量的原因?
1)、顺序读写
Kafka的消息是不断追加到文件中的,这个特性使Kafka可以充分利用磁盘的顺序读写性能。顺序读写不需要磁盘磁头的寻道时间,只需很少的扇区
旋转时间,所以速度远快于随机读写
2)、零拷贝
在Linux Kernel2.2之后出现了一种叫做“零拷贝(zero-copy)”系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存的直接映射,
数据不再复制到“用户缓冲区”
3)、分区
kafka中的topic中的内容可以分在多个分区(partition)存储,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,
并且增加并行操作的能力
4)、批量发送
Kafka允许进行批量发送消息,Productor发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka
(1):等消息条数到固定条数
(2):一段时间发送一次
5)、数据压缩
Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩,压缩的好处就是减少传输的数据量,减轻
对网络传输的压力。
批量发送和数据压缩一起使用,单条做数据压缩的话,效果不太明显。
原文:https://www.cnblogs.com/yangyongjie/p/11140897.html