python线程、锁、死锁以及生产者消费者模式
概念及状态
线程
- 线程,有时候被成为轻量级进程,是程序执行流的最小单元。
- 线程是进程中的一个实体,是被系统独立调度和分派的基本单位
- 线程自己不拥有系统资源,但它与同一个进程中的其他线程共享进程的全部资源
- 每个进程至少有一个线程
状态
- 就绪
- 线程具备运行的条件,在等待被调度
- 阻塞
- 线程在等待一个事件,比如释放锁
- 运行
- 线程被调度,正在运行
- 结束
- 线程运行结束
创建线程及执行顺序
通过threading.thread创建
- 同Process,通过其构造函数可以创建线程
- 调用start方法开启线程
import time
import threading
def sayHello():
print('hello world')
time.sleep(1)
def sing():
for i in range(3):
print('begin sing... %d' % i)
time.sleep(1)
def dance():
for i in range(3):
print('begin dance... %d' % i)
time.sleep(1)
if __name__ == '__main__':
# 单线程执行
print('---单线程start----')
for i in range(1, 3):
sayHello()
print('---单线程end----')
# 多线程执行
print('---多线程start----')
for i in range(3):
t = threading.Thread(target=sayHello())
t.start()
print('---多线程end----')
# 主线程等待所有子线程结束后才结束, 默认主线程不会等待子线程结束后才结束
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
# 继承thrad类,实现run方法
print('继承thread类,实现run方法')
while True:
length = len(threading.enumerate())
# print('当前运行的线程数为:%d' % length)
if length <= 1:
print('---main end---')
break
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
输出结果为:
---单线程start----
hello world
hello world
---单线程end----
---多线程start----
hello world
hello world
hello world
---多线程end----
begin sing... 0
begin dance... 0
继承thread类,实现run方法
begin sing... 1
begin dance... 1
begin sing... 2
begin dance... 2
---main end---
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
继承Thread类
- 同Process,继承thread类,重写run方法,也可以完成一个线程的创建
- python中的threading.Thread类中有一个run方法,用于定义线程的功能函数,可以在自己的类中复写此方法
- 创建好进程实例后,通过调用实例的start()方法可以启动线程,交由python虚拟机进行调度,当该线程执行时,会自动执行其中的run方法
class MyThread(threading.Thread):
'''
多线程程序的执行顺序是不确定的。
当执行到sleep时,线程被阻塞,到sleep结束时,线程处于就绪状态,等待调度。
线程调度将自行选择一个线程执行,所以线程的启动顺序 run函数中每次循环的执行顺序都不能确定
每个线程都有自己的名字
run方法结束时,线程完成
无法控制线程调度形式,但可以通过别的方式来影响线程调度方式
线程的几种状态:
新建 就绪 运行 阻塞(等待) 死亡
'''
def run(self):
for i in range(5):
time.sleep(1)
print('i am ' + self.name + ' @ ' + str(i))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
输出结果为:
继承thread类,实现run方法
i am Thread-1 @ 0
i am Thread-1 @ 1
i am Thread-1 @ 2
i am Thread-1 @ 3
i am Thread-1 @ 4
---main end---
- 1
- 2
- 3
- 4
- 5
- 6
- 7
线程执行顺序
- 从上述示例的执行结果来看,线程的执行是不确定的。当执行到sleep语句,线程将被阻塞,sleep结束后,线程进入就绪状态。线程调度将自动选择一个线程执行。
- 只能保证每个线程都完整的运行run函数,但是启动顺序 run函数中每次循环的执行顺序都不能确定
- 每个线程都有自己的名字
- run方法结束时该线程完成
- 无法控制线程调度程序,但可以通过别的方式来影响线程调度的方式(比如下面说到的同步应用)
多线程共享数据
- 在一个进程内的所有线程共享全局变量,能够在不适用其他方式的前提下完成多线程之间的数据共享
- 线程对全局变量都可以随意修改,不安全(后面会介绍到)
import time
import threading
g_num = 100
g_nums = [1, 2, 3, 4]
def test1():
global g_num
for i in range(5):
g_num += 1
print('---in test1,g_num is %d' % g_num)
def test2():
global g_num
print('---in test2,g_num is %d' % g_num)
def work1(nums):
nums.append(99)
print('---in work1---', nums)
def work2(nums):
time.sleep(1)
print('---in work2---', nums)
if __name__ == '__main__':
print('g_num原始值为:%d' % g_num)
t1 = threading.Thread(target=test1)
t1.start()
t2 = threading.Thread(target=test2)
t2.start()
t3 = threading.Thread(target=work1, args=(g_nums,))
t3.start()
t4 = threading.Thread(target=work2, args=(g_nums,))
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
print('---main end---')
print('g_num现在值为:%d' % g_num)
print('g_nums is ', g_nums)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
输出结果为:
g_num原始值为:100
---in test1,g_num is 105
---in test2,g_num is 105
---in work1--- [1, 2, 3, 4, 99]
---in work2--- [1, 2, 3, 4, 99]
---main end---
g_num现在值为:105
g_nums is [1, 2, 3, 4, 99]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
多线程非共享数据
- 在多线程开发中,全局变量时多个线程都共享的数据,而局部变量时各个线程自有的,非共享
import time
import threading
class MyThread(threading.Thread):
def __init__(self, num, timeSleep):
threading.Thread.__init__(self)
self.num = num
self.timeSleep = timeSleep
def run(self):
self.num += 1
time.sleep(self.timeSleep)
print('线程 %s , num = %d' % (self.name, self.num))
def test1(sleepTime):
num = 1
time.sleep(sleepTime)
num += 1
print('---%s--num = %d' % (threading.current_thread(), num))
'''
在多线程开发中,全局变量时多个线程都共享的数据,而局部变量时各个线程自有的,非共享
'''
if __name__ == '__main__':
t1 = MyThread(20, 1)
t1.start()
t2 = MyThread(20, 1)
t2.start()
t3 = threading.Thread(target=test1, args=(4,))
t4 = threading.Thread(target=test1, args=(2,))
t3.start()
t4.start()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
输出结果为:
线程 Thread-1 , num = 21
线程 Thread-2 , num = 21
---<Thread(Thread-4, started 123145409413120)>--num = 2
---<Thread(Thread-3, started 123145404157952)>--num = 2
- 1
- 2
- 3
- 4
线程同步 锁 死锁
锁
- 在上一节中,说到了多线程共享进程的全局变量会引发线程安全的问题,如下示例:
import time
import threading
g_num = 1
def test1():
global g_num
for i in range(1000000):
g_num += 1
# print('---in test1,g_num is %d' % g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
# print('---in test2,g_num is %d' % g_num)
'''
多个线程对同一个资源进行访问操作,并且没有进行资源保护,线程不安全,与预期结果不符
'''
if __name__ == '__main__':
print('g_num原始值为:%d' % g_num)
t1 = threading.Thread(target=test1)
t1.start()
t2 = threading.Thread(target=test2)
t2.start()
t1.join()
t2.join()
print('---main end---')
print('g_num现在值为:%d' % g_num)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
输出结果为:
g_num原始值为:1
---main end---
g_num现在值为:1315909
- 1
- 2
- 3
- 狠明显,与预期不符,没有对多个线程对同一资源的访问进行控制。那么此问题该如何保证呢?
- 思路:
- 系统调用T1,然后获取到num=1,此时上一把锁,不允许其他线程对num进行操作
- 当t1对num操作完成后,释放锁
- 下一个t2拿到num时,同样加一把锁,操作完成后释放锁
- 同理,其他线程对num进行操作时都加一把锁,操作完成后释放锁,这样就保证了数据的正确性
- 下面,通过threading中的Lock来完成锁的设置
import time
import threading
g_num = 1
def test1():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire()
if mutexFlag:
g_num += 1
# print('---in test1,g_num is %d' % g_num)
mutex.release()
def test2():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire()
if mutexFlag:
g_num += 1
# print('---in test2,g_num is %d' % g_num)
mutex.release()
'''
多个线程对同一个资源进行访问操作,并且没有进行资源保护,线程不安全,与预期结果不符
'''
if __name__ == '__main__':
print('g_num原始值为:%d' % g_num)
# 为解决上述问题,引入锁机制
mutex = threading.Lock()
t1 = threading.Thread(target=test1)
t1.start()
t2 = threading.Thread(target=test2)
t2.start()
t1.join()
t2.join()
#使用锁 确保了某段关键代码只能由一个线程从头到尾完整的执行
# 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式来执行,效率降低
# 由于存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
print('---main end---')
print('g_num现在值为:%d' % g_num)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
输出结果为:
g_num原始值为:1
---main end---
g_num现在值为:2000001
- 1
- 2
- 3
- 如上,输出结果就符合我们的预期了
- mutex.acquire()方法,默认参数为true,true表示堵塞,即如果这个锁在上锁之前以及被锁了,那线程会阻塞住,直到拿到锁才会继续执行。false的话,表示非阻塞,不管有没有锁都不会阻塞在这里
上锁解锁过程
- 当一个线程调用锁的acquire()方法时,锁就进入locked状态
- 每次只要一个线程可以获得锁。如果其他线程试图获取锁,该线程就会变为block阻塞状态,直到拥有锁的线程释放掉后,锁就进入unlocked状态
- 线程调度程序从处于同步阻塞中的线程中选择一个来获取锁,并使得该线程进入运行状态
多线程同步应用
- 可以使用互斥锁完成多个任务,有序的进行工作,这就是线程同步
import time
import threading
''''
多个线程有序执行
可以使用互斥锁完成多个任务,有序的进行工作,这就是线程同步
'''
class Task1(threading.Thread):
def run(self):
while True:
if lock1.acquire():
print('----task1----')
time.sleep(1)
lock2.release()
class Task2(threading.Thread):
def run(self):
while True:
if lock2.acquire():
print('----task2----')
time.sleep(1)
lock3.release()
class Task3(threading.Thread):
def run(self):
while True:
if lock3.acquire():
print('----task3----')
time.sleep(1)
lock1.release()
lock1 = threading.Lock()
lock2 = threading.Lock()
lock2.acquire()
lock3 = threading.Lock()
lock3.acquire()
if __name__ == '__main__':
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
输出结果为:
----task1----
----task2----
----task3----
----task1----
----task2----
----task3----
......
- 1
- 2
- 3
- 4
- 5
- 6
- 7
死锁
- 当多个线程共享多个资源,每个线程持有一部分资源,但又在相互等待对方的资源释放,这个时候就会产生死锁
class FirstThread(threading.Thread):
def run(self):
if mutexA.acquire():
print('hello this is first thred step1')
time.sleep(1)
if mutexB.acquire():
print('hello this is first thred step2')
mutexB.release()
mutexA.release()
class SecondThread(threading.Thread):
def run(self):
if mutexB.acquire():
print('hello this is second thred step1')
time.sleep(1)
if mutexA.acquire():
print('hello this is second thred step1')
mutexA.release()
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = FirstThread()
t1.start()
t2 = SecondThread()
t2.start()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 如上,就会造成死锁,程序卡住陷入死循环
生产者与消费者
- 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者消费速度很慢,那么生产者必须
- 等待消费者处理完,才能继续生产数据。
- 反之,如果消费者的处理能力大于生产者的处理能力,消费者必须等生产者生成完后在进行处理。
- 生产消费模式是通过一个容器来解决生产者和消费者的强耦合问题。俩者不再直接通讯,而通过阻塞队列来进行通讯,相当于一个缓冲区,平衡了生产者和消费者的处理能力
import time
import threading
from queue import Queue
class Product(threading.Thread):
def run(self):
global q
count = 0
while True:
if q.qsize() < 1000:
for i in range(100):
count += 1
msg = '生产商品 ' + str(count)
q.put(msg)
print(msg)
time.sleep(1)
class Customer(threading.Thread):
def run(self):
global q
while True:
if q.qsize() > 100:
for i in range(3):
msg = self.name + '消费了' + q.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
q = Queue()
for i in range(500):
q.put('初始化数据 %d' % i)
for i in range(2):
p = Product()
p.start()
for i in range(3):
c = Customer()
c.start()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
输出结果为:
...
生产商品 100
Thread-3消费了初始化数据 0
Thread-3消费了初始化数据 1
Thread-3消费了初始化数据 2
Thread-4消费了初始化数据 3
Thread-4消费了初始化数据 4
Thread-4消费了初始化数据 5
Thread-5消费了初始化数据 6
Thread-5消费了初始化数据 7
Thread-5消费了初始化数据 8
生产商品 101
生产商品 101
生产商品 102
生产商品 103
生产商品 104
生产商品 105
生产商品 106
生产商品 107
生产商品 108
生产商品 109
生产商品 110
生产商品 111
生产商品 112
Thread-5消费了初始化数据 9
Thread-5消费了初始化数据 10
Thread-5消费了初始化数据 11
...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
threadLocal
- 一个threadLocal虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰,解决了参数在一个线程中各个函数相互传递的过程
- 在多线程环境下,每个线程都有自己的数据,一个线程使用自己的局部变量比使用全局变量好,因为只有自己可见,并不影响其他线程,也不用加锁。
- 如下:
- local_school就是一个threadlocal对象,每个thread对它都可以读写student属性,但互不影响
- ThreadLocal最常用的地方就是为每个线程绑定一个数据库链接,http请求 用户身份信息等,这样一个线程的所有调用的处理函数都可以非常方便的访问这些资源
import threading
import time
local_school = threading.local()
def process_student():
# 获取当前线程关联的student
std = local_school.student
print('hello %s in %s' % (std, threading.current_thread()))
def process_thread(name):
# 绑定threadlocal的student
local_school.student = name
process_student()
if __name__ == '__main__':
t1 = threading.Thread(target=process_thread, args=('旺财',))
t2 = threading.Thread(target=process_thread, args=('小强',))
t1.start()
t2.start()
t1.join()
t2.join()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
输出结果为:
hello 旺财 in <Thread(Thread-1, started 123145557520384)>
hello 小强 in <Thread(Thread-2, started 123145562775552)>
- 1
- 2
总结
锁
- 好处:确保了某段代码只能由一个线程完整的执行,解决了线程安全问题
- 坏处:
- 阻止了多线程并发执行,包含锁的代码只能以单线程模式执行,降低了效率
- 可能存在不同线程持有不同的锁,处理不当可能造成死锁
进程与线程
- 进程是系统进行资源分配和调度的一个独立单元
- 线程是进程的一个实体,是cpu调度和分配的基本单元,基本上不拥有系统资源,只拥有一点在运行过程中必不可少的资源,但是它可以与同一进程中的其他线程共享进程拥有的所有资源
- 一个程序至少一个进程,一个进程至少一个线程
- 线程的划分尺度小于进程,使得多线程程序的并发性高
- 进程在运行过程中拥有独立的内存单元,而多个线程共享内存,提高了程序的执行效率
- 线程不能独立运行,必须依存在进程中