Python之线程介绍(2)

基于 Lock 的基本同步操作

有很多方法可以避免或解决竞争条件问题。这里虽然不会全部列出来,但也给出了一些常用的方法,先从锁(Lock)开始讲起吧。

为了解决上面的竞争条件问题,你需要找一种方法,一次只允许一个线程操作代码中的读/改/写部分。最常见的方法是在 Python 中使用 Lock。在其他一些语言中,这个概念被命名为互斥(mutex)。互斥来自 MUTual EXclusion,这也正是 Lock 实现的功能。

锁对象的行为就像一张通行证,同一时间只允许一个线程持有锁,其它线程想要持有锁必须等到锁的持有者释放它才行。

执行此操作的基本函数是 .acquire() 和 .release()。线程通过调用 my_lock.acquire() 方法来获得锁。如果锁已经被持有,则调用的线程会一直等到锁被释放。这里有一个很重要的点,如果一个线程获得了锁但一直不将其释放,那么程序就会被卡住。你稍后会了解到与其相关的更多信息。

幸好,Python 中的 Lock 也可以作为上下文管理器运行,因此也可以在 with 语句中使用它。并且当 with 语句由于任意原因退出时,Lock 都会自动释放。

我们来看一下带有锁的 FakeDatabase 类。函数调用方式保持不变:

Python之线程介绍(2)

除了添加一堆调试日志来更清楚的描述锁以外,较大的变化是添加了一个名为 .lock 的成员变量,它是一个 threading.Lock() 对象。._lock 在未锁定状态时会先初始化,并由 with 语句进行锁定和释放。

值得注意的是,运行这个函数的线程将会持有该锁,并直到数据库更新完全结束为止。这种情况下,线程在复制,更新,休眠以及将数值写回数据库时会一直持有锁。

如果运行这个版本的程序,并将日志级别设置为 WARNING 级别,可以看到以下输出内容:

Python之线程介绍(2)

这样,你的程序已经可以正常工作了!

你可以通过在 __main__ 中配置日志输出的语句之后,加入下面这段语句,将日志级别设置到 DEBUG 以开启完整的日志记录:

Python之线程介绍(2)

开启 DEBUG 级别日志并运行程序,输出内容如下所示:

Python之线程介绍(2)

这个输出内容中,你可以看到 Thread 0 获得锁并且在进入休眠时仍持有锁。然后 Thread 1 启动并尝试获得这个锁。因为 Thread 0 仍持有该锁,所以 Thread 1 只能等待着锁被释放。这就是锁提供的互斥特性。

本文其余部分中,许多示例都会有 WARNING 和 DEBUG 级别的日志信息。我们通常只显示 WARNING 级别的输出,因为 DEBUG 日志可能会非常冗杂。在打开日志记录功能的情况下运行程序,可以看到这些日志的作用。

死锁

在继续下一步之前,你应该先了解一下使用锁会经常遇到的问题。如你所见,如果已经获得了锁,那么再次调用 .acquire() 时,会先等待持有锁的线程调用 .release() 方法来释放锁。运行下面这段代码时,你觉得会发生什么:

Python之线程介绍(2)

当程序第二次调用 .acquire() 时,它会挂起自身线程并等待锁被释放。这个例子中,你可以通过移除第二次的方法调用来修复死锁问题。死锁的发生通常源于两个微妙事件:

1. 锁未被正确释放的错误实现

2. 由可能持有或曾经持有锁的函数调用公共函数的错误设计

第一情况时有发生,但是使用锁作为上下文管理器会大大减少其发生的频率。建议编写代码时尽可能使用上下文管理器,因为这有助于避免出现异常,而跳过调用 .release() 方法的情况。

在一些语言中,对第二种错误设计问题可能有点棘手。幸好,Python 线程还有第二个对象 RLock,就是专门为这种情况设计的。它允许一个线程在调用 .release() 之前多次调用 .acquire() 请求获得 RLock 锁。但仍然需要该线程对 .release() 和 .acquire() 的调用次数相同,且必须如此。

Lock 和 RLock 是线程化编程中的两个用于防止竞争条件的基本工具。当然,还有一些其它的工具。在了解它们之前,我们先转向一个略微不相关的问题上。

生产者/消费者线程

生产者/消费者问题是用于查看线程或进程同步问题的标准计算机科学问题。下面你会看到它的一个衍生体,可以用于了解 Python 线程模块提供的原生支持。

对于这个例子,你可以想象一下,一个程序需要从网络中读取消息并将其写入磁盘。程序不会在需要时才请求消息,它必须在消息到达时监听和接收消息。而且这些消息不会以均匀的速度进入,而是突然涌入。程序的这一部分称为生产者。

另一方面,一旦读取到消息,你需要将其写入数据库。数据库访问速度很慢,但是速度足以保持平稳地消费消息。可是,当消息突然涌入时,消费速度就会有些跟不上。这部分是消费者。

在生产者与消费者之间,需要创建一个管道,这与你已知的同步处理对象有所不同。

