Kestrel中的Journal.scala类详解

本文是Scala代码实例之Kestrel的第六部分,讲述PersistentQueue中的Journal.scala类。

在PersistentQueue之下,有一个Journal.scala的类,支撑了消息队列的存储问题。这是Kestrel提供的另外一个特性:通过文件系统保存消息队列,避免服务重启的时候,Kestrel的队列丢失。

通过前面一段时间的阅读,我们对Scala的语法已经有一个基本的把握,所以在阅读Journal的时候,我们就更注重实现的方式,而不是语法细节了。当然Journal也没有太多的语法细节需要讲的了。当然出了还没有详细说过的case class和case object。

在Journal.scala的开始部分就定义了一个abstract class类JournalItem,并且定义了它的许多子类,这些子类是用来和PersistentQueue进行消息传递的。case class/case object是一种特殊的class/object,其功能是在对象里面增加了几个功能

1. 把所有的建构函数的var变成val,也就是变成了不可变的常量

2.自动实现了equal, hashCode和toString三个方法

3.当对象出现在case之后的时候,会自动apply出一个对象,对象的值和创建的时候一样,这个功能保证了可以和match…case语法可以写得很简练。

关于Case class的具体说明可以参考:CaseClasses和MatchingOnCaseClasses。关于第三条特性,还可以参考CompanionObjects。

简单的理解,我们就把case object/case class当作消息传递中需要使用的对象类型就可以了。

Journal使用了noi的FileChannel,来处理文件的读取和存储。核心的算法,可以只看readJournalEntry和replay两个方法。readJournalEntry的功能是从文件中读取数据,并且根据格式组成各种case class/case object,并且同时返回字节数。而在上层的方法,比如replay,则根据得到的不同数据类型,调用更上层的函数f(case class/case object)。

我们回到PersistentQueue中看replayJournal的时候,发现它将调用replay后得到的一系列的case class转义成为在PersistentQueue中需要执行的各种命令――所以这个方法的名字叫做replay!就是回放的意思。

当系统重启的时候,打开每个queue之前都需要一段回放的时间,把文件系统中记录的当时的整个存取过程重新回放一次,通过回放来重建内存中的队列。回过来再看Journal.scala的时候,我们就更清晰的知道,文件存储的不是当时的队列状态,而是每一次系统执行的轨迹。所以,Journal对整个Kestrel消息队列的开销才会很小。

但是另外一个问题随之而来,如果记录所有的操作过程,那么这个文件不是只会增大,不会缩小么?为了解决这个问题,Journal.scala实现了一个叫做roll的机制。从PersisitentQueue中的add方法中,我们可以看到这样的代码:

if (keepJournal() && !journal.inReadBehind) {  



     if (journal.size > maxJournalSize() * maxJournalOverflow() && queueSize < maxJournalSize()) {  




       // force re-creation of the journal.  




       log.info("Rolling journal file for '%s' (qsize=%d)", name, queueSize)  



       journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, queue)  


     }  



     if (queueSize >= maxMemorySize()) {  




       log.info("Dropping to read-behind for queue '%s' (%d bytes)", name, queueSize)  



       journal.startReadBehind  


     }  


   }  

如果发生了Journal文件的尺寸太大,但是实际的Queue尺寸也没有满的时候,就启动roll进程来重新建立一个Journal文件。处理的方法也很简单,就是把当前内存中的队列直接写入到Journal对应的文件中,变成一连串的add。这么做,是不是一个很好的做法?只有一种意外的情况,那就是现存在消息队列里面的数据很多,那么重建Journal的时间就需要很多。作者也考虑到了这个问题,所有要求queue实际的size必须小于Journal所能存储的量的时候,才会做roll的操作,也就是说,当队列里面有很多的事件没有处理的时候,就算硬盘占用得再多,也不会启动roll方法。而解决的方法是,当内存中的Queue太大,大到超过了最大的内存使用限制的时候,启动readBehind模式。

当readBehind模式启动之后,会对文件增加一个read句柄,每次从内存里面remove掉消息的时候,就会尝试从文件中读取消息放到内存里面。在这样的模式下,内存就一致保持着最满的队列。更多的消息就先直接存储到文件中,直到文件中的read指针和write指针重合,也就是说所有在文件系统中的消息都已经被处理完毕了,系统就会重新切换回正常的模式。

在这种模式下,只要硬盘的数量足够大,我们基本上可以把这个消息队列理解为无限长……但是在readbehind模式下,是不会进行roll的操作的。所以――大家需要注意的是,在配置中,maxJournalSize必须要小于maxMemorySize,否则这两个机制就会打架了。而maxJournalSize这个数值也不应该很大,这样就能保证每次roll的效率会很快(因为roll的效率是取决于事件占用内存的数量,也就是maxJournalSize的),超过这个值,系统就不会roll。

决定是否roll。还有一个数值就是maxJournalOverflow,这是一个很好的设计,相当于Journal文件的利用率,比如说Overflow设置为5,表示现在有效的消息队列数据,只有整个 Journal 文件大小的 1/5。

假设我们平均每个消息的数据占有1K,那么其他的指令信息基本可以被忽略(因为都只有几个字节而已),所以Overflow的比例相当于总消息数量 / 还没有处理过的消息数量。所以这个数值的上限取决于replay的效率。也就是读取文件的速度,比如说,我们觉得启动的时候,读取100M的文件,大概需要10s,是我们可以接受的,而每个消息字节平均是1K,roll一次100个消息需要100ms,是可以接受的。那么 maxJournalOverflow = 100M / 100 * 1K = 1000。不过实际情况可能是roll的次数会更多一些,因为当内存中的消息队列只有10个的时候,硬盘超过10M,就会触发roll操作了。

相关推荐