clojure 中使用 actor
背景
Actor 模型 是让 Earlang 声名卓著的关键特性。它是 Erlang 平台实现分布式编程的关键内容,在 Clojure 语言设计时, Rich Hickey 考虑过在 Clojure 语言中是否实现 Actor,他最终认为:这仅仅是适合于分布式编程的一种特征,如果成为语言的本质,将限制 Clojure 成为一种服务器领域语言,因此他决定使用其他更简单直接的异步通信模型。但他也没有排除未来在 Clojure 中引入 Actor 的可能。
简单实现
随着 core.async 库的推出和成熟,我们实际上已经可以用 core.async 来实现一个最简单的 Actor 模型:
(require '[clojure.core.async :as a]) (defn actor [f] (let [mail-box (a/chan (a/dropping-buffer 32))] (a/go-loop [f f] (when-let [v (a/<! mail-box)] (recur (f v)))) mail-box)) (def ! a/put!) ;erlang 的操作符
可以看到,上面的 actor 函数可以将一个 core.async 通道封装成一个主动单元,其中拥有自己的事件循环,不断地在 mail-box 上等待新的消息。
下面我们定义一个简单的调试 actor:
(def debug-actor (actor (fn debug[x] (prn x) debug))) (! debug-actor "Hello, world!") ;;输出 Hello, world
这最简单的 actor 当然还没有支持分布式编程,但使用这个模型,我们将程序的组件变成了主动的单元,从而在通道的基础上提供了另一种抽象。
与 Transducers 的关系
Clojure 1.7 开始引入的 transducer 被广泛使用,它可以使用在任何连续的数据结构上,例如序列,以及 core.async 的通道,如果我们已经定义好了一个 transducer 或者用它定义的操作 (xf),是否可以用于 Actor 呢?
(defn actor-xf [xf out-actor] (let [f-out (fn ([b] b) ([b itm] (a/put! b itm) b))] (fn f-actor ([v] (f-actor out-actor v)) ([acc v] (let [acc ((xf f-out) acc v)] (partial f-actor acc))))))
这个函数可以用定义好的 xf 生成一个 actor,它将对消息处理后将转化后的消息送往 out-actor。例如:
(def inc-actor (actor (actor-xf (map inc) debug-actor))) (! inc-actor 6) ;;输出7 (def complex-actor (actor (actor-xf (mapcat #(repeatedly % vector)) debug-actor))) (! complex-actor 2) ;;[] ;;[]
其实 core.async 库中已经实现了类似的功能,就是 chan
函数本身!我们不过需要用 pipe
函数将内外两个通道连起来就可以了:
(defn actor-xf [xf out-ch] (let [ch (a/chan (a/dropping-buffer 32) xf)] (a/pipe ch out-ch) ch))