这些都是设计的基本内容。现在来看看使用锁的解决方案。它并不能完美的适用,但是用到了一些已知的工具,所以是一个很好的切入点。

带有 Lock 阻塞的生产者/消费者模型

由于这是一篇关于Python线程的文章,并且你刚刚阅读了关于 Lock 原生支持的内容,所以我们来尝试使用一个或两个 Lock 来解决这个问题。

生产者/消费者模型的一般设计,是由一个生产者线程从虚拟的网络中读取消息并将其放入管道中:

Python之线程介绍(2)

为了得到这些虚拟消息,生产者会生成一至一百之间的随机数,然后在管道对象上调用 .set_message() 并将其发送给消费者。

生产者还使用 SENTINEL 值作为信号,指示消费者在发送了十个值之后停止。这么操作有点繁琐,不过不用担心,在完成这个示例程序之后,你可以再尝试不用 SENTINEL 的实现方法。

管道的另一边是消费者:

Python之线程介绍(2)

消费者从管道中读取消息并将其写入模拟数据库,这里只是将消息打印到显示器。如果消费者读到 SENTINEL 信号值,则会终止线程并退出函数。

在实际处理消费消息之前,先看下 __main__ 中管道部分的代码,这部分代码创建了一些线程:

Python之线程介绍(2)

这部分代码应该比较眼熟吧,它与前面示例中的 __main__ 代码比较接近。

注意,你可以通过取消注释这一行,来打开 DEBUG 级别日志,来查看完整的日志记录:

Python之线程介绍(2)

通过 DEBUG 日志消息来查看每个线程获得和释放锁的具体位置,这一点还是很有必要的。

现在我们来看看将消息从生产者传递给消费者的管道:

Python之线程介绍(2)

这里的代码比较多,但其中大部分只是日志记录语句,方便查看运行时发生的情况。以下是删除了所有日志语句的相同代码:

Python之线程介绍(2)

这样似乎更容易管理。这个版本的代码中,管道(Pipeline)有三个成员:

1. .message 存储要传递的消息。

2. .producer_lock 是一个 threading.Lock 对象,它可以限制生产者线程对消息的访问。

3. .consumer_lock 也是一个 threading.Lock 对象,它可以限制消费者线程对消息的访问。

__init__() 方法初始化了这三个成员,然后用 .consumer_lock 变量调用 .acquire()。这是管道启动时的初始窗台,允许生产者添加新的消息,但是消费者需要等待消息出现。

.get_message() 和 .set_messages() 方法算是相对的。.get_message() 对 consumer_lock 变量调用 .acquire()。这样消费者会一直等待消息达到。

一旦消费者获得了 .consumer_lock,它就会复制 .message 中的值,并且对 .producer_lock 变量调用 .release()。释放锁并允许生产者将下一条消息插入管道中。

继续调用 .set_message() 之前,.get_message() 方法中有一些东西很容易被遗漏。它有可能会忽略 message 局部变量,只是让函数以 return self.message 结束。

这是由于消费者调用了 .producer_lock.release() 方法之后,生产者就可以将 self.message 替换并运行。但对 self.message 的赋值替换操作可能会发生在 .release() 返回之前!这意味着当函数返回 self.message 时,实际上可能是新生成的下一条消息,这样你就会丢失第一条消息。这也是竞争条件的一种示例。

再来看看 .set_message() 方法,你可以发现这个事务操作的另一面。生产者使用一条消息传入并调用这个方法,它将获得锁 .producer_lock,设置 .message 变量的值,然后对 consumer_lock 变量调用 .release() 方法,这样就允许消费者读取这个消息值了。

我们来运行这段代码,并将日志级别设置为 WARNING,运行结果是怎么样的呢:

Python之线程介绍(2)

首先,你可能会觉得有些奇怪,生产者在消费者运行之前就产生了两条消息。如果你回头看看生产者和 .set_message() 方法,你会发现,它仅在将消息放入管道时才会等待锁。这是在生产者获取消息并将其标记为持有之后才完成的。

当生产者尝试发送第二条消息时,会再次调用 .set_message() 方法并且会被阻塞。

操作系统可以随时在线程间切换,但通常允许每个线程在切换之前拥有一个合理的运行时间。这就是为什么生产者会在第二次调用 .set_message() 方法并被阻塞之前,线程会一直运行。

但是,一旦线程被阻塞,操作系统将会找到其他需要运行的线程,并将其切换出来。这种情况下,唯一可以处理其他事情的线程是这里的消费者。

消费者调用 .get_message() 方法读取消息并对 .producer_lock 变量调用 .release() 方法,从而使生产者在下次线程切换时可以继续运行。

注意,第一条消息是 43,这是消费者实际读到的消息,尽管此时生产者已经生成了新的消息 45。

虽然 Lock 适用于这种局限的生产者/消费者模型,但它通常不是生产者/消费者问题的一个好的解决方案,因为它一次只允许管道中只有一个值。当生产者有一连串消息时,就没地方放了。

