原文地址:http://blog.csdn.net/honglei915/article/details/37760631
消息由一个固定长度的头部和可变长度的字节数组组成。头部包括了一个版本和CRC32校验码。
/** * 具有N个字节的消息的格式例如以下 * * 假设版本是0 * * 1. 1个字节的 "magic" 标记 * * 2. 4个字节的CRC32校验码 * * 3. N - 5个字节的详细信息 * * 假设版本是1 * * 1. 1个字节的 "magic" 标记 * * 2.1个字节的參数同意标注一些附加的信息比方是否压缩了,解码类型等 * * 3.4个字节的CRC32校验码 * * 4. N - 6 个字节的详细信息 * */
一个叫做“my_topic”且有两个分区的的topic,它的日志有两个目录组成,my_topic_0和my_topic_1,每一个目录里放着详细的数据文件,每一个数据文件都是一系列的日志实体,每一个日志实体有一个4个字节的整数N标注消息的长度,后边跟着N个字节的消息。
每一个消息都能够由一个64位的整数offset标注,offset标注了这条消息在发送到这个分区的消息流中的起始位置。每一个日志文件的名称都是这个文件第一条日志的offset.所以第一个日志文件的名字就是00000000000.kafka.所以每相邻的两个文件名称字的差就是一个数字S,S差点儿相同就是配置文件里指定的日志文件的最大容量。
消息的格式都由一个统一的接口维护,所以消息能够在producer,broker和consumer之间无缝的传递。存储在硬盘上的消息格式例如以下所看到的:
消息长度: 4 bytes (value: 1+4+n) 版本: 1 byte CRC校验码: 4 bytes 详细的消息: n bytes
消息被不断的追加到最后一个日志的末尾,当日志的大小达到一个指定的值时就会产生一个新的文件。对于写操作有两个參数,一个规定了消息的数量达到这个值时必须将数据刷新到硬盘上,另外一个规定了刷新到硬盘的时间间隔,这对数据的持久性是个保证,在系统崩溃的时候仅仅会丢失一定数量的消息或者一个时间段的消息。
读操作须要两个參数:一个64位的offset和一个S字节的最大读取量。S通常比单个消息的大小要大,但在一些个别消息比較大的情况下,S会小于单个消息的大小。这样的情况下读操作会不断重试,每次重试都会将读取量加倍,直到读取到一个完整的消息。能够配置单个消息的最大值,这样server就会拒绝大小超过这个值的消息。也能够给client指定一个尝试读取的最大上限,避免为了读到一个完整的消息而无限次的重试。
在实际运行读取操纵时,首先须要定位数据所在的日志文件,然后依据offset计算出在这个日志中的offset(前面的的offset是整个分区的offset),然后在这个offset的位置进行读取。定位操作是由二分查找法完毕的,Kafka在内存中为每一个文件维护了offset的范围。
以下是发送给consumer的结果的格式:
MessageSetSend (fetch result) total length : 4 bytes error code : 2 bytes message 1 : x bytes ... message n : x bytes
MultiMessageSetSend (multiFetch result) total length : 4 bytes error code : 2 bytes messageSetSend 1 ... messageSetSend n
日志管理器同意定制删除策略。眼下的策略是删除改动时间在N天之前的日志(按时间删除),也能够使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时堵塞读操作,採用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。
日志文件有一个可配置的參数M,缓存超过这个数量的消息将被强行刷新到硬盘。一个日志矫正线程将循环检查最新的日志文件里的消息确认每一个消息都是合法的。合法的标准为:全部文件的大小的和最大的offset小于日志文件的大小,而且消息的CRC32校验码与存储在消息实体中的校验码一致。假设在某个offset发现不合法的消息,从这个offset到下一个合法的offset之间的内容将被移除。
有两种情况必须考虑:1,当发生崩溃时有些数据块未能写入。2,写入了一些空白数据块。另外一种情况的原因是,对于每一个文件,操作系统都有一个inode(inode是指在很多“类Unix文件系统”中的一种数据结构。每一个inode保存了文件系统中的一个文件系统对象,包含文件、文件夹、大小、设备文件、socket、管道, 等等),但无法保证更新inode和写入数据的顺序,当inode保存的大小信息被更新了,但写入数据时发生了崩溃,就产生了空白数据块。CRC校验码能够检查这些块并移除,当然由于崩溃而未写入的数据块也就丢失了。
漫游Kafka实现篇之消息和日志,布布扣,bubuko.com
原文:http://www.cnblogs.com/mengfanrong/p/3900918.html