[译]介绍 `core.async` 核心的一些概念

源文档是 core.async 仓库的一个代码文件, 包含大量的教程性质的注释
https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
中间不确定的两句留了原文, 有读懂的同学请回复帮我纠正


这份攻略介绍 core.async 核心的一些概念

clojure.core.async namespace 包含了公开的 API.

(require '[clojure.core.async :as async :refer :all])

Channel

数据通过类似队列的 Channel 来传输, Channel 默认不进行 buffer(长度为 0)
需要生产者和消费者进行约定从而在 Channel 当中传送数据

chan 可以创建一个不进行 buffer 的 Channel:

(chan)

传一个数字以创建限定了 buffer 大小的 Channel:

(chan 10)

close! 用来关闭 Channel 终结接受消息传入, 已存在的数据依然可以取出
取尽的 Channel 在取值时返回 nil, nil 是不能直接通过 Channel 发送的!

(let [c (chan)]
  (close! c))

一般的 Thread

对在一般的 Thread 中, 使用 >!!(阻塞的 put) 和 <!!(阻塞的 take)
与 Channel 进行通信

(let [c (chan 10)]
  (>!! c "hello")
  (assert (= "hello" (<!! c)))
  (close! c))

由于是这些调用是阻塞的, 如果尝试把数据放进没有 buffer 的 Channel, 那么整个 Thread 都会被卡住.
所以需要 thread(好比 future) 在线程池当中执行代码主体, 并且通过 Channel 传回数据
例子中启动了一个后台任务把 "hello" 放进 Channel, 然后在主线程读取数据

(let [c (chan)]
  (thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (close! c))

go 代码块和反转控制(IoC) thread

go 是一个宏, 能把它的 body 在特殊的线程池里异步执行
不同的是本来会阻塞的 Channel 操作会暂停, 不会有线程被阻塞
这套机制封装了事件/回调系统当中需要外部代码的反转控制
go block 内部, 我们使用 >!(put) 和 <!(take)

这里把前面 Channel 的例子转化成 go block:

(let [c (chan)]
  (go (>! c "hello"))
  (assert (= "hello" (<!! (go (<! c)))))
  (close! c))

这里使用了 go block 来模拟生产者, 而不是直接用 Thread 和阻塞调用
消费者用 go block 进行获取, 返回 Channel 作为结果, 对这个 Channel 做阻塞的读取
(原文: The consumer uses a go block to take, then returns a result channel, from which we do a blocking take.)

选择性(alts)

Channel 对比队列一个啥手机应用是能够同时等待多个 Channel(像是 socket select)
通过 alts!!(一般 thread)或者 alts!(用于 go block)

可以通过 alts 创建后台线程讲两个任意的 Channel 结合到一起
alts!! 获取集合中某个操作的来执行
或者是可以 take 的 Channel, 或者是可以 put [channel value] 的 Channel
并返回包含具体的值(对于 put 返回 nil)以及获取成功的 Channel:
(原文: alts!! takes either a set of operations to perform either a channel to take from a [channel value] to put and returns the value (nil for put) and channel that succeeded:)

(let [c1 (chan)
      c2 (chan)]
  (thread (while true
            (let [[v ch] (alts!! [c1 c2])]
              (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

打印内容(在 stdout, 可能你的 REPL 当中看不到):
#<ManyToManyChannel ...> 读取 hi
#<ManyToManyChannel ...> 读取 there

使用 alts! 来做和 go block 一样的事情:

(let [c1 (chan)
      c2 (chan)]
  (go (while true
        (let [[v ch] (alts! [c1 c2])]
          (println "Read" v "from" ch))))
  (go (>! c1 "hi"))
  (go (>! c2 "there")))

因为 go block 是轻量级的进程而而不是限于 thread, 可以同时有大量的实例
这里创建 1000 个 go block 在 1000 个 Channel 里同时发送 hi
它们妥当时用 alts!! 来读取

(let [n 1000
      cs (repeatedly n chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

timeout 创建 Channel 并等待设定的毫秒时间, 然后关闭:

(let [t (timeout 100)
      begin (System/currentTimeMillis)]
  (<!! t)
  (println "Waited" (- (System/currentTimeMillis) begin)))

可以结合 timeoutalts 来做有时限的 Channel 等待
这里是花 100ms 等待数据到达 Channel, 没有的话放弃:

(let [c (chan)
      begin (System/currentTimeMillis)]
  (alts!! [c (timeout 100)])
  (println "Gave up after" (- (System/currentTimeMillis) begin)))

ALT

todo

其他 Buffer

Channel 可以定制不同的策略来处理 Buffer 填满的情况
这套 API 中提供了两个实用的例子.

使用 dropping-buffer 控制当 buffer 填满时丢弃最新鲜的值:

(chan (dropping-buffer 10))

使用 sliding-buffer 控制当 buffer 填满时丢弃最久远的值:

(chan (sliding-buffer 10))

相关推荐