我们可以使用队列来更好的解决这个问题。

带有队列的生产者/消费者模型

如果你希望能一次处理管道中的多个值,则需要管道的数据结构允许根据生产者消息的数量进行伸缩调整。

Python 的标准库中有一个队列模型,包含一个 Queue 类。现在修改管道,使用队列替换这种被锁保护的变量。还有另外一种方式是使用其它 Python 的 threading 原语(Event,事件)停止工作线程(事件)。

先从事件开始,threading.Event 对象允许线程发出事件信号,而其它很多线程则等待该事件发生。代码中的关键部分在于。等待事件的线程不一定需要停止它们当前正在执行的操作,可以每隔一段时间就检查一次事件的状态。

触发的事件可以有很多。这个例子中,主线程只是休眠一段时间然后执行事件 .set():

Python之线程介绍(2)

这里与之前仅有的变化是,在第 6 行创建事件对象,在第 8 行和第 9 行传递事件作为参数,最后一部分是第 11 行到第 13 行,休眠 1 秒,记录消息然后对事件调用 .set()。

生产者代码同样不需要太多改动:

Python之线程介绍(2)

在第 3 行设置事件之前,循环会一直进行,并且不再向管道中放入 SENTINEL (哨兵)值。

而消费者则必须做出相应修改:

Python之线程介绍(2)

如果需要读取 SENTINEL 的消息,此时代码中需要一个略微复杂的循环条件。这个循环不仅需要设置在事件触发之前,而且还需要保持循环状态,直到管道被清空为止。

在消费者完成之前,确保队列为空可防止另一个常见的问题。如果消费者在管道中还有消息时退出了,那么可能会发生两种结果。第一是丢失了最后的消息,但更严重的是生产者可能会尝试将消息添加到整个消息队列并且无法退出。

如果在生产者检查 .is_set() 条件之前,调用 pipeline.set_message() 之后,触发了事件消息,就有可能会发生这种情况。

如果发生了这种情况,生产者可能会在队列还满的状态下唤醒并退出。然后,生产者会调用 .set_message() 方法,等待队列中腾出新的消息空间。但是消费者已经退出了,所以不会腾出新的消息空间,生产者也不会退出。

其它的消费者代码应该也是与此相同。

然而,管道的代码却需要修改很多:

Python之线程介绍(2)

这可以看到 Pipeline 是 queue.Queue 的子类。队列初始化时,可选参数可以指定队列的最大容量。

如果你为 maxsize 指定了一个正整数,它就会限制队列中元素的个数,导致在队列中的元素少于 maxsize 之前。.put() 方法会被阻塞。如果未指定 maxsize,则队列有可能会增长到计算机内存的上限。

.get_message() 和 .set_message() 方法代码体量变的更小了。它们基本上只是在 Queue 的 get() 和 .put()方法上包装了一下。可能你比较关心的是新添加的那部分,用于阻止线程竞争条件触发的加锁代码的位置。

编写标准库的核心开发人员知道 Queue 经常被用于多线程开发,已将所有加锁代码合并到 Queue 内部。队列 Queue 是线程安全的。

运行这个程序,输出内容如下:

Python之线程介绍(2)

如果你看完了上面示例中的输出内容,可以看到一些有趣的东西。在最上面,可以看到生产者创建了五条消息,并将其中的四条放入队列。它在放置第五条消息时,操作系统切换了线程。

然后消费者运行并拉取第一条消息,打印了该消息以及此时队列的深度:

Python之线程介绍(2)

这就是你看到的为什么第五条消息尚未进入队列的原因。当一条消息被移除后,队列的大小减到三。我们知道队列最多可以容纳十条消息,因此生产者线程不会被队列阻塞。它只是被操作系统切换掉了。

注意:你在自己电脑上的输出可能有所不同。输出内容会因每次运行而变化,这就是使用线程的魅力所在。

程序从开始到完成,我们看到主线程生成了事件,导致生产者立即退出。此时消费者仍然有很多工作要做,所以它会继续运行,直到管道被清空为止。

试着在生产者或消费者中使用不同的队列大小,并调用 time.sleep() 方法,可以分别模拟长耗时的网络访问或磁盘访问时间。对程序来说,即使是这些元素的微小改动,也会对运行结果产生很大差异。

对于生产者/消费者问题,这是一个更好的解决方案,但它还可以进一步优化。这个问题实际上不需要管道。如果你不需要记录日志,可以用队列 queue.Queue 来替换。

下面是直接使用队列 queue.Queue 的最终代码:

Python之线程介绍(2)

Python之线程介绍(2)

这样代码就更加简单易读了,并且展示了如何使用 Python 内置的原语来简化复杂问题。

锁 Lock 和队列 Queue 都是解决并发问题的轻量级类,不过标准库还提供了其它操作类。在结束本教程之前,我们来快速了解一下它们。

英文原文:https://realpython.com/intro-to-python-threading/
译者:敦伟

相关推荐