Kestrel.scala中的QueueCollection

有关Kestrel的Scala实例已经介绍到了第三篇,接下来介绍Scala中的一个走读分支:QueueCollection。

在Kestrel.scala的startup方法中,告诉我们接下来有两个走读的分支,一个是QueueCollection和PersistentQueue。另一个是KestrelHandle。KestrelHandle是和NioSocketAcceptor相联系的,所以可以想想KetrelHandle是用来处理链接……与之相关的还有memcache目录下的Codec.scala。

我们先忽略Kestrel.scala在startup中的几个语法细节,从相对比较简单的QueueCollection开始吧:

class QueueCollection(queueFolder: String, private var queueConfigs: ConfigMap) {  


  ……  



  if (! path.isDirectory) {  



    path.mkdirs()  


  }  


  ……  


}  

这是一个Scala的比较特殊语法。class本身就是一个建构函数。程序中queueFolder和queueConfigs是创建是必须的两个参数。class在Scala里面就支持一种建构函数。这里的参数也可以看做是整个class的变量,所以程序中的private只表示在类以外的地方,没有办法获取queueConfigs这个参数。

接下来是关于HashMap的创建:

private val queues = new mutable.HashMap[String, PersistentQueue]  


 

我们知道HashMap是有两个类型,一个是键的类型,一个是值的类型。在queues里面,String就是键,PersistentQueue就是值。这跟Java/C++的模板累死,不需要特别的解释了。

然后是一个很多地方都用到的Count,在Count.scala里面有说明,就是一个统计技术的类,写法很干净,注意里面的函数定义,都没有用大括号。多看几遍就熟悉,Scala是怎么定义函数的了。显然,这看起来更像是命令,而不是函数体的声明。

经过两段比较好理解的代码之后,出现了一段恐怖的代码:

queueConfigs.subscribe { c =>  



  synchronized {  




    queueConfigs = c.getOrElse(new Config)  



  }  


}  

如果只是看这段代码,就有种喉咙被卡住,咽不下去又吐不出来的感觉。这段代码做了些什么,大家都能猜出来,郁闷的是,这到底遵循的是那个语法规范呢?咋代码就写得那么四不像呢?不过如果我们回过来看一下net.lag.configgy.configMap中subscribe的定义,似乎就能明白多一点:

def subscribe(f: (Option[ConfigMap]) => Unit): SubscriptionKey = {  



  subscribe(new Subscriber {  



    def validate(current: Option[ConfigMap], replacement: Option[ConfigMap]): Unit = { }  


    def commit(current: Option[ConfigMap], replacement: Option[ConfigMap]): Unit = {  


      f(replacement)  


    }  


  })  


}  


 

看来还是有点难度――呃,其实这是Kestrel的一个非常灵活的功能,定义作为参数的函数的类型:

func_name : (param_type1, param_type2) => Unit  

func_name是一个有两个参数的函数,参数类型分别是param_type1和param_type2。所以subscribe的参数是一个有一个参数的函数,这个参数所以Option[ConfigMap]。关于Option, Some的话题,我们稍后再谈。这已经不影响程序的阅读了。

回到在QueueCollection.scala的代码,当我们知道subscribe的参数是一个函数的时候,下面这段代码的作用就是,当queueConfigs的某些状态变化的时候,会调用一个叫commit的内部函数,而这个内部函数的功能,就是把新替换的配置,作为参数c,传递给这段代码,结果是queueConfigs = c.getOrElse(new Config)。涵义是,如果不存在就添一个缺省值。

queueConfigs.subscribe { c =>  



  synchronized {  




    queueConfigs = c.getOrElse(new Config)  



  }  


}  

绕了一圈,其实是定义了一个触发器,并且触发器的作用是,当设置是空置的时候,补上一个标准的缺省值。完全是杀鸡用了牛刀。但是回过来我们重新考虑Scala对架构上的意义,这种把函数作为参数的做法,可以很方便的实现callback操作,当然这需要配合上object这样的类,否则寻找对应的callback类,还需要费点周章。

随后,我们看到了著名的闭包(closure),而且一来还来了两段,第一段是filter,对于所有的成员“name”,如果 => 后面的内容返回是成功的话(如果name不包含~~),就添加到list里面去。

def loadQueues() {  



  path.list() filter { name => !(name contains "~~") } map { queue(_) }  



}  


 

第二段是把list变成一个map,完整的写法应该是

map { _ => queue(_) }  


 

很多时候不需要写成那么麻烦,可以直接把 _ => 给省略了。有时候因为习惯的原因,你会猜想queue又是一个什么特殊的语法?其实它一点都不特殊,往下大概10行左右,就是它的定义。queue,是把一个字符串的队列名,转变成一个真正的PersistentQueue的函数。所以load_queues,在大体上起了初始化队列的作用。

后面的段落中还有一个使用了closure的语法:

def currentItems = queues.values.foldLeft(0L) { _ + _.length }  


 

查询一下foldLeft的函数说明如下:

def  foldLeft[B](z : B)(op : (B, A) => B) : B  


 

相关推荐