Python进程及线程编程

什么是进程:

    简单来讲,进程就是操作系统中运行的程序或任务,进程和程序的区别在于进程是动态的,而程序是静态的。进程是操作系统资源管理的最小单位。

什么是线程:

    线程是进程的一个实体,是cpu调度和分派的最小单位,它是比进程更小的能独立运行的基本单位,线程本身不拥有资源,但它可以与同属于一个进程的线程共享进程的资源所拥有的全部资源。

Python多线程编程与GIL:

    为了更有效的利用多核处理,就出现了多线程编程,但是问题是线程间数据的一致性和状态的同步如果得到保证,因此python解析器引入了GIL全局锁GIL全局锁的出现虽然保证了线程之间状态和一致性的原则,但是同一时间点上却只能有一个线程在运行。比如:我们有4核CPU,同时发起4个线程,每个线程都在cpu上,但因为GIL全局锁的存在,在同一时间片上只有一个线程,所以多线程并发在python中就是一个美丽的梦。

线程与进程的区别:

    1. 线程共享创建它的进程的地址空间;进程有自己的地址空间。
    2. 线程可以直接访问其进程的数据段;进程有自己的父进程数据段的副本。
    3. 新线程很容易创建;新进程需要父进程fork。
    4. 线程可以对同一进程的线程进行相当大的控制;进程只能对子进程执行控制。
    5. 对主线程的更改(取消、优先级变更等)可能会影响进程的其他线程的行为;对父进程的更改不会影响子进程。

 

python多进程模型

    multiprocessing 是一个跨平台版本的多进程模块,multiprocessing模块提供了一个Process类来代表一个进程对象.

1. 进程的基本用法

 

#!_*_coding:utf-8_*_
# Author: hkey
from multiprocessing import Process    # 导入Process方法
import os

def run_proc():
    print('child process run %s (%s)' %(os.getpid(), os.getppid()))    # os.getpid获取当前进程的pid,os.getppid 获取当前进程的父进程pid


if __name__ == '__main__':
    print('parent process id: ', os.getpid())
    p = Process(target=run_proc)
    p.start()
    p.join()
    print('parent process %s done.' %os.getpid())

 

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动。
join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步,当使用join()方法时就阻塞了主进程,直到子进程执行完毕,再次执行主进程

 

2. 进程池Pool

    如果要启动大量子进程,可以用进程池的方式批量创建子进程:

 

#!_*_coding:utf-8_*_
# Author: hkey
from multiprocessing import Pool, Process
import os, time

def run_proc():
    print('Run task %s (%s)' %(os.getpid(), os.getppid()))
    start_time = time.time()
    time.sleep(1)
    print('Task %s runs %.2f seconds.' %(os.getpid(), time.time()-start_time))

