Python并发之异步I/O(async,await)

Python并发之异步I/O(async,await)

背景

Python有很长一段的异步编程历史,特别是twistedgevent和一些无堆栈的Python项目。
异步编程因为一些好的原因在这些年来越来越受关注。尽管相对于传统的线性风格更难一点,但是却是值得的:因为异步编程有更高的效率。
举个例子:在Http请求方面,Python异步协程可以提交请求然后去做队列中其他等待的任务,让它慢慢请求,而不是传统的一直等它请求到完成为止,这样的话会浪费更多的时间与资源。总之异步编程能让你的代码在处于等待资源状态时处理其他任务。
在Python3.4中,asyncio产生了。而在Python3.5中,有加入了对async defawait新的语法支持,让我们看一看它们怎么工作的。

协程

在Python中,一个异步的函数我们通常叫它协程。之前我们在讲解yield的时候也已经讲过yield语法在协程中的基本使用了,这次同样是协程,但却是不同的语法。

#在Python 3.4中, 创建一个协程我们用asyncio.coroutine装饰器:
async def double(x):
    return x * 2

# 这是个协程对象
>>> double(6)
>>><coroutine object double at 0x115b59d58>

# 既然是协程,我们像之前yield协程那样,预激活一下(注意这里用next(double(6)预激活会报错)
>>> double(6).send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 12
# 好像差不多。

或者是这样的:

async def ping_server(ip):
    # ping code here...

# 用await语法代替"yield from"
async def ping_local():  
    return await ping_server('192.168.1.1')

建议使用Python3.5最新主流语法:async def ,await

asyncio的几个重要结构

我这里都用英文展示,原生,易理解。怎么翻译都绕口。

event loop:

An event loop essentially manages and distributes the execution of different tasks. It registers them and handles distributing the flow of control between them.

Coroutines:

Coroutines are special functions that work similarly Python generators that on await they release the flow of control back to the event loop. A coroutine needs to be scheduled to run using the event loop, to do this we create a Task, which is a type of Future.

Futures:

Futures are objects that represent the result of a task that may or may not have been executed. This result may be an exception.

理解基本架构很重要,理解协程,future,task的概念重中之重,至于什么内部实现,真的挺复杂的,无需关心。

在这边的时候我卡了很长时间:futureTask的区别是什么????

future在多线程说过,future表示终将发生的事情,而确定某件事会发生的唯一方式是执行的时间已经排定。(你不排定它,它就是个协程),那怎么排定?

BaseEventLoop.create_task(...) 或者 asyncio.ensure_future方法接收一个协程,排定它的运行时间,然后返回一个asyncio.Task 实例——也是 asyncio.Future 类的实例,因为 Task 是Future 的子类,用于包装协程。这与调用 Executor.submit(...) 方法创建Future实例是一个道理

这句话我读了好多遍,意思是不是说future跟task是同一样东西。对于event loop来说,一个包装了协程的future,就是循环中的一个task?我是这么理解的。

我们无法确定future啥时完成结束,但是总归结束(无论报错还是返回值)的,因为我们已经给它排定了时间。

例子1

少废话,直接看例子。

import asyncio
import time

start = time.time()


def tic():
    return 'at %1.1f seconds' % (time.time() - start)


async def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('gr1 started work: {}'.format(tic()))
    # 暂停两秒,但不阻塞时间循环,下同
    await asyncio.sleep(2)
    print('gr1 ended work: {}'.format(tic()))


async def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('gr2 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr2 Ended work: {}'.format(tic()))


async def gr3():
    print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
    await asyncio.sleep(1)
    print("Done!")

# 事件循环
ioloop = asyncio.get_event_loop()

# tasks中也可以使用asyncio.ensure_future(gr1())..
tasks = [
    ioloop.create_task(gr1()),
    ioloop.create_task(gr2()),
    ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()


output:
    gr1 started work: at 0.0 seconds
    gr2 started work: at 0.0 seconds
    Let's do some stuff while the coroutines are blocked, at 0.0 seconds
    Done!
    gr2 Ended work: at 2.0 seconds
    gr1 ended work: at 2.0 seconds
  • asyncio.wait(...) 协程的参数是一个由future或协程构成的可迭代对象;wait 会分别
    把各个协程包装进一个 Task 对象。最终的结果是,wait 处理的所有对象都通过某种方式变成 Future 类的实例。wait 是协程函数,因此返回的是一个协程或生成器对象。

  • ioloop.run_until_complete 方法的参数是一个future或协程。如果是协程,run_until_complete方法与 wait 函数一样,把协程包装进一个 Task 对象中。

  • 在 asyncio 包中,future和协程关系紧密,因为可以使用 yield from 从asyncio.Future 对象中产出结果。这意味着,如果 foo 是协程函数(调用后返回协程对象),抑或是返回Future 或 Task 实例的普通函数,那么可以这样写:res = yield from foo()。这是 asyncio 包的 API 中很多地方可以互换协程与期物的原因之一。 例如上面的例子中tasks可以改写成协程列表:tasks = [gr1(), gr(2), gr(3)]

详细的各个类说明,类方法,传参,以及方法返回的是什么类型都可以在官方文档上仔细研读,多读几遍,方有体会。

好好体会event loop中运作图.....如下所示:


async.jpg

例子2

例子1只是用来讲述语法,实际用途中还是得看这个例子。上节多线程与多进程的例子,获取有效网址。

import asyncio
import time
import aiohttp
import async_timeout

msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}

urls = [msg.format(i) for i in range(4500, 5057)]

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            return response.status

async def main(url):
    async with aiohttp.ClientSession() as session:
            status = await fetch(session, url)
            return status

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    tasks = [main(url) for url in urls]
    # 返回一个列表,内容为各个tasks的返回值
    status_list = loop.run_until_complete(asyncio.gather(*tasks))
    print(len([status for status in status_list if status==200]))
    end = time.time()
    print("cost time:", end - start)

任重而道远

  • 在封闭的代码块中使用一些新的关键字就能实现异步功能
  • 我对于这一块还是处于小白状态,掌握不是很全面
  • 多Google,多Google,多Google.....

参考资料

http://stackabuse.com/python-async-await-tutorial/
https://hackernoon.com/asyncio-for-the-working-python-developer-5c468e6e2e8e
https://docs.python.org/3/library/asyncio-task.html
http://quietlyamused.org/blog/2015/10/02/async-python/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容