官方文档:http://kafka.apache.org/documentation/
多租户
保证:
同一个partition内的顺序性;
consumer能够按序看到日志文件中的记录;
对于副本因子为N的topic,即使N-1个服务器宕机,已经提交到日志的记录能够不被丢失。
用作消息系统:
简化了传统消息系统的两种概念:queuing publish-subscribe
将topic中的每一个partition分配给组里的一个consumer,能够保证同一个partition中的消息被顺序消费。
用作存储系统:
只有数据被完全备份并且保证已经持久化了,数据的写入才被认为成功。
流处理:
better throughput, built-in partitioning, replication, and fault-tolerance,其他消息队列:
其他流处理平台:Storm,Samza
broker
topic
producer
consumer
connect
pagecache-centric design
常量的时间复杂度
LB:
producer直接将数据发送给对应的partition;
所有的kafka服务器可以在给定的时间内回应哪一个服务器是存储的并且topic的partition leader在哪里;
客户端控制将数据写入到哪一个partition;kafka留了一个指定key的接口,kafka将对该key作hash(hash函数也是可以自定义的)从而确定partition。
异步发送:
批量发送可以配置为不超过固定数量的message或者等到不超过一些固定的延迟限制(比如说64k,10ms);
这种缓冲是可以配置的,并且提供了一种来通过少量的延迟来提高吞吐量的机制。
kafka采用的是传统的数据由producer推送给broker,然后由consumer从broker拉去的机制;Scribe及flume采用的pushed机制,这样consumer就比较难处理信息,因为它无法控制broker向他推送数据的频率,相反kafka在这方面就显得更加可控些;
拉式系统有一个问题是如果broker没有数据,consumer会一直空转忙等待至有数据到达;
常规的消息系统是由broker记录哪些消息被消费,确定哪些消息被消费后,将之删除这能使数据量变小,但会带来一些其他问题:如果消息被发送后没有被正确消费,一直收不到消费成功的确认等,给每条数据记录状态的性能问题等等;
kafka将topic分为多个有序的partition,对于每一个partition而言,consumer的position仅仅是一个整数;另外,消息可以被重复消费。
从producer的角度而言:0.11之后,kafka提供了一种幂等的机制,能够保证重发不会在log中产生重复的项,因为broker给每个producer分配了一个ID并且删除使用已被发送消息的序列码的消息;在这个版本后,kafka还支持向多个topic partitions发送消息的事务语义。
从consumer的角度而言:为了保证"exactly once",我们可以在一个事务中处理数据并将offset写入到topic中;事务默认的隔离级别是未提交读。
kafka默认支持at-least-once发送,并且允许用户通过关闭重试机制实现at-most-once发送以及在处理一批数据之前提交offset。
所有的读写都是由leader partition实现。
kafka对节点存活的定义:
1. 节点维持着与ZK的session(依据ZK的心跳机制);
2. 从节点必须复制主节点上发生的写并且不能落后太多;
leader跟踪从节点的列表,并且发现挂了、出错或者落后的节点后将至从列表中移除。对于出错或者落后的节点的配置在replica.lag.time.max.ms中;
kafka不处理拜占庭一类的问题:比如恶意或者随意的回复。
只有已经提交的消息才能被consumer读到。
producer可以选择是否等待消息被提交,这取决于在时延及持久性之间作权衡,这由相关的producer的ack配置控制。
kafka的选举不是多数决,而是为能够追上leader的副本动态维护一个ISR(In-sync replicas),只有这里的成员能够参与leader选举。写入数据至partition时,只有in-sync的副本都接受到了,这些消息才会确认。ISR集合放在ZK中。
两种办法:
等待ISR中的副本出现;(0.11后默认,可设置unclean.leader.election.enable改变)
选择第一个出现的副本;
在多少副本写入成功才认为消息已提交:0,1,-1(all),注意:all只是保证所有处于正在in-sync的副本成功。
关于可用性及持久性,有两个更高级的配置:
Disable unclean leader election;
指定最小ISR大小:不过只有当ack为all时才能起效。
partition的分配:
leader的选择:
1. min.compaction.lag.ms可以保证消息写入后不被清理的最短时间;
2. 不改变顺序
3. 不改变offset
4. delete.retention.ms
日志清理有一个log cleaner后台线程池执行。
log.cleanup.policy=compact
log.cleaner.min.compaction.lag.ms 消息在被清理前的最小保留时间
Quotas的配置:
超过配额了怎么办:
由于文件夹名大小限制255个字符,所以topic的名字长度有限制;
kafka可以增加partition的数目,但是不会改变已有数据的partition归属,同时也不支持减小partition的数目。
1. 同步日志到磁盘,避免重启时作日志恢复;这一点在非hard kill情况下是默认执行的;
2. 关机前迁移所拥有的leader partition,需设置controlled.shutdown.enable=true。
执行
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
或者配置 auto.leader.rebalance.enable=true
broker.rack=my-rack-id可以设置broker所属的机架,一个partition将分布于min{racks,replication-factor}个机架中。
source以及destination集群的partition数量及offset都可以不一样。
kafka-mirror-maker.sh命令,--whitelist指定topic,值需要在引号中,可以是一个正则表达式。
auto.create.topics.enable=true配置可以让集群实现自动创建或者备份数据。
kafka-reassign-partitions.sh
横跨多个数据中心的情形,kakfa更推荐使用镜像集群的方式。
不推荐部署一个跨越多个数据中心的集群,因为会增加分片之间同步的延时,网络不可用时,要么kafka要么ZK不可用。
producer的关键配置包括:
consumer的关键配置是fetch size
kafka服务器使用Yammer Metrics ,Kafka客户端使用内置的Kafka Metrics。这两者都拓展自JMX
典型的ZK服务包含5或7个节点
使ZK隔离运行
为ZK分配足够的Java堆空间
1. subscribe与assign的区别
原文:https://www.cnblogs.com/cheungchein/p/10066903.html