如何提升python的处理速度?

导读:作为日常生产开发中非常实用的一门语言,python广泛应用于网络爬虫、web开发、自动化测试、数据分析和人工智能等领域。但python是单线程的,想要提升python的处理速度,涉及到一个很关键的技术——协程。本篇文章,将讲述python协程的理解与使用。

1、操作系统相关概念

  在理解与使用协程之前,先简单的了解几个与操作系统相关的概念,包括进程线程同步和异步阻塞与非阻塞。了解这些概念,对你学习协程、消息队列、缓存等知识都有一定的帮助。

(1)进程:

进程是操作系统分配资源的最小单位,系统由一个个程序(进程)组成的,一般而言,分为文本区域数据区域堆栈区域

  文本区域存储处理器执行的代码(机器码),通常来说,这是一个只读区域,防止运行的程序被意外的修改

  数据区域存储所有的变量和动态分配的内存,又细分为初始化的数据区(所有初始化的全局、静态、常量以及外部变量)和未初始化的数据区(初始化未0的全局变量和静态变量),初始化的变量最初保存在文本区,程序启动后被拷贝到初始化的数据区

  堆栈区域存储着活动过程调用的指令和本地变量,在地址空间里,栈区紧连着堆区,他们的增长方向相反,内存是线性的,所以我们的代码放在低地址的地方,由低向高增长,栈区大小不可预测,随开随用,因此放在高地址的地方,由高向低增长。当堆与栈指针重合的时候,意味着内存耗尽,造成内存溢出。

  进程的创建和销毁都非常的消耗系统资源,是一种比较昂贵的操作。进程为了自身能够得到运行,必须抢占式的争夺CPU。对于单核CPU而言,在同一时间内只能执行一个进程的代码,所以在单核CPU上实现多进程,是通过CPU的快速切换不同进程来实现的,看上去就像是多个进程同时执行。

  由于进程间是隔离的,各自拥有自己的内存资源,相比于线程的共享内存而言,要更安全,不同进程之间的数据只能通过IPC(Inter-Process Communication)进行通信共享

(2)线程

  线程是CPU调度的基本单位。如果进程是一个容器,线程就是运行在容器里面的程序,线程是属于进程的,同个进程的多个线程共享进程的内存地址空间

  线程间可以直接通过全局变量进行通信,所以相对来说,线程间通信是不太安全的,因此引入各种锁的场景,这里将不阐述

  当一个线程奔溃了,会导致整个进程也奔溃,即其它线程也挂了。这一点与进程不一样,一个进程挂了,其他进程照样执行

  在多核操作系统中,默认一个进程内只有一个线程,所以对多进程处理就像是一个进程一个核心

(3)同步和异步

  同步和异步关注的是消息通信机制,所谓同步,就是在发出一个函数调用时,在没有得到结果之前,该调用不会返回。一旦调用返回,就立即得到调用的返回值,即调用者主动等待调用结果

  所谓异步,就是在请求发出去后,这个调用就立即返回,但没有返回结果,通过回调的方式告知该调用的实际结果

  同步的请求,需要主动读写数据,并且等待结果;异步的请求,调用者不会立即得到结果。而是在调用发出后,被调用者通过状态、通知来告诉调用者,或通过回调函数处理这个调用

(4)阻塞与非阻塞

  阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态

  阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回

  非阻塞调用指在得到不能立即得到结果之前,该调用不会阻塞当前线程。所以,区分的条件在于,进程/线程要访问的数据是否就绪,进程/线程是否需要等待

  非阻塞一般通过多路复用实现,多路复用由selectpollepoll几种实现方式

(5)协程

  了解完前面几个概念,再来看看协程的概念

  协程是属于线程的,又称微线程,纤程,英文名是coroutine。举个例子,在执行函数A时,我希望能随时终端去执行函数B,然后终端B的执行,切换回来执行函数A。这就是协程的作用,由调用者自有切换。这个切换过程并不等同于函数调用,因为它没有调用语句。执行方式与多线程类似,但是协程只有一个线程执行

  协程的优点是执行效率非常高,因为协程的切换是由程序自身控制,不需要切换线程,即没有切换线程的开销。同时,由于只有一个线程,不存在冲突的问题,不需要依赖锁(加锁和释放锁需要很多资源消耗)

  协程的主要使用场景在于处理io密集型程序,解决效率问题,不同于CPU密集型程序的处理。然而实际开发中这两种场景非常多,如果要充分发挥CPU的利用率,可以使用多进程+协程的方式,本文后续将讲到结合点

2、协程相关原理

  根据wikipedia的定义,协程是一个无优先级的子程序调度组件,允许子程序在特定的地方挂起恢复。所以理论上,只要内存足够,一个线程可以有任意多个协程,但同一时刻只能有一个协程在运行,多个协程分享该线程分配到的计算机资源。协程是为了充分发挥异步调用的优势,异步操作则是为了IO操作阻塞线程

(1)知识准备

  在了解原理前,先做一个知识的准备

  1)现代主流的操作系统几乎都是分时操作系统,即一台计算机采用时间片轮转的方式为多个用户提供服务,系统资源分配的基本单位是进程,CPU调度的基本单位是线程

  2)运行时内存空间氛围变量区、栈区、堆区。内存地址分配上,堆区从低到高,栈区从高到低

  3)计算机执行时一条条指令读取执行,执行到当前指令时,下一条指令的指令的地址在指令寄存器的IP中,ESP寄存值只想当前栈顶地址,EBP指向当前活动栈帧的基地址

  4)系统发生函数调用时操作为:先将入参从右往左一次压栈,然后把返回地址压栈,最后将当前EBP寄存器的值压栈,修改ESP寄存器的值,在栈区分配当前函数局部变量所需的空间

  5)协程的上下文包含属于当前协程的栈区和寄存器里面存放的值

