直观理解生产者消费者问题

文章内容:

  • 生产者-消费者问题的模拟场景(代码实现)
  • 深入理解(信号量操作顺序)

先把概念丢一边,来一起下个馆子吧。

模拟场景

1. 客人进店点餐,把自己想吃的东西写在了订单上,并把订单放在了服务台;

2. 服务员从服务台上拿走客人写好的订单,准备上菜。

客人:生产者(把想吃的菜写在订单上,这里可以理解为下(订)单)。服务员:消费者(此消费者不是指代花钱买东西的人,这里可以理解为做(订)单处理订单服务台:公用缓冲池(这里可以理解为订单队列

一切都跟我们往常所见一样正常运作,但是计算机却并不那么"人性化",当客人和服务员都是机器人时,会发生两个奇怪的现象:

1. 当机器人服务员发现服务台上订单的数量为0时,它会把客人手上并未写好的单子抢走,执行处理订单的操作。

 2. 当机器人客人发现服务台上摆满了订单并且已经明确不允许再放订单时,它任会固执的把订单放上去,执行下单的操作。

为了世界和平,我们需要使用一个标志来指示机器人何时处于等待状态,这里的标志即是信号量,信号量为1时执行,为0时互斥。

何为互斥?

在编程实现上,我们把生产者和消费者定义为两个线程在工作,当这两个线程在执行过程中都遇到关键代码块或者关键变量时,如果此时信号量为1,则该代码块允许访问,如果此时信号量为0,则表示当前代码块已经被对方线程在访问了,需等待对方线程访问完毕才能继续访问。

在实际过程中,像这样的生产者和消费者的数量非常多,这里这是简单的开辟两条线程进行演示。

编码场景: 客人在订单中的食物栏写上我要蛋炒饭,并把订单放在服务台上,服务员取出该订单后将我要蛋炒饭改为给你蛋炒饭

下面来看看具体的代码实现。

(main.swift)
import Foundation

/// 服务台允许摆放的最大订单数量
let maxSize = 10
/// 订单队列
var orders = [Order]()
/// 目标订单
var targetOrder = Order()
/// 信号量
var semaphore = DispatchSemaphore(value: 1)
/// 生产者线程
let producer = DispatchQueue(label: "生产者线程")
/// 消费者线程
let consumer = DispatchQueue(label: "消费者线程")
/// 等待时长
let timeInterval = 0.003125

while true {
    // 生产者线程
    producer.async {
        if orders.count < maxSize {
            semaphore.wait()
            
            var createdOrder = Order()
            createdOrder.id = Int(arc4random())
            createdOrder.foods = "我要蛋炒饭"
            orders.append(createdOrder)
            targetOrder = createdOrder
            print("当前线程名称及地址: \(producer.label)<\(String.init(format: "%p", Thread.current))>"
                + " 正在提交的订单: [订单编号: \(targetOrder.id) 食物: \(String(describing: targetOrder.foods))]"
                + " 服务台订单数量: \(orders.count)")
            // print(orders)

            Thread.sleep(forTimeInterval: timeInterval)
            semaphore.signal()
        }
    }

    // 消费者线程
    consumer.async {
        if orders.count > 0 {
            semaphore.wait()
            
            let disposedOrder = orders.removeFirst()
            targetOrder = disposedOrder
            targetOrder.foods = "给你蛋炒饭"

            print("当前线程名称及地址: \(consumer.label)<\(String.init(format: "%p", Thread.current))>"
                + " 正在处理的订单: [订单编号: \(targetOrder.id) 食物: \(String(describing: targetOrder.foods))]"
                + " 服务台订单数量: \(orders.count)")
            // print(orders)
            
            Thread.sleep(forTimeInterval: timeInterval)
            semaphore.signal()
        }
    }

}

订单:

(Order.swift)
import Foundation

struct Order {
    var id: Int = 0
    var foods: String = ""
}

当不利用信号量进行互斥操作(注释相关代码)时会报这样的运行时会报这样的error:Thread 2: Fatal error: UnsafeMutablePointer.deinitialize with negative count: 同时在不同线程中对数组进行修改。

并且我们仔细观察也可以看到生产者和消费者操作的目标订单和订单队列也是混乱的。

由此可见进行同步操作尤为重要。

以上是对生产者与消费者问题的直观理解和具体的代码实现。

深入理解

从上面的实现中我们仅设置了一个订单队列数组orders来对公用缓冲池进行具象化,其实实际的公用缓冲池具有n个缓冲区,这时可利用互斥信号量mutex实现各个进程(或线程或程序,以下同)对缓冲池的互斥使用;利用信号量emptyfull分别表示缓冲池中的空缓冲区满缓冲区数量。只要缓冲池未满,生产者便可以发送一个信号量到缓冲区;只要缓冲区未空,消费者便可以从缓冲区取走一个信号量。

对数组orders的长度判断间接对应了emptyfull的职责。

这是较完整意义上的生产者-消费者问题。 我们再进行稍微深入的理解。

它相较于上面订单例子的实现似乎多了几个概念,为了方便理解,另外也加入几个概念,来一起看看:

  1. 死锁: 在无外力状态下,两者(或以上)都将无法从僵持等待状态中解脱出来。
  2. 临界资源: 一次只允许一个进程使用的非抢占式资源。
  3. 临界区: 每个进程中访问临界资源的那段代码称为临界区。
  4. 互斥信号量: mutex,其初值为1,取值范围是(-1,0,1)。 对应代码实现中var semaphore = DispatchSemaphore(value: 1)的value值。
  • 当mutex=1时,表示两个进程都没有进入需要互斥的临界区;
  • 当mutex=0时,表示有一个进程进入临界区运行,另一个进程必须等待;
  • 当mutex=-1时,表示有一个进程正在临界区运行,另一个进程因等待而阻塞在信号量队列中,需要被当前已在临界区运行的进程退出时唤醒。注意:我们对mutex的操作主要有两个,wait(P操作)和signal(V操作),分别表示对semaphore count的+1和-1,并且这两步操作必须成对出现。缺少wait会导致系统混乱,不能保证对临界资源的互斥访问;缺少signal会使临界资源永远不会被释放,从而使因等待该资源而阻塞的进程不能被唤醒。
  1. 资源信号量: 空缓冲区与满缓冲区。注意:应先执行对资源信号量的wait操作,然后再执行对互斥信号量的wait操作,否则可能引起进程死锁。为什么?换句话说,先执行对资源信号量的wait操作,然后再执行对互斥信号量的wait操作,这样就不会引起进程死锁

我们先来分析分析第二句话。

生产者-消费者问题的伪代码描述如下:

int in = 0, out = 0; in: 空缓冲区队列索引; out: 满缓冲区队列索引
 item buffer[n]; // 大小为n的缓冲区
 semaphore mutex = 1, full = 0, empty = n;
 
 // 生产者进程
 void producer() {
 do {
     produce an item nextp; // 生产一个产品
     ...
     wait(empty); // 申请资源信号量(一个单位空缓冲区)
     wait(mutex); // 申请互斥信号量(临界资源)
     buffer[in] = nextp; // 将产品放到缓冲区
     in = (in + 1) % n; //类似循环队列
     signal(mutex); // 释放临界资源
     signal(full); // 释放一个单位满缓冲区
     } while (TRUE);
 }
 
 // 消费者进程
 void consumer() {
     do {
     wait(full); // 申请资源信号量(一个单位满缓冲区)
     wait(mutex); // 申请互斥信号量(临界资源)
     nextc = buffer[out]; // 将产品从缓冲区取出
     out = (out + 1) % n; //类似循环队列
     signal(mutex); // 释放临界资源
     signal(empty); // 释放一个单位空缓冲区
     consume the item in nextc; // 消费一个产品
     ...
     } while (TRUE);
 }

根据墨菲定律,某个情况如果可能会出现,那么它肯定会出现,无论情况好坏。

现在假设这种可能的极端情况有两种:

<1>. 当在生产者进程中执行到wait(empty)操作申请一个空缓冲区时,发现空缓冲区已满。那么这时生产者进程会挂入到阻塞队列,等待消费者进程释放一个空缓冲区后,然后被唤醒。注意:生产者进程被唤醒后才会去申请临界资源。

我们再回来看看为什么如果先执行对互斥信号量的wait操作(即申请资源信号量),再执行对资源信号量的wait操作(即申请互斥信号量),可能引起进程死锁。

<2>. 当生产者进程先申请互斥信号量再申请资源信号量,则可能发生一个进程申请互斥信号量成功,得到访问的临界资源,而再去申请空缓冲区时发现空缓冲区已满,那么这时生产者会挂入到阻塞队列,等待消费者进程释放一个空缓冲区,然后被唤醒。注意:此时消费者进程有无能力释放一个空缓冲区?答案是 没有。

因为当消费者进程执行到wait(mutex)即想要去申请互斥信号量访问临界资源时,发现此时该临界资源被访问,因而wait(mutex)操作必定失败,消费者进程阻塞。很显然,这种情况下因为生产者进程带着临界资源进入到阻塞状态,而消费者进程因无法拿到临界资源也处于阻塞状态,造成死锁。

结语

  • 关于生产者-消费者问题的解决办法有很多,以上的讨论只是利用了记录型信号量,当然了我们也可以使用AND信号量(加强版的记录型信号量),甚至使用管程机制等等,有兴趣的小伙伴可以自行查阅。
  • 小生才疏浅陋,文中难免有错漏之处,请多多指教,感谢您的阅读。

相关推荐