Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础。
如今它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用。
活动流数据是全部站点在对其站点使用情况做报表时要用到的数据中最常规的部分。活动数据包含页面訪问量(page view)、被查看内容方面的信息以及搜索情况等内容。这样的数据通常的处理方式是先把各种活动以日志的形式写入某种文件。然后周期性地对这些文件进行统计分析。运营数据指的是server的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。
近年来,活动和运营数据处理已经成为了站点软件产品特性中一个至关重要的组成部分。这就须要一套略微更加复杂的基础设施对其提供支持。
这样的由不可变(immutable)的活动数据组成的高吞吐量数据流代表了对计算能力的一种真正的挑战,因其数据量非常easy就可能会比站点中位于第二位的数据源的数据量大10到100倍。
传统的日志文件统计分析对报表和批处理这样的离线处理的情况来说,是一种非常不错且非常有伸缩性的方法;可是这样的方法对于实时处理来说其时延太大,并且还具有较高的运营复杂度。
还有一方面。现有的消息队列系统(messaging and queuing system)却非常适合于在实时或近实时(near-real-time)的情况下使用,但它们对非常长的未被处理的消息队列的处理非常不给力,往往并不将数据持久化作为首要的事情考虑。这样就会造成一种情况,就是当把大量数据传送给Hadoop这样的离线系统后。 这些离线系统每一个小时或每天仅能处理掉部分源数据。
Kafka的目的就是要成为一个队列平台,只使用它就行既支持离线又支持在线使用这两种情况。
Kafka支持很通用的消息语义(messaging semantics)。虽然我们这篇文章主要是想把它用于活动处理。但并没有不论什么限制性条件使得它只适用于此目的。
以下的示意图所看到的是在LinkedIn中部署后各系统形成的拓扑结构。
要注意的是,一个单个的Kafka集群系统用于处理来自各种不同来源的全部活动数据。它同一时候为在线和离线的数据使用者提供了一个单个的数据管道,在线活动和异步处理之间形成了一个缓冲区层。我们还使用kafka。把全部数据复制(replicate)到另外一个不同的数据中心去做离线处理。
我们并不想让一个单个的Kafka集群系统跨越多个数据中心,而是想让Kafka支持多数据中心的数据流拓扑结构。
这是通过在集群之间进行镜像或“同步”实现的。这个功能很easy,镜像集群仅仅是作为源集群的数据使用者的角色执行。这意味着,一个单个的集群就行将来自多个数据中心的数据集中到一个位置。
以下所看到的是可用于支持批量装载(batch loads)的多数据中心拓扑结构的一个样例:
请注意。在图中上面部分的两个集群之间不存在通信连接,两者可能大小不同,具有不同数量的节点。以下部分中的这个单个的集群能够镜像随意数量的源集群。
要了解镜像功能使用方面的很多其它细节,请訪问这里.
Kafka之所以和其他绝大多数信息系统不同,是由于以下这几个为数不多的比較重要的设计决策:
以上这些设计决策将在下文中进行逐条详述。
首先来看一些主要的术语和概念。
消息指的是通信的基本单位。由消息生产者(producer)公布关于某话题(topic)的消息,这句话的意思是。消息以一种物理方式被发送给了作为代理(broker)的server(可能是另外一台机器)。若干的消息使用者(consumer)订阅(subscribe)某个话题,然后生产者所公布的每条消息都会被发送给全部的使用者。
Kafka是一个显式的分布式系统 —— 生产者、使用者和代理都能够执行在作为一个逻辑单位的、进行相互协作的集群中不同的机器上。对于代理和生产者。这么做很自然,但使用者却须要一些特殊的支持。
每一个使用者进程都属于一个使用者小组(consumer group) 。
准确地讲,每条消息都仅仅会发送给每一个使用者小组中的一个进程。因此,使用者小组使得很多进程或多台机器在逻辑上作为一个单个的使用者出现。使用者小组这个概念很强大。能够用来支持JMS中队列(queue)或者话题(topic)这两种语义。
为了支持队列 语义,我们能够将全部的使用者组成一个单个的使用者小组。在这样的情况下,每条消息都会发送给一个单个的使用者。为了支持话题语义。能够将每一个使用者分到它自己的使用者小组中,随后全部的使用者将接收到每一条消息。在我们的使用其中,一种更常见的情况是,我们依照逻辑划分出多个使用者小组。每一个小组都是有作为一个逻辑总体的多台使用者计算机组成的集群。在大数据的情况下,Kafka有个额外的长处,对于一个话题而言。不管有多少使用者订阅了它,一条条消息都仅仅会存储一次。
在对消息进行存储和缓存时,Kafka严重地依赖于文件系统。 大家普遍觉得“磁盘非常慢”,因而人们都对持久化结(persistent structure)构能够提供说得过去的性能抱有怀疑态度。
实际上。同人们的期望值相比。磁盘能够说是既非常慢又非常快,这取决于磁盘的使用方式。
设计的非常好的磁盘结构往往能够和网络一样快。
磁盘性能方面最关键的一个事实是,在过去的十几年中,硬盘的吞吐量正在变得和磁盘寻道时间严重不一致了。结果,在一个由6个7200rpm的SATA硬盘组成的RAID-5磁盘阵列上,线性写入(linear write)的速度大约是300MB/秒,但随即写入却仅仅有50k/秒,当中的区别接近10000倍。
线性读取和写入是全部使用模式中最具可估计性的一种方式。因而操作系统採用预读(read-ahead)和后写(write-behind)技术对磁盘读写进行探測并优化后效果也不错。预读就是提前将一个比較大的磁盘块中内容读入内存,后写是将一些较小的逻辑写入操作合并起来组成比較大的物理写入操作。
关于这个问题更深入的讨论请參考这篇文章ACM Queue article;实际上他们发现,在某些情况下。顺序磁盘訪问可以比随即内存訪问还要快。
为了抵消这样的性能上的波动,现代操作系变得越来越积极地将主内存用作磁盘缓存。全部现代的操作系统都会乐于将全部空暇内存转做磁盘缓存。即时在须要回收这些内存的情况下会付出一些性能方面的代价。
全部的磁盘读写操作都须要经过这个统一的缓存。想要舍弃这个特性都不太easy。除非使用直接I/O。因此,对于一个进程而言。即使它在进程内的缓存中保存了一份数据,这份数据也可能在OS的页面缓存(pagecache)中有反复的一份。结构就成了一份数据保存了两次。
更进一步讲,我们是在JVM的基础之上开发的系统,仅仅要是了解过一些Java中内存用法的人都知道这两点:
这还大大简化了代码,由于对缓存和文件系统之间的一致性进行维护的全部逻辑如今都是在OS中实现的。这事OS做起来要比我们在进程中做那种一次性的缓存更加高效。准确性也更高。假设你使用磁盘的方式更倾向于线性读取操作,那么随着每次磁盘读取操作,预读就能很高效使用随后准能用得着的数据填充缓存。
这就让人联想到一个很easy的设计方案:不是要在内存中保存尽可能多的数据并在须要时将这些数据刷新(flush)到文件系统。而是我们要做全然相反的事情。全部数据都要马上写入文件系统中持久化的日志中但不进行刷新数据的不论什么调用。实际中这么做意味着。数据被传输到OS内核的页面缓存中了。OS随后会将这些数据刷新到磁盘的。
此外我们加入了一条基于配置的刷新策略。同意用户对把数据刷新到物理磁盘的频率进行控制(每当接收到N条消息或者每过M秒)。从而能够为系统硬件崩溃时“处于危急之中”的数据在量上加个上限。
这样的以页面缓存为中心的设计风格在一篇解说Varnish的设计思想的文章中有具体的描写叙述(文风略带有助于身心健康的傲气)。
一般O(log N)被觉得基本上等于常量时长,但对于磁盘操作来讲,情况就不同了。磁盘寻道时间一次要花10ms的时间,并且每一个磁盘同一时候仅仅能进行一个寻道操作。因而其并行程度很有限。因此,即使少量的磁盘寻道操作也会造成很大的时间开销。由于存储系统混合了快速缓存操作和真正的物理磁盘操作,所以树型结构(tree structure)可观察到的性能往往是超线性的(superlinear)。
更进一步讲,BTrees须要一种很复杂的页面级或行级锁定机制才干避免在每次操作时锁定一整颗树。实现这样的机制就要为行级锁定付出很高昂的代价,否则就必须对全部的读取操作进行串行化(serialize)。由于对磁盘寻道操作的高度依赖。就不太可能高效地从驱动器密度(drive density)的提高中获得改善。因而就不得不使用容量较小(< 100GB)转速较高的SAS驱动去,以维持一种比較合理的数据与寻道容量之比。
直觉上讲,持久化队列能够依照通常的日志解决方式的样子构建。仅仅是简单的文件读取和简单地向文件里加入内容。尽管这样的结果必定无法支持BTree实现中的丰富语义。但有个优势之处在于其全部的操作的复杂度都是O(1),读取操作并不须要阻止写入操作,并且反之亦然。这样做显然有性能优势,由于性能全然同数据大小之间脱离了关系 —— 一个server如今就能利用大量的便宜、低转速、容量超过1TB的SATA驱动器。
尽管这些驱动器寻道操作的性能非常低。但这些驱动器在大量数据读写的情况下性能还凑和,而仅仅需1/3的价格就能获得3倍的容量。
可以存取到差点儿无限大的磁盘空间而无须付出性能代价意味着。我们可以提供一些消息系统中并不常见的功能。
例如。在Kafka中,消息在使用完后并没有马上删除,而是会将这些消息保存相当长的一段时间(例如说一周)。
我们的如果是。系统里消息的量很之大。实际消息量是站点页面浏览总数的数倍之多(由于每一个页面浏览就是我们要处理的当中一个活动)。并且我们如果公布的每条消息都会被至少读取一次(往往是多次),因而我们要为消息使用而不是消息的产生进行系统优化。
导致低效率的原因常见的有两个:过多的网络请求和大量的字节拷贝操作。
为了提高效率,API是环绕这“消息集”(message set)抽象机制进行设计的,消息集将消息进行自然分组。这么做能让网络请求把消息合成一个小组,分摊网络往返(roundtrip)所带来的开销。而不是每次只发送一个单个消息。
MessageSet实现(implementation)本身是对字节数组或文件进行一次包装后形成的一薄层API。
因而,里面并不存在消息处理所需的单独的序列化(serialization)或逆序列化(deserialization)的步骤。消息中的字段(field)是按需进行逆序列化的(或者说。在不须要时就不进行逆序列化)。
由代理维护的消息日志本身只是是那些已写入磁盘的消息集的文件夹。
按此进行抽象处理后,就能够让代理和消息使用者共用一个单个字节的格式(从某种程度上说,消息生产者也能够用它。消息生产者的消息要求其校验和(checksum)并在验证后才会加入到日志中)
使用共通的格式后就能对最重要的操作进行优化了:持久化后日志块(chuck)的网络传输。为了将数据从页面缓存直接传送给socket。现代的Unix操作系统提供了一个高度优化的代码路径(code path)。在Linux中这是通过sendfile这个系统调用实现的。通过Java中的API,FileChannel.transferTo,由它来简洁的调用上述的系统调用。
为了理解sendfile所带来的效果,重要的是要理解将数据从文件传输到socket的数据路径:
这样效率显然非常低,由于里面涉及4次拷贝,2次系统调用。使用sendfile就能够避免这些反复的拷贝操作,让OS直接将数据从页面缓存发送到网络中,当中仅仅需最后一步中的将数据复制到NIC的缓冲区。
我们预期的一种常见的用例是一个话题拥有多个消息使用者。採用前文所述的零拷贝优化方案。数据仅仅需复制到页面缓存中一次,然后每次发送给使用者时都对它进行反复使用就可以,而无须先保存到内存中,然后在阅读该消息时每次都须要将其复制到内核空间中。如此一来。消息使用的速度就能接近网络连接的极限。
要得到Java中对send‘file和零拷贝的支持方面的很多其它背景知识,请參考IBM developerworks上的这篇文章。
多数情况下系统的瓶颈是网络而不是CPU。 这一点对于须要将消息在个数据中心间进行传输的数据管道来说,尤其如此。
当然。无需来自Kafka的支持,用户总是能够自行将消息压缩后进行传输。但这么做的压缩率会非常低,由于不同的消息里都有非常多反复性的内容(比方JSON里的字段名、web日志中的用户代理或者经常使用的字符串)。
高效压缩须要将多条消息一起进行压缩而不是分别压缩每条消息。
理想情况下。以端到端的方式这么做是行得通的 —— 也即,数据在消息生产者发送之前先压缩一下,然后在server上一直保存压缩状态,仅仅有到终于的消息使用者那里才须要将其解压缩。
通过执行递归消息集,Kafka对这样的压缩方式提供了支持。 一批消息能够打包到一起进行压缩。然后以这样的形式发送给server。这批消息都会被发送给同一个消息使用者。并会在到达使用者那里之前一直保持为被压缩的形式。
Kafka支持GZIP和Snappy压缩协议。
关于压缩的很多其它更具体的信息,请參见这里。
追踪(客户)消费了什么是一个消息系统必须提供的一个关键功能之中的一个。
它并不直观,可是记录这个状态是该系统的关键性能之中的一个。状态追踪要求(不断)更新一个有持久性的实体的和一些潜在会发生的随机訪问。
因此它更可能受到存储系统的查询时间的制约而不是带宽(正如上面所描写叙述的)。
大部分消息系统保留着关于代理者使用(消费)的消息的元数据。也就是说,当消息被交到客户手上时,代理者自己记录了整个过程。这是一个相当直观的选择,并且确实对于一个单机server来说。它(数据)能去(放在)哪里是不清晰的。又由于很多消息系统存储使用的数据结构规模小,所以这也是个有用的选择--由于代理者知道什么被消费了使得它能够立马删除它(数据),保持数据大小只是大。
或许不显然的是,让代理和使用者这两者对消息的使用情况做到一致表述绝不是一件轻而易举的事情。假设代理每次都是在将消息发送到网络中后就将该消息记录为已使用的话,一旦使用者没能真正处理到该消息(比方说,由于它宕机或这请求超时了抑或别的什么原因)。就会出现消息丢失的情况。为了解决此问题,很多消息系新加了一个确认功能,当消息发出后仅把它标示为已发送而不是已使用。然后代理须要等到来自使用者的特定的确认信息后才将消息记录为已使用。这样的策略的确攻克了丢失消息的问题。但由此产生了新问题。首先。假设使用者已经处理了该消息但却未能发送出确认信息,那么就会让这一条消息被处理两次。
第二个问题是关于性能的,这样的策略中的代理必须为每条单个的消息维护多个状态(首先为了防止反复发送就要将消息锁定。然后,然后还要将消息标示为已使用后才干删除该消息)。另外另一些棘手的问题须要处理。比方,对于那些以发出却未得到确认的消息该怎样处理?
系统能够提供的几种可能的消息传递保障例如以下所看到的:
消息在发出后马上标示为已使用,因此消息不会被发出去两次,但这在很多故障中都会导致消息丢失。
这个问题已得到广泛的研究,属于“事务提交”问题的一个变种。
提供只一次语义的算法已经有了。两阶段或者三阶段提交法以及Paxos算法的一些变种就是当中的一些样例,但它们都有与生俱来的的缺陷。
这些算法往往须要多个网络往返(round trip),可能也无法非常好的保证其活性(liveness)(它们可能会导致无限期停机)。FLP结果给出了这些算法的一些主要的局限。
Kafka对元数据做了两件非常不平常的事情。一件是。代理将数据流划分为一组互相独立的分区。这些分区的语义由生产者定义,由生产者来指定每条消息属于哪个分区。
一个分区内的消息以到达代理的时间为准进行排序,将来按此顺序将消息发送给使用者。
这么一来,就用不着为每一天消息保存一条元数据(比方说,将消息标示为已使用)了,我们仅仅需为使用者、话题和分区的每种组合记录一个“最高水位标记”(high
water mark)就可以。因此,标示使用者状态所需的元数据总量实际上特别小。在Kafka中,我们将该最高水位标记称为“偏移量”(offset)。这么叫的原因将在实现细节部分解说。
在Kafka中。由使用者负责维护反映哪些消息已被使用的状态信息(偏移量)。典型情况下。Kafka使用者的library会把状态数据保存到Zookeeper之中。
然而。让使用者将状态信息保存到保存它们的消息处理结果的那个数据存储(datastore)中或许会更佳。
比如。使用者或许就是要把一些统计值存储到集中式事物OLTP数据库中。在这样的情况下,使用者能够在进行那个数据库数据更改的同一个事务中将消息使用状态信息存储起来。
这样就消除了分布式的部分,从而攻克了分布式中的一致性问题!这在非事务性系统中也有类似的技巧可用。搜索系统可用将使用者状态信息同它的索引段(index segment)存储到一起。虽然这么做可能无法保证数据的持久性(durability)。但却可用让索引同使用者状态信息保存同步:假设因为宕机造成有一些没有刷新到磁盘的索引段信息丢了。我们总是可用从上次建立检查点(checkpoint)的偏移量处继续对索引进行处理。与此类似,Hadoop的载入作业(load job)从Kafka中并行载入,也有同样的技巧可用。每一个Mapper在map任务结束前,将它使用的最后一个消息的偏移量存入HDFS。
这个决策还带来一个额外的优点。使用者可用有益回退(rewind)到曾经的偏移量处。再次使用一遍曾经使用过的数据。尽管这么做违背了队列的一般协约(contract)。但对非常多使用者来讲却是个非常主要的功能。
举个样例,假设使用者的代码里有个Bug,并且是在它处理完一些消息之后才被发现的,那么当把Bug改正后,使用者还有机会又一次处理一遍那些消息。
近来有些系统。比方scribe和flume,更着重于日志统计功能,遵循了一种很不同的基于Push的设计思路,当中每一个节点都能够作为代理。数据一直都是向下游Push的。
上述两种方法都各有优缺点。
然而。由于基于Push的系统中代理控制着数据的传输速率,因此它难以应付大量不同种类的使用者。
我们的设计目标是。让使用者能以它最大的速率使用数据。不幸的是,在Push系统中当数据的使用速率低于产生的速率时。使用者往往会处于超载状态(这实际上就是一种拒绝服务攻击)。
基于Pull的系统在使用者的处理速度稍稍落后的情况下会表现更佳,并且还能够让使用者在有能力的时候往往前赶赶。让使用者採用某种退避协议(backoff
protocol)向代理表明自己处于超载状态,能够解决部分问题,可是,将传输速率调整到正好能够全然利用(但从不能过度利用)使用者的处理能力可比初看上去难多了。曾经我们尝试过多次,想按这样的方式构建系统,得到的经验教训使得我们选择了更加常规的Pull模型。
没有中央的“主”节点。代理彼此之间是对等的。不须要不论什么手动配置就可以可随时加入和删除。
相同。生产者和消费者能够在不论什么时候开启。
每一个代理都能够在Zookeeper(分布式协调系统)中注冊的一些元数据(比如,可用的主题)。生产者和消费者能够使用Zookeeper发现主题和相互协调。关于生产者和消费者的细节将在以下描写叙述。
不好的地方在于全部均衡工作都是在TCP连接的层次完毕的,因而均衡效果可能并不佳(假设有些生产者产生的消息远多于其他生产者,按每一个代理对TCP连接进行平均分配可能会导致每一个代理接收到的消息总数并不平均)。
採用client基于zookeeper的负载均衡能够解决部分问题。假设这么做就能让生产者动态地发现新的代理,并按请求数量进行负载均衡。
类似的,它还能让生产者依照某些键值(key)对数据进行分区(partition)而不是随机乱分,因而能够保存同使用者的关联关系(比如,依照用户id对数据使用进行分区)。这样的分法叫做“语义分区”(semantic partitioning)。下文再讨论其细节。
以下解说基于zookeeper的负载均衡的工作原理。
在发生下列事件时要对zookeeper的监视器(watcher)进行注冊:
生产者在其内部为每个代理维护了一个弹性的连接(同代理建立的连接)池。通过使用zookeeper监视器的回调函数(callback),该连接池在建立/保持同全部在线代理的连接时都要进行更新。当生产者要求进入某特定话题时,由分区者(partitioner)选择一个代理分区(參加语义分区小结)。从连接池中找出可用的生产者连接,并通过它将数据发送到刚才所选的代理分区。
这样就能够用一个语义分区函数将消息流依照消息中的某个键值进行分区。并将不同分区发送给各自对应的代理。
通过实现kafak.producer.Partitioner接口。能够对分区函数进行定制。在缺省情况下使用的是随即分区函数。上例中,那个键值应该是member_id。分区函数能够是hash(member_id)%num_partitions。
具有伸缩性的持久化方案使得Kafka可支持批量数据装载,可以周期性将快照数据加载进行批量处理的离线系统。
我们利用这个功能将数据加载我们的数据仓库(data warehouse)和Hadoop集群。
批量处理始于数据加载阶段。然后进入非循环图(acyclic graph)处理过程以及输出阶段(支持情况在这里)。
支持这样的处理模型的一个重要特性是。要有又一次装载从某个时间点開始的数据的能力(以防处理中有不论什么发生错误)。
对于Hadoop,我们通过在单个的map任务之上切割装载任务对数据的装载进行了并行化处理,切割时,全部节点/话题/分区的每种组合都要分出一个来。Hadoop提供了任务管理,失败的任务能够重头再来,不存在数据被反复的危急。
以下给出了一些在上一节所描写叙述的低层相关的实现系统的某些部分的细节的简要说明。
生产者 API 是给两个底层生产者的再封装 -kafka.producer.SyncProducerandkafka.producer.async.AsyncProducer.
class Producer { /* Sends the data, partitioned by key to the topic using either the */ /* synchronous or the asynchronous producer */ public void send(kafka.javaapi.producer.ProducerData producerData); /* Sends a list of data, partitioned by key to the topic using either */ /* the synchronous or the asynchronous producer */ public void send(java.util.List< kafka.javaapi.producer.ProducerData> producerData); /* Closes the producer and cleans up */ public void close(); }
该API的目的是将生产者的全部功能通过一个单个的API公开给其使用者(client)。
新建的生产者能够:
当事件进入队列时会先放入队列进行缓冲。直到时间到了queue.time或者批量大小到达batch.size为止,后台线程(kafka.producer.async.ProducerSendThread)会将这批数据从队列中取出,交给kafka.producer.EventHandler进行序列化并发送给适当的kafka代理分区。通过event.handler这个配置參数。能够在系统中插入一个自己定义的事件处理器。
在该生产者队列管道中的各个不同阶段,为了插入自己定义的日志/跟踪代码或者自己定义的监视逻辑,如能注入回调函数会很实用。通过实现kafka.producer.asyn.CallbackHandler接口并将配置參数callback.handler设置为实现类就能够实现注入。
interfaceEncoder<T> { publicMessage toMessage(T data); }
在有些应用场合,可能不太适合于依赖zookeeper。
在这样的情况下。生产者可以从broker.list这个配置參数中获得一个代理的静态列表,每一个生产请求会被随即的分配给各代理分区。假设对应的代理宕机。那么生产请求就会失败。
interfacePartitioner<T> { intpartition(T key, intnumPartitions); }
假设key为null,那就进行随机选择。使用partitioner.class这个配置參数也能够插入自己定义的分区策略。
我们有两个层次的使用者API。
底层比較简单的API维护了一个同单个代理建立的连接,全然同发送给server的网络请求相吻合。该API全然是无状态的,每一个请求都带有一个偏移量作为參数,从而同意用户以自己选择的随意方式维护该元数据。
高层API对使用者隐藏了代理的详细细节,让使用者可执行于集群中的机器之上而无需关心底层的拓扑结构。它还维护着数据使用的状态。高层API还提供了订阅同一个过滤表达式(比如,白名单或黑名单的正則表達式)相匹配的多个话题的能力。
class SimpleConsumer { /* Send fetch request to a broker and get back a set of messages. */ public ByteBufferMessageSet fetch(FetchRequest request); /* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches); /** * Get a list of valid offsets (up to maxSize) before the given time. * The result is a list of offsets, in descending order. * @param time: time in millisecs, * if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available. * if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available. */ public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets); }
底层API不但用于实现高层API,并且还直接用于我们的离线使用者(比方Hadoop这个使用者),这些使用者还对状态的维护有比較特定的需求。
高层API
/* create a connection to the cluster */ ConsumerConnector connector = Consumer.create(consumerConfig); interface ConsumerConnector { /** * This method is used to get a list of KafkaStreams, which are iterators over * MessageAndMetadata objects from which you can obtain messages and their * associated metadata (currently only topic). * Input: a map of <topic, #streams> * Output: a map of <topic, list of message streams> */ public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); /** * You can also obtain a list of KafkaStreams, that iterate over messages * from topics that match a TopicFilter. (A TopicFilter encapsulates a * whitelist or a blacklist which is a standard Java regex.) */ public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams); /* Commit the offsets of all messages consumed so far. */ public commitOffsets() /* Shut down the connector */ public shutdown() }
该API的中心是一个由KafkaStream这个类实现的迭代器(iterator)。每一个KafkaStream都代表着一个从一个或多个分区到一个或多个server的消息流。每一个流都是使用单个线程进行处理的。所以,该API的使用者在该API的创建调用中能够提供所需的随意个数的流。这样,一个流可能会代表多个server分区的合并(同处理线程的数目同样)。但每一个分区仅仅会把数据发送给一个流中。
createMessageStreams方法为使用者注冊到对应的话题之上,这将导致须要对使用者/代理的分配情况进行又一次平衡。为了将又一次平衡操作降低到最小。
该API鼓舞在一次调用中就创建多个话题流。createMessageStreamsByFilter方法为发现同其过滤条件想匹配的话题(额外地)注冊了多个监视器(watchers)。
应该注意。createMessageStreamsByFilter方法所返回的每一个流都可能会对多个话题进行迭代(比方,在满足过滤条件的话题有多个的情况下)。
这样就能够让基于文件的消息更加高效地利用transferTo实现,而不是使用线程内缓冲区读写方式。
线程模型用的是一个单个的接收器(acceptor)线程和每一个能够处理固定数量网络连接的N个处理器线程。这样的设计方案在别处已经经过了很彻底的检验,发现事实上现起来简单、执行起来很快。当中使用的协议一直都很easy,将来还能够用其他语言实现其client。
都不用说。在Kafka的某特定应用中很有可能在它的使用中须要採用某种特殊的序列化类型。MessageSet接口就是一个使用特殊的方法对NIOChannel进行大宗数据读写(bulk
reading and writing to an NIOChannel)的消息迭代器。
/** * A message. The format of an N byte message is the following: * * If magic byte is 0 * * 1. 1 byte "magic" identifier to allow format changes * * 2. 4 byte CRC32 of the payload * * 3. N - 5 byte payload * * If magic byte is 1 * * 1. 1 byte "magic" identifier to allow format changes * * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) * * 3. 4 byte CRC32 of the payload * * 4. N - 6 byte payload * */
每条消息用一个64位的整数偏移量进行唯一性标示。该偏移量表示了该消息在那个分区中的那个话题下发送的全部消息组成的消息流中所处的字节位置。每条消息在磁盘上的格式例如以下文所看到的。
每一个日志文件的以它所包括的第一条消息的偏移量来命名。因此,第一个创建出来的文件的名字将为00000000000.kafka,随后每一个后加的文件的名字将是前一个文件的文件名称大约再加S个字节所得的整数,当中,S是配置文件里指定的最大日志文件的大小。
消息的确切的二进制格式都有版本号。它保持为一个标准的接口,让消息集能够依据须要在生产者、代理、和使用者直接进行自由传输而无须又一次拷贝或转换。其格式例如以下所看到的:
On-disk format of a message message length : 4 bytes (value: 1+4+n) "magic" value : 1 byte crc : 4 bytes payload : n bytes
将消息的偏移量作为消息的可不常见。
我们原先的想法是使用由生产者产生的GUID作为消息id,然后在每一个代理上作一个从GUID到偏移量的映射。可是,既然使用者必须为每一个server维护一个ID,那么GUID所具有的全局唯一性就失去了价值。更有甚者,维护将从一个随机数到偏移量的映射关系带来的复杂性,使得我们必须使用一种重量级的索引结构,并且这样的结构还必须与磁盘保持同步,这样我们还就必须使用一种全然持久化的、需随机訪问的数据结构。如此一来,为了简化查询结构,我们就决定使用一个简单的依分区的原子计数器(atomic counter),这个计数器能够同分区id以及节点id结合起来唯一的指定一条消息;这样的方法使得查询结构简化不少,虽然每次在处理使用者请求时仍有可能会涉及多次磁盘寻道操作。然而,一旦我们决定使用计数器。跳向直接使用偏移量作为id就很自然了,毕竟两者都是分区内具有唯一性的、单调添加的整数。既然偏移量是在使用者API中并不会体现出来,所以这个决策终于还是属于一个实现细节,进而我们就选择了这样的更加高效的方式。
读取操作返回的是这S个字节中包括的消息的迭代器。S应该要比最长的单条消息的字节数大,但在出现特别长的消息情况下,能够反复进行多次读取,每次的缓冲区大小都加倍,直到能成功读取出这样长的一条消息。也能够指定一个最大的消息和缓冲区大小并让server拒绝接收比这个大小大一些的消息,这样也能给client一个能够读取一条完整消息所需缓冲区的大小的上限。非常有可能会出现读取缓冲区以一个不完整的消息结尾的情况,这个情况用大小界定(size
delimiting)非常easy就能探知。
从某偏移量開始进行日志读取的实际过程须要先找出存储所需数据的日志段文件,从全局偏移量计算出文件内偏移量,然后再从该文件偏移量处開始读取。搜索过程通过对每一个文件保存在内存中的范围值进行一种变化后的二分查找完毕。
日志提供了获取最新写入的消息的功能,从而同意从“当下”開始消息订阅。
这个功能在使用者在SLA规定的天数内没能正常使用数据的情况下也非常实用。当使用者企图从一个并不存在的偏移量開始使用数据时就会出现这样的情况,此时使用者会得到一个OutOfRangeException异常,它能够依据详细的使用情况对自己进行重新启动或者只失败而退出。
下面是发送给数据使用者(consumer)的结果的格式。
MessageSetSend (fetch result) total length : 4 bytes error code : 2 bytes message 1 : x bytes ... message n : x bytes
MultiMessageSetSend (multiFetch result) total length : 4 bytes error code : 2 bytes messageSetSend 1 ... messageSetSend n
一次仅仅能删除一个日志段的数据。 日志管理器同意通过可载入的删除策略设定删除的文件。 当前策略删除改动事件超过N 天以上的文件,也能够选择保留最后 N GB 的数据。 为了避免删除时的读取锁定冲突。我们能够使用副本写入模式,以便在进行删除的同一时候对日志段的一个不变的静态快照进行二进制搜索。
日志功能里有一个配置參数M,可对在强制进行磁盘刷新之前可写入的消息的最大条目数进行控制。在系统启动时会执行一个日志恢复过程,对最新的日志段内全部消息进行迭代。以对每条消息项的有效性进行验证。
一条消息项是合法的,仅当其大小加偏移量小于文件的大小而且该消息中有效载荷的CRC32值同该消息中存储的CRC值相等。在探測出有数据损坏的情况下,就要将文件依照最后一个有效的偏移量进行截断。
要注意。这里有两种必需处理的数据损坏情况:因为系统崩溃造成的未被正常写入的数据块(block)因而须要截断的情况以及因为文件里被增加了毫无意义的数据块而造成的数据损坏情况。造成数据损坏的原因是,一般来说OS并不能保证文件索引节点(inode)和实际数据块这两者的写入顺序,因此。除了可能会丢失未刷新的已写入数据之外,在索引节点已经用新的文件大小更新了但在将数据块写入磁盘块之前发生了系统崩溃的情况下。文件就可能会获得一些毫无意义的数据。CRC值就是用于这样的极端情况,避免由此造成整个日志文件的损坏(虽然未得到保存的消息当然是真的找不回来了)。
接下来讨论zookeeper用于在使用者和代理直接进行协调的结构和算法。
当一个路径中的元素是用[xyz]这样的形式表示的时,其意思是, xyz的值并不固定并且实际上xyz的每种可能的值都有一个zookpeer z节点(znode)。比如。/topics/[topic]表示了一个名为/topics的文件夹。当中包括的子文件夹同话题相应。一个话题一个文件夹并且文件夹名即为话题的名称。也能够给出数字范围。比如[0...5],表示的是子文件夹0、1、2、3、4。箭头->用于给出z节点的内容。比如/hello -> world表示的是一个名称为/hello的z节点,包括的值为"world"。
/brokers/ids/[0...N] --> host:port (ephemeral node)
上面是全部出现的代理节点的列表,列表中每一项都提供了一个具有唯一性的逻辑代理id。用于让使用者可以识别代理的身份(这个必须在配置中给出)。在启动时,代理节点就要用/brokers/ids下列出的逻辑代理id创建一个z节点,并在自己注冊到系统中。
使用逻辑代理id的目的是,可以让我们在不影响数据使用者的情况下就能把一个代理搬到还有一台不同的物理机器上。试图用已在使用中的代理id(比方说,两个server配置成了同一个代理id)进行注冊会导致错误发生。
由于代理是以非长久性z节点的方式注冊的,所以这个注冊过程是动态的,当代理关闭或宕机后注冊信息就会消失(至此要数据使用者。该代理不再有效)。
/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)
每一个代理会都要注冊在某话题之下。注冊后它会维护并保存该话题的分区总数。
为了对数据的使用进行负载均衡并记录使用者使用的每一个代理上的每一个分区上的偏移量,全部话题的使用者都要在Zookeeper中进行注冊。
多个使用者能够组成一个小组共同使用一个单个的话题。同一小组内的每一个使用者共享同一个给定的group_id。比方说,假设某个使用者负责用三台机器进行某某处理过程。你就能够为这组使用者分配一个叫做“某某”的id。
这个小组id是在使用者的配置文件里指定的,而且这就是你告诉使用者它究竟属于哪个组的方法。
小组内的使用者要尽量公正地划分出分区,每一个分区仅为小组内的一个使用者所使用。
除了小组内的全部使用者都要共享一个group_id之外。每一个使用者为了要同其他使用者差别开来,还要有一个非永久性的、具有唯一性的consumer_id(採用hostname:uuid的形式)。 consumer_id要在下面的文件夹中进行注冊。
/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)
小组内的每一个使用者都要在它所属的小组中进行注冊并採用consumer_id创建一个z节点。
z节点的值包括了一个<topic, #streams>的map。 consumer_id仅仅是用来识别小组内活跃的每一个使用者。使用者建立的z节点是个暂时性的节点,因此假设这个使用者进程终止了,注冊信息也将随之消失。
数据使用者跟踪他们在每一个分区中耗用的最大偏移量。
这个值被存储在一个Zookeeper(分布式协调系统)文件夹中。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)
每一个代理分区都被分配给了指定使用者小组中的单个数据使用者。数据使用者必须在耗用给定分区前确立对其的全部权。要确立其全部权,数据使用者须要将其 id 写入到特定代理分区中的一个暂时节点(ephemeral node)中。
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
代理节点之间基本上都是相互独立的,因此它们仅仅须要公布它们拥有的信息。当有新的代理增加进来时,它会将自己注冊到代理节点注冊文件夹中,写下它的主机名和port。代理还要将已有话题的列表和它们的逻辑分区注冊到代理话题注冊表中。在代理上生成新话题时,须要动态的对话题进行注冊。
当使用者启动时。它要做下面这些事情:
使用者又一次复杂均衡的算法可用让小组内的全部使用者对哪个使用者使用哪些分区达成一致意见。使用者又一次负载均衡的动作每次加入或移除代理以及同一小组内的使用者时被触发。对于一个给定的话题和一个给定的使用者小组,代理分区是在小组内的全部使用者中进行平均划分的。一个分区总是由一个单个的使用者使用。
这样的设计方案简化了实施过程。如果我们执行多个使用者以并发的方式同一时候使用同一个分区,那么在该分区上就会形成争用(contention)的情况。这样一来就须要某种形式的锁定机制。
如果使用者的个数比分区多,就会出现有写使用者根本得不到数据的情况。在又一次进行负载均衡的过程中,我们依照尽量降低每一个使用者须要连接的代理的个数的方式,尝尝试着将分区分配给使用者。
每一个使用者在又一次进行负载均衡时须要做下列的事情:
1. 针对Ci所订阅的每一个话题T 2. 将PT设为生产话题T的全部分区 3. 将CG设为小组内同Ci 一样使用话题T的全部使用者 4. 对PT进行排序(让同一个代理上的各分区挨在一起) 5. 对CG进行排序 6. 将i设为Ci在CG中的索引值并让N = size(PT)/size(CG) 7. 将从i*N到(i+1)*N - 1的分区分配给使用者Ci 8. 将Ci当前所拥有的分区从分区拥有者注冊表中删除 9. 将新分配的分区增加到分区拥有者注冊表中 (我们可能须要多次尝试才干让原先的分区拥有者释放其拥有权)
在触发了一个使用者要又一次进行负载均衡时,同一小组内的其他使用者也会差点儿在同一时候被触发又一次进行负载均衡。
文件夹索引:
6)kafka.common.ConsumerRebalanceFailedException异常解决的方法
8)apache kafka中server.properties配置文件參数说明
9)apache kafka的consumer初始化时获取不到消息
11)apache kafka源码project环境搭建(IDEA)
12)apache kafka监控系列-KafkaOffsetMonitor
15)apache kafka监控系列-kafka-web-console
17)kafka LeaderNotAvailableException
19)apache kafka性能測试命令使用和构建kafka-perf
22) kafka broker内部架构
23)apache kafka源代码分析走读-kafka总体结构分析
24)apache kafka源代码分析走读-Producer分析
26)apache kafka源代码分析走读-server端网络架构分析
27)apache kafka源代码分析走读-ZookeeperConsumerConnector分析
30) kafka文件系统设计那些事
原文:http://www.cnblogs.com/gccbuaa/p/6911274.html