(2)事件循环

  在python3.3中通过yield from使用协程,在3.5中,引入了关于协程的语法糖async/await的原理解析。其中,事件循环是一个核心所在,编写过js的同学,会对事件循环Eventloop更加了解,事件循环是一种等待程序分配消息或事件的编程架构。在python中,asyncio.coroutine修饰器用来标记作为协程的函数,这里的协程是和asyncio及其事件循环一起使用的,而在后续的发展中,async/await被使用的越来越广泛

(3)async/await

  async/await是使用python协程的关键,从结构上来看,asyncio实质上是一个异步框架,async/await是为异步框架提供API以方便使用者调用,所以使用者要想使用async/await编写协程代码,目前必须基于asyncio或其他异步库

(4)Future

  在实际开发编写异步代码时,为了避免太多回调方法导致的回调地狱,但又需要获取异步调用的返回结果,聪明的语言设计者设计了一个叫做Future的对象,封装了与loop的交互行为。其大致执行过程为:程序启动后,通过add_done_callback方法向epoll注册回调函数,当result属性得到返回值后,主动运行之前注册的回调函数,向上传递给coroutine。这个Future对象为asyncio.Future

  但是,要想取得返回值,程序必须恢复到工作状态,而由于Future对象本身的生存周期比较短,每一次注册回调、产生事件、触发回调过程后工作可能已经完成,所以用Future向生成器send result并不合适。这里又引入一个新的对象Task,保存在Future对象中,对生成器协程进行状态管理

  Python里另一个Future对象是concurrent.futures.Future,与asyncio.Future互不兼容,容易产生混淆。区别点在于,concurrent.futures是线程级的Future对象,当使用concurrent.futures.Executor进行多线程编程时,该对象用于在不同的thread之间传递结果

(5)Task

  上文中提到,Task是维护生成器协程状态处理执行逻辑的任务对象,Task中有一个_step方法,负责生成器协程与EventLoop交互过程的状态迁移,整个过程可以理解为:Task向协程send一个值,恢复其工作状态。当协程运行到断点后,得到新的Future对象,再处理futureloop的回调注册过程

(6)Loop

  在日常开发中,会有一个误区,认为每一个线程都可以有一个独立的loop。实际运行时,主线程才能通过asyncio.get_event_loop()创建一个新的loop,而在其他线程时,使用get_event_loop()却会抛错。正确的做法为通过asyncio.set_event_loop(),将当前线程与主线程loop显式绑定

如何提升python的处理速度?

 3、协程实战

  上面介绍完了协程相关的概念和原理,接下来看看如何使用,这里举一个实际场景的例子

场景:

  外部接受一些文件,每个文件里有一些数据,其中,这组数据需要通过http的方式,发向第三方平台,并获得结果

分析:

  由于同一文件的每一组数据没有前后的处理逻辑,在之前通过requests库发送的网络请求,串行执行,下一组数据的发送需要等待上一组数据的返回,显得整个文件的处理时间长,这种请求方式,完全可以由协程来实现

  为了更方便的配合协程发请求,我们使用aiohttp库来代替requests库,关于aiohttp,下面做简单介绍

aiohttp:

  aiohttp是asyncio和python的异步HTTP客户端/服务器,由于是异步的,经常用在服务器端接收请求,和客户端爬虫应用,发起异步请求,这里我们主要用来发请求

  aiohttp支持客户端和HTTP服务器,可以实现单线程并发IO操作,无需使用Callback Hell即可支持Server WebSockets和Client WebSockets,且具有中间件

4、代码实现

  直接上代码吧,talk is cheap,show me the code~

import aiohttp
import asyncio
from inspect import isfunction
import time
import logger

@logging_utils.exception(logger)
def request(pool, data_list):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(exec(pool, data_list))


async def exec(pool, data_list):
    tasks = []
    sem = asyncio.Semaphore(pool)
    for item in data_list:
        tasks.append(
            control_sem(sem,
                        item.get("method", "GET"),
                        item.get("url"),
                        item.get("data"),
                        item.get("headers"),
                        item.get("callback")))
    await asyncio.wait(tasks)


async def control_sem(sem, method, url, data, headers, callback):
    async with sem:
        count = 0
        flag = False
        while not flag and count < 4:
            flag = await fetch(method, url, data, headers, callback)
            count = count + 1
            print("flag:{},count:{}".format(flag, count))
        if count == 4 and not flag:
            raise Exception(‘EAS service not responding after 4 times of retry.‘)


async def fetch(method, url, data, headers, callback):
    async with aiohttp.request(method, url=url, data=data, headers=headers) as resp:
        try:
            json = await resp.read()
            print(json)
            if resp.status != 200:
                return False
            if isfunction(callback):
                callback(json)
            return True
        except Exception as e:
            print(e)

这里,我们封装了对外发送批量请求的request方法,接收一次性发送的数据多少,和数据综合,在外部使用时,只需要构建好网络请求对象的数据,设定好请求池大小即可,同时,设置了重试功能,进行了4次重试,防治在网络抖动的时候,单个数据的网络请求发送失败

最终效果:

  在使用协程重构网络请求模块之后,当数据量在1000的时候,由之前的816s,提升到424s,快了一倍,且请求池大小加大的时候,效果更明显,由于第三方平台同时建立连接的数据限制,我们设定了40的阈值。可以看到,优化的程度很显著

文章来源:阿里技术(博主为了学习这篇文章,将阿里技术中的原文重新码了一遍,放到了自己的博客中,与原文相比,语句的表达方式有些出入,但内容是一致的)

相关推荐