Kaka日志的结构概览可见之前的博客。
日志段是kafka保存消息的最小载体,阅读日志段代码可更好的去定位分析问题,鉴于网上对日志段的说明文档较少,本文对Kafka日志段进行详细说明,重点介绍Kafka日志段LogSegment的声明、append、read、recover方法。
日志段代码在Kafka的core工程目录下,详细位置:core/scala/kafka/log/LogSegment.scala,日志相关的代码都在core/scala/kafka/log目录下。
LogSegment.scala文件包含LogSegment class、LogSegment object、LogFlushStats object.
说明:scala语法中,允许scala中包含同名的class 和object,这种用法称之为伴生(Companion),class 对象称之为伴生类,和java语法中的类是一样的。而 object是一个单例对象
里面包含静态方法和变量,用java比喻时,object相当于java的utils工具类
具体翻译为:日志段。每个日志段包含两个部分,分别是日志和索引。日志是一个文件记录,包含真实的消息。索引是一个偏移量,是物理位置到逻辑的映射。每一个段有一个基础的偏移。<=段中所有的消息>之前的段。段包含两个偏移文件,分别是索引和文件。
class LogSegment private[log] (val log: FileRecords, val lazyOffsetIndex: LazyIndex[OffsetIndex], val lazyTimeIndex: LazyIndex[TimeIndex], val txnIndex: TransactionIndex, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, val time: Time) extends Logging {...}
字段解析和说明
从日志段的声明中我们发现,需要日志文件、三个索引文件(位移索引、时间戳索引、已终止的事务索引)等,我们同时也发现其中两个索引是延迟索引。
baseoffset:每个日志段都保留了自身的起始位移。
indexIntervalBytes:即为Broker段参数,log.index.interval.bytes,控制了日志段对象新增索引项的频率。默认情况下日志段写入4KB的消息才会新增一条索引项目
rollJitterMs:是一个抖动值,如果没有这个干扰值,未来某个时刻可能同时创建多个日志段对象,将会大大的增加IO压力。
以下append源码实现即为写入消息的具体操作
/** * Append the given messages starting with the given offset. Add * an entry to the index if needed. * * It is assumed this method is being called from within a lock. * * @param largestOffset The last offset in the message set * @param largestTimestamp The largest timestamp in the message set. * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. * @param records The log entries to append. * @return the physical position in the file of the appended records * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow */ @nonthreadsafe def append(largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { if (records.sizeInBytes > 0) { trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " + s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp") val physicalPosition = log.sizeInBytes() if (physicalPosition == 0) rollingBasedTimestamp = Some(largestTimestamp) ensureOffsetInRange(largestOffset) // append the messages val appendedBytes = log.append(records) trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset") // Update the in memory max timestamp and corresponding offset. if (largestTimestamp > maxTimestampSoFar) { maxTimestampSoFar = largestTimestamp offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp } // append an entry to the index (if needed) if (bytesSinceLastIndexEntry > indexIntervalBytes) { offsetIndex.append(largestOffset, physicalPosition) timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar) bytesSinceLastIndexEntry = 0 } bytesSinceLastIndexEntry += records.sizeInBytes } }
原文:https://www.cnblogs.com/cnxieyang/p/12731156.html