[Kafka Source Code] Log Processing

Kafka stores quite a lot in its logs; the details of the on-disk format are covered well in another blog post, so I won't repeat them here. Given how much is stored, what should happen to these files once they grow large? Below we go through the source code of Kafka's startup process and analyze how Kafka handles its logs.

1. Entry Point

In the startup method of KafkaServer.scala there is a call like this:

/* start log manager */
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()

2. The Scheduled Tasks

This starts the log-related scheduled tasks. What exactly do they include? Let's step in and take a look:

def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}

As you can see, a scheduled-task thread pool is used to run these jobs periodically. They fall into two groups: cleaning up old log segments, and flushing log data to disk (the recovery-point checkpoint task belongs with the flushing group and is covered together with it below).
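As a rough illustration of what this scheduling amounts to, here is a minimal sketch built on a plain ScheduledThreadPoolExecutor. PeriodicTaskSketch and the 30-second / 5-minute values are illustrative only, not Kafka's actual scheduler class or defaults:

import java.util.concurrent.{Executors, TimeUnit}

object PeriodicTaskSketch {
  def main(args: Array[String]): Unit = {
    // one background thread, standing in for Kafka's scheduler thread pool
    val pool = Executors.newScheduledThreadPool(1)

    // stand-in for cleanupLogs: just record that the task ran
    val cleanupTask = new Runnable {
      override def run(): Unit = println(s"log cleanup tick at ${System.currentTimeMillis()}")
    }

    // first run after 30 s, then periodically every 5 minutes,
    // mirroring schedule("kafka-log-retention", cleanupLogs, delay, period, ...)
    pool.scheduleAtFixedRate(cleanupTask, 30000L, 300000L, TimeUnit.MILLISECONDS)
  }
}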

2.1 Log Cleanup

First comes cleanupLogs. The relevant configuration here is log.retention.check.interval.ms, i.e. how often log cleanup runs. Let's look at the method itself:

/**
 * Delete any eligible logs. Return the number of segments deleted.
 */
def cleanupLogs() {
  debug("Beginning log cleanup...")
  var total = 0
  val startMs = time.milliseconds
  for(log <- allLogs; if !log.config.compact) {
    debug("Garbage collecting '" + log.name + "'")
    total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
  }
  debug("Log cleanup completed. " + total + " files deleted in " +
        (time.milliseconds - startMs) / 1000 + " seconds")
}

Another configuration comes into play here: cleanup.policy, the cleanup strategy. There are currently two options: compact, i.e. log compaction, which does not delete log files (such logs are skipped by the loop above); and delete, which removes old segments. Two methods do the main work here; we will look at each in turn.
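As a tiny illustration of the compact guard in cleanupLogs, here is a sketch with hypothetical MiniLog / MiniLogConfig stand-ins (not Kafka's classes):

// MiniLog and MiniLogConfig are made-up stand-ins for Kafka's Log and LogConfig
case class MiniLogConfig(compact: Boolean, retentionMs: Long, retentionSize: Long)
case class MiniLog(name: String, config: MiniLogConfig)

object CleanupPolicySketch {
  def main(args: Array[String]): Unit = {
    val allLogs = Seq(
      MiniLog("orders-0", MiniLogConfig(compact = false, retentionMs = 604800000L, retentionSize = -1L)),
      MiniLog("__consumer_offsets-0", MiniLogConfig(compact = true, retentionMs = -1L, retentionSize = -1L))
    )

    // mirrors the "if !log.config.compact" guard in cleanupLogs above:
    // only logs whose cleanup.policy is delete go through the delete path
    val deleteCandidates = allLogs.filter(log => !log.config.compact)
    deleteCandidates.foreach(log => println("would garbage collect " + log.name))  // prints orders-0 only
  }
}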

2.1.1 Cleaning Up Expired Segments

/**
 * Runs through the log removing segments older than a certain age
 */
private def cleanupExpiredSegments(log: Log): Int = {
  if (log.config.retentionMs < 0) return 0
  val startMs = time.milliseconds
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}

Another configuration shows up here: retention.ms, how long log data is retained. If it is negative, the log never expires, so there is nothing to delete on age grounds.

Otherwise, any segment whose last-modified time is older than the configured retention period becomes eligible for deletion. The actual deletion is handled by deleteOldSegments:

/**
 * Delete any log segments matching the given predicate function,
 * starting with the oldest segment and moving forward until a segment doesn't match.
 * @param predicate A function that takes in a single log segment and returns true iff it is deletable
 * @return The number of segments deleted
 */
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
  lock synchronized {
    // find any segments that match the user-supplied predicate UNLESS it is the final segment
    // and it is empty (since we would just end up re-creating it)
    val lastEntry = segments.lastEntry
    val deletable =
      if (lastEntry == null) Seq.empty
      else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
    val numToDelete = deletable.size
    if (numToDelete > 0) {
      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
      if (segments.size == numToDelete)
        roll()
      // remove the segments for lookups
      deletable.foreach(deleteSegment(_))
    }
    numToDelete
  }
}

The logic: walk the segments from oldest to newest and use the supplied predicate to decide which ones are deletable, collecting them into deletable. takeWhile stops at the first segment that does not match, and the final segment is kept if it is empty, since it would only be re-created. If every segment qualifies, roll() first creates a fresh active segment, because a log must always have at least one. Finally, each entry in deletable is deleted.
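To make the takeWhile behaviour concrete, here is a minimal sketch with a hypothetical MiniSegment class (the real code additionally protects an empty final segment, which this sketch omits):

object ExpiredSegmentsSketch {
  // made-up segment model with just the fields the predicate needs
  case class MiniSegment(baseOffset: Long, lastModified: Long, size: Long)

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    val day = 24L * 60 * 60 * 1000
    val retentionMs = 7 * day

    // oldest first, the order logSegments returns them in
    val segments = Seq(
      MiniSegment(0L,    now - 10 * day, 1024L),   // expired
      MiniSegment(1000L, now - 9 * day,  1024L),   // expired
      MiniSegment(2000L, now - 2 * day,  1024L),   // still within retention
      MiniSegment(3000L, now - 1 * day,  1024L)    // active segment
    )

    // same shape of predicate as startMs - _.lastModified > log.config.retentionMs
    val expired = (s: MiniSegment) => now - s.lastModified > retentionMs

    // takeWhile walks forward from the oldest segment and stops at the first
    // segment that is not deletable, so only the leading run is removed
    val deletable = segments.takeWhile(expired)
    println(deletable.map(_.baseOffset))   // List(0, 1000)
  }
}

Each deletable segment is then handed to deleteSegment, which removes it from the in-memory map and schedules the file deletion: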

private def deleteSegment(segment: LogSegment) {
  info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
  lock synchronized {
    segments.remove(segment.baseOffset)
    asyncDeleteSegment(segment)
  }
}

private def asyncDeleteSegment(segment: LogSegment) {
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  def deleteSeg() {
    info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
    segment.delete()
  }
  scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}

This is an asynchronous file-deletion path, controlled by the configuration file.delete.delay.ms, which is how long to wait before physically removing a segment's files. The segment's files are first renamed with the .deleted suffix, and a one-off task is scheduled to delete them after that delay.
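The pattern, sketched with plain java.io and a scheduled pool (the file path and the 60-second delay are illustrative; this is not Kafka's actual code):

import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

object AsyncDeleteSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newScheduledThreadPool(1)

    // hypothetical segment file path; real segment files live in the log directory
    val segmentFile = new File("/tmp/kafka-sketch/00000000000000000000.log")

    // step 1: rename the file with the .deleted suffix so it is out of the way immediately
    val marked = new File(segmentFile.getPath + ".deleted")
    segmentFile.renameTo(marked)

    // step 2: physically remove it only after a delay, playing the role of file.delete.delay.ms
    val deleteSeg = new Runnable {
      override def run(): Unit = marked.delete()
    }
    scheduler.schedule(deleteSeg, 60000L, TimeUnit.MILLISECONDS)
  }
}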

2.1.2 Cleaning Up Oversized Logs

/**
 * Runs through the log removing segments until the size of the log
 * is at least logRetentionSize bytes in size
 */
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  if(log.config.retentionSize < 0 || log.size < log.config.retentionSize) return 0
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if(diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}

This code is clear enough: if the total size of the log exceeds retention.bytes, the oldest segments are marked for deletion, one whole segment at a time, until the remaining size fits under the limit. The deletion then goes through the same deleteOldSegments method as before, so there is nothing new to add there.
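A small worked example of that bookkeeping, using a hypothetical MiniSegment model:

object SizeRetentionSketch {
  // made-up segment model with only the fields shouldDelete needs
  case class MiniSegment(baseOffset: Long, size: Long)

  def main(args: Array[String]): Unit = {
    val retentionSize = 2500L                      // plays the role of retention.bytes
    val segments = Seq(                            // oldest first
      MiniSegment(0L, 1000L),
      MiniSegment(100L, 1000L),
      MiniSegment(200L, 1000L),
      MiniSegment(300L, 500L)                      // active segment
    )
    val logSize = segments.map(_.size).sum         // 3500 bytes

    // same bookkeeping as cleanupSegmentsToMaintainSize: keep deleting
    // whole segments while the log is still over the limit
    var diff = logSize - retentionSize             // 1000 bytes over the limit
    def shouldDelete(segment: MiniSegment): Boolean = {
      if (diff - segment.size >= 0) { diff -= segment.size; true } else false
    }

    val deletable = segments.takeWhile(shouldDelete)
    println(deletable.map(_.baseOffset))           // List(0): dropping one 1000-byte segment is enough
  }
}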

2.2 Flushing Logs to Disk

Two scheduled tasks are involved here.

scheduler.schedule("kafka-log-flusher",
                   flushDirtyLogs,
                   delay = InitialTaskDelayMs,
                   period = flushCheckMs,
                   TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
                   checkpointRecoveryPointOffsets,
                   delay = InitialTaskDelayMs,
                   period = flushCheckpointMs,
                   TimeUnit.MILLISECONDS)

Two configurations are involved:

  • log.flush.scheduler.interval.ms: how often to check whether any log needs to be flushed to disk

  • log.flush.offset.checkpoint.interval.ms: how often to checkpoint the recovery point of each log, which bounds how much data has to be recovered on startup; there is usually no need to change it

Let's look at what each of the two tasks does.

2.2.1 flushDirtyLogs

/**
 * Flush any log which has exceeded its flush interval and has unwritten messages.
 */
private def flushDirtyLogs() = {
  debug("Checking for dirty logs to flush...")
  for ((topicAndPartition, log) <- logs) {
    try {
      val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
      debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs +
            " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
      if(timeSinceLastFlush >= log.config.flushMs)
        log.flush
    } catch {
      case e: Throwable =>
        error("Error flushing topic " + topicAndPartition.topic, e)
    }
  }
}

The purpose of this method is to flush log data to disk so that it is not lost.

One more configuration is involved here: flush.ms. When the time elapsed since the log's last flush exceeds this value, a flush is performed.
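A throwaway example of that check, with made-up numbers:

object FlushCheckSketch {
  def main(args: Array[String]): Unit = {
    val flushMs = 10000L                                       // pretend flush.ms is 10 s for this topic
    val lastFlushTime = System.currentTimeMillis() - 15000L    // pretend the last flush was 15 s ago

    val timeSinceLastFlush = System.currentTimeMillis() - lastFlushTime
    if (timeSinceLastFlush >= flushMs)
      println("flush needed")                                  // 15 s >= 10 s, so this log gets flushed
    else
      println("nothing to do")
  }
}

When the check fires, log.flush goes through the following call chain: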

/**
 * Flush all log segments
 */
def flush(): Unit = flush(this.logEndOffset)

/**
 * Flush log segments for all offsets up to offset-1
 * @param offset The offset to flush up to (non-inclusive); the new recovery point
 */
def flush(offset: Long) : Unit = {
  if (offset <= this.recoveryPoint)
    return
  debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime +
        " current time: " + time.milliseconds + " unflushed = " + unflushedMessages)
  for(segment <- logSegments(this.recoveryPoint, offset))
    segment.flush()
  lock synchronized {
    if(offset > this.recoveryPoint) {
      this.recoveryPoint = offset
      lastflushedTime.set(time.milliseconds)
    }
  }
}

/**
 * Flush this log segment to disk
 */
@threadsafe
def flush() {
  LogFlushStats.logFlushTimer.time {
    log.flush()
    index.flush()
  }
}

flush() takes the current end of the log, logEndOffset, and calls flush(offset) with it. That method first checks whether offset is less than or equal to recoveryPoint, i.e. the first offset that still needs to be written out to disk; if it is, there is nothing to flush and the method returns immediately, otherwise the flush goes ahead.

It then flushes every segment containing data between recoveryPoint and offset by calling segment.flush(), which flushes both the log file and the index file, and finally advances recoveryPoint to offset and records the flush time.
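A minimal model of that recovery-point bookkeeping (MiniSegment and all offsets are made up for illustration):

object RecoveryPointSketch {
  // hypothetical segment: baseOffset is the first offset it holds,
  // nextOffset the first offset of the following segment
  case class MiniSegment(baseOffset: Long, nextOffset: Long) {
    def flush(): Unit = println(s"fsync segment starting at $baseOffset")
  }

  def main(args: Array[String]): Unit = {
    val segments = Seq(MiniSegment(0L, 1000L), MiniSegment(1000L, 2000L), MiniSegment(2000L, 2500L))
    var recoveryPoint = 1200L        // offsets below 1200 are already known to be on disk
    val logEndOffset  = 2500L        // flush() flushes up to the log end offset

    if (logEndOffset > recoveryPoint) {
      // same idea as Log.flush(offset): fsync only segments that may hold
      // unflushed data, i.e. those overlapping [recoveryPoint, logEndOffset)
      segments.filter(s => s.nextOffset > recoveryPoint && s.baseOffset < logEndOffset)
              .foreach(_.flush())
      recoveryPoint = logEndOffset   // then advance the recovery point
    }
    println(s"new recovery point: $recoveryPoint")
  }
}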

2.2.2 checkpointRecoveryPointOffsets

/**
 * Write out the current recovery point for all logs to a text file in the log directory
 * to avoid recovering the whole log on startup.
 */
def checkpointRecoveryPointOffsets() {
  this.logDirs.foreach(checkpointLogsInDir)
}

/**
 * Make a checkpoint for all logs in provided directory.
 */
private def checkpointLogsInDir(dir: File): Unit = {
  val recoveryPoints = this.logsByDir.get(dir.toString)
  if (recoveryPoints.isDefined) {
    this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
  }
}

This writes the current recovery points to a text file named recovery-point-offset-checkpoint in each log directory, so that a restart does not have to recover the whole log. Its contents are:

  • the first line is the format version

  • the second line is the number of entries, i.e. the number of topic-partition pairs that follow

  • then comes one line per topic-partition pair, in the form: topic partition offset

Note that the file is not written in place: the content is first written to a temporary file, which is then moved over the target file, so a crash part-way through the write can never leave a corrupted checkpoint behind.
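A sketch of that write-then-swap pattern and of the file format described above (the paths, topic names and offsets are made up, and Kafka's own checkpoint class differs in detail):

import java.io.{File, PrintWriter}
import java.nio.file.{Files, StandardCopyOption}

object CheckpointWriteSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical recovery points: (topic, partition) -> first offset not yet flushed
    val recoveryPoints = Map(("orders", 0) -> 4231L, ("orders", 1) -> 3977L)

    val logDir = new File("/tmp/kafka-sketch")
    logDir.mkdirs()
    val target = new File(logDir, "recovery-point-offset-checkpoint")
    val temp   = new File(logDir, "recovery-point-offset-checkpoint.tmp")

    // write version, entry count, then one "topic partition offset" line per entry
    val writer = new PrintWriter(temp)
    try {
      writer.println(0)                      // format version
      writer.println(recoveryPoints.size)    // number of entries
      recoveryPoints.foreach { case ((topic, partition), offset) =>
        writer.println(s"$topic $partition $offset")
      }
    } finally writer.close()

    // swap the temp file into place so readers never see a half-written checkpoint
    Files.move(temp.toPath, target.toPath,
               StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
  }
}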

3. Summary

That covers the log-handling part of the Kafka source code. To summarize, the configuration items involved are:

  • log.retention.check.interval.ms

  • cleanup.policy

  • retention.ms

  • file.delete.delay.ms

  • retention.bytes

  • log.flush.scheduler.interval.ms

  • log.flush.offset.checkpoint.interval.ms

  • flush.ms

There are certainly other related configurations that this walkthrough does not touch. As for how to tune these parameters for the best performance, that takes continuous testing and experimentation; relying only on the defaults is clearly not enough.