if __name__ == '__main__':
    print('parent process %s' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(run_proc)
    p.close()
    p.join()
    print('parent process %s done.' % os.getpid())


输出结果:

parent process 12980

Run task 8064 (12980)
Run task 9224 (12980)
Run task 11604 (12980)
Run task 13604 (12980)

Task 8064 runs 1.00 seconds.
Run task 8064 (12980)
Task 9224 runs 1.00 seconds.
Task 11604 runs 1.00 seconds.
Task 13604 runs 1.00 seconds.
Task 8064 runs 1.00 seconds.
parent process 12980 done.

 

上面的例子,进程池最大限制4个子进程,但是循环了5次。从结果可以看到首先建立了4个子进程,当其中一个退出后,再次创建第5个子进程。
特别注意的是,在使用进程池的时候,首先要调用close()方法,调用close()方法后就不能继续添加新的子进程了,然后再调用join()方法。

 

3. 进程的锁lock

    当多个进程需要访问共享资源的时候,Lock可以用来避免访问冲突。
    通过实例测试,在不加锁的情况下,多进程无论在读写同一个文件还是cpu计算都没有发生错误的现象。可能是我测试的量不够。

 

对文件写入:

 

#!_*_coding:utf-8_*_
#__author__:"hkey"
import multiprocessing, sys
def write1(f):
    fs = open(f, 'a+')
    n = 10000
    while n > 0:
        fs.write('aaaaaaaaaaaaaa\n')
        n -= 1
    fs.close()


def write2(f):
    fs = open(f, 'a+')
    n = 10000
    while n > 0:
        fs.write('bbbbbbbbbbbbbbbb\n')
        n -= 1
    fs.close()

if __name__ == '__main__':
    f = 'test.txt'
    p1 = multiprocessing.Process(target=write1, args=(f,))
    p2 = multiprocessing.Process(target=write2, args=(f,))
    p1.start()
    p2.start()

多进程在没有加锁的情况下,没有出现写入错误的现象。

多进程加锁的写法:

#!_*_coding:utf-8_*_
#__author__:"hkey"
import multiprocessing, sys
def write1(f, lock):
    lock.acquire()
    fs = open(f, 'a+')
    n = 10000
    while n > 0:
        fs.write('aaaaaaaaaaaaaa\n')
        n -= 1
    fs.close()
    lock.release()
def write2(f, lock):
    lock.acquire()
    fs = open(f, 'a+')
    n = 10000
    while n > 0:
        fs.write('bbbbbbbbbbbbbbbb\n')
        n -= 1
    fs.close()
    lock.release()
if __name__ == '__main__':
    lock = multiprocessing.Lock()
    f = 'test.txt'
    p1 = multiprocessing.Process(target=write1, args=(f,lock))
    p2 = multiprocessing.Process(target=write2, args=(f,lock))
    p1.start()
    p2.start()

 

 

个人总结:在多进程编程中,如果只是变量的计算或者cpu计算,可以不加锁,因为每个进程的地址空间都是独立的存在。
而在写入同一个文件的时候,就有必要加锁。

 

4. 子进程的控制

    子进程是独立与主进程的存在,创建子进程时,系统fork出子进程后,就与主进程资源完全独立了,我们不单单创建完子进程就行了,还要控制子进程
    的输入和输出。
   
    subprocess模块可以让我们非常方便的启动一个子进程,然后控制其输入和输出。

子进程输出:
使用subprocess模块,在子进程中运行 ping -n 1 baidu.com

 

#!_*_coding:utf-8_*_
# Author: hkey
import subprocess

print('$ ping -n 1 baidu.com')
r = subprocess.call(['ping', '-n', '1', 'baidu.com'])
print('Exit code:', r)

输出结果:

$ ping -n 1 baidu.com

正在 Ping baidu.com [111.13.101.208] 具有 32 字节的数据:
来自 111.13.101.208 的回复: 字节=32 时间=22ms TTL=51

111.13.101.208 的 Ping 统计信息:
    数据包: 已发送 = 1,已接收 = 1,丢失 = 0 (0% 丢失),
往返行程的估计时间(以毫秒为单位):
    最短 = 22ms,最长 = 22ms,平均 = 22ms
Exit code: 0

 

子进程输入:

print('$nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\napache.org\nexit\n')    # 通过调用communicate方法实现输入,这里是二进制格式。
result = output if output else err
print(result.decode('gbk'))
print('Exit code:', p.returncode)

输出结果:
# 主机dns有问题,但是输出结果完全正确的。
$nslookup
默认服务器:  UnKnown
Address:  127.0.0.1

> > 服务器:  UnKnown
Address:  127.0.0.1

>
Exit code: 0

 

 

5. 进程间通信

虽然子进程从主进程fork后是独立的存在,但进程之间肯定是需要通信的。python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等方式
来交换数据。
进程间通信,最典型的例子就是消费者生产者模型, 生产者生产数据,消费者消费。

 

#!_*_coding:utf-8_*_
# Author: hkey
from multiprocessing import Process, Queue
import os, time
def write(q):
    for i in range(10):
        print('process to write: %s' % os.getpid())
        print('生产包子[%s]' %i)
        q.put(i)    # 将i上传至队列中。
        time.sleep(1)
def read(q):
    while True:
        print('process to read: %s' % os.getpid())
        values = q.get(True)    # 通过get方法将队列中的数据下载,从队列中拿走一个数据就少一个数据。
        print('吃掉包子[%s]' %values)
if __name__ == '__main__':
    q = Queue()        # 调用Queue方法生成一个队列
    pw = Process(target=write, args=(q,))    # 通过进程的方式调用
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()    # 因为read()是死循环,需要通过terminate()方法关闭。
   
输出结果:

process to read: 16628
process to write: 16440
生产包子[0]
吃掉包子[0]
process to read: 16628
process to write: 16440
生产包子[1]
吃掉包子[1]
process to read: 16628
process to write: 16440
生产包子[2]
吃掉包子[2]
......

 

 

通过结果发现是两个进程通过队列在通信,生产一个包子,吃掉一个包子。

什么是生产者消费者模型

    在python中,生产者消费者模型是一个很典型而且很经典的例子。
    队列的概念就是在生产者和消费者中间加一个类似仓库的中间层,生产者不在直接对应消费者,而是生产者将生产好的东西放置到仓库中,而当消费者需要
    的时候,自己去仓库中取出东西就好。这样做有以下几个优点:

    1. 解耦
    2. 支持并发
    3. 支持忙闲不均

 

python多线程模型

    多任务可以由多进程完成,也可以由一个进程内的多线程完成。一个进程中至少有一个线程。线程是操作系统直接支持的执行单元。
    threading模块提供python对线程的使用

 

1. 线程的基本用法

    启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行,语法和进程差不多

 

#!_*_coding:utf-8_*_
# Author: hkey
import threading, time

def run_thread():
    print('thread %s running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n += 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)

if __name__ == '__main__':
    print('threading %s is running...' % threading.current_thread().name)
    t = threading.Thread(target=run_thread, name='LoopThread')
    t.start()
    t.join()
    print('thread %s ended.' % threading.current_thread().name)
   

输出结果:

threading MainThread is running...
thread LoopThread running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

 

 

任何一个进程默认就有一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,python的threading模块有个current_thread()函数,
它永远返回当前线程的实例。主线程的名字叫 MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。

 

2. 线程的型号量

    相对于进程来说,线程占用的资源就很小,因此没有使用到线程池的概念,但是要实现类似线程池的功能可以使用线程的型号量来做限制
线程的信号量是同时允许一定数量的线程更改数据,主要作用在于限制线程的并发。

 

#!_*_coding:utf-8_*_
#__author__:"hkey"
import threading, time, os

sem = threading.BoundedSemaphore(3)    # 调用BoundedSemaphore方法限制3个线程的并发
def run():
    sem.acquire()    # 开始
    print('threading running', threading.current_thread().name)
    time.sleep(1)   
    sem.release()    # 结束
if __name__ == '__main__':
    for i in range(10):
        t = threading.Thread(target=run)
        t.start()

 

 

3. 线程的锁Lock

    由于线程是共享进程的地址空间,所以在多线程的环境下,锁就显得尤为重要。多线程编程,在不加锁的情况下,同时对一个全局变量做修改,基本上全是错误。

 

#!_*_coding:utf-8_*_
#__author__:"hkey"
import threading
balance = 0
def run_thread(n):
    global balance
    for i in range(10000000):
        balance = balance + n
        balance = balance - n

if __name__ == '__main__':
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)

输出结果:
-86

 

 

多线程编程加锁实例如下:

 

#!_*_coding:utf-8_*_
#__author__:"hkey"
import threading
balance = 0
def run_thread(lock, n):
    lock.acquire()
    global balance
    for i in range(10000000):
        balance = balance + n
        balance = balance - n
    lock.release()
if __name__ == '__main__':
    lock = threading.Lock()
    t1 = threading.Thread(target=run_thread, args=(lock, 5))
    t2 = threading.Thread(target=run_thread, args=(lock, 8))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)

输出结果:
0

 

多线程锁总结:在多线程编程中,无论是变量的计算还是写入同一个文件都要加锁,使用多线程编程一定要注意锁的使用。

相关推荐