Python并发编程之协程/异步IO

原文://www.greatytc.com/p/4e048726b613

引言

随着node.js的盛行,相信大家今年多多少少都听到了异步编程这个概念。Python社区虽然对于异步编程的支持相比其他语言稍显迟缓,但是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await语法层面的支持,刚正式发布的Python3.6中asyncio也已经由临时版改为了稳定版。下面我们就基于Python3.4+来了解一下异步编程的概念以及asyncio的用法。

什么是协程

通常在Python中我们进行并发编程一般都是使用多线程或者多进程来实现的,对于计算型任务由于GIL的存在我们通常使用多进程来实现,而对与IO型任务我们可以通过线程调度来让线程在执行IO任务时让出GIL,从而实现表面上的并发。

其实对于IO型任务我们还有一种选择就是协程,协程是运行在单线程当中的“并发”,协程相比多线程一大优势就是省去了多线程之间的切换开销,获得了更大的运行效率。Python中的asyncio也是基于协程来进行实现的。在进入asyncio之前我们先来了解一下Python中怎么通过生成器进行协程来实现并发。

example1

我们先来看一个简单的例子来了解一下什么是协程(coroutine),对生成器不了解的朋友建议先看一下Stackoverflow上面的这篇高票回答

>>> def coroutine():...reply =yield'hello'...yieldreply

...

>>>c = coroutine()

>>>next(c)'hello'

>>>c.send('world')'world'

example2

下面这个程序我们要实现的功能就是模拟多个学生同时向一个老师提交作业,按照传统的话我们或许要采用多线程/多进程,但是这里我们可以采用生成器来实现协程用来模拟并发。

fromcollectionsimportdeque

def student(name, homeworks):

forhomeworkinhomeworks.items():

yield(name, homework[0], homework[1])# 学生"生成"作业给老师

class Teacher(object):

def __init__(self, students):

self.students = deque(students)

def handle(self):

"""老师处理学生作业"""

whilelen(self.students):

student = self.students.pop()

try:

homework = next(student)

print('handling', homework[0], homework[1], homework[2])

exceptStopIteration:

pass

else:

self.students.appendleft(student)

下面我们来调用一下这个程序。

Teacher([

student('Student1', {'math':'1+1=2','cs':'operating system'}),

student('Student2', {'math':'2+2=4','cs':'computer graphics'}),

student('Student3', {'math':'3+3=5','cs':'compiler construction'})

]).handle()

这是输出结果,我们仅仅只用了一个简单的生成器就实现了并发(concurrence),注意不是并行(parallel),因为我们的程序仅仅是运行在一个单线程当中。

handling Student3 cs compiler construction

handling Student2 cs computer graphics

handling Student1 cs operating system

handling Student3 math 3+3=5

handling Student2 math 2+2=4

handling Student1 math 1+1=2

使用asyncio模块实现协程

从Python3.4开始asyncio模块加入到了标准库,通过asyncio我们可以轻松实现协程来完成异步IO操作。

解释一下下面这段代码,我们自己定义了一个协程display_date(num, loop),然后它使用关键字yield from来等待协程asyncio.sleep(2)的返回结果。而在这等待的2s之间它会让出CPU的执行权,直到asyncio.sleep(2)返回结果。gather()或者wait()来返回future的执行结果。

# coroutine.pyimportasyncioimportdatetime

@asyncio.coroutine  # 声明一个协程def display_date(num, loop):

end_time = loop.time() +10.0

whileTrue:

print("Loop: {} Time: {}".format(num, datetime.datetime.now()))

if(loop.time() +1.0) >= end_time:

break

yieldfromasyncio.sleep(2)# 阻塞直到协程sleep(2)返回结果

loop = asyncio.get_event_loop()# 获取一个event_loop

tasks = [display_date(1, loop), display_date(2, loop)]

loop.run_until_complete(asyncio.gather(*tasks))# 阻塞直到所有的tasks完成

loop.close()

下面是运行结果,注意到并发的效果没有,程序从开始到结束只用大约10s,而在这里我们并没有使用任何的多线程/多进程代码。在实际项目中你可以将asyncio.sleep(secends)替换成相应的IO任务,比如数据库/磁盘文件读写等操作。

ziwenxie :: ~ » python coroutine.py

Loop:1Time:2016-12-1916:06:46.515329

Loop:2Time:2016-12-1916:06:46.515446

Loop:1Time:2016-12-1916:06:48.517613

Loop:2Time:2016-12-1916:06:48.517724

Loop:1Time:2016-12-1916:06:50.520005

Loop:2Time:2016-12-1916:06:50.520169

Loop:1Time:2016-12-1916:06:52.522452

Loop:2Time:2016-12-1916:06:52.522567

Loop:1Time:2016-12-1916:06:54.524889

Loop:2Time:2016-12-1916:06:54.525031

Loop:1Time:2016-12-1916:06:56.527713

Loop:2Time:2016-12-1916:06:56.528102

在Python3.5中为我们提供更直接的对协程的支持,引入了async/await关键字,上面的代码我们可以这样改写,使用async代替了@asyncio.coroutine,使用了await代替了yield from,这样我们的代码变得更加简洁可读。

importasyncioimportdatetime

asyncdef display_date(num, loop):# 声明一个协程

end_time = loop.time() +10.0

whileTrue:

print("Loop: {} Time: {}".format(num, datetime.datetime.now()))

if(loop.time() +1.0) >= end_time:

break

awaitasyncio.sleep(2)# 等同于yield from

loop = asyncio.get_event_loop()# 获取一个event_loop

tasks = [display_date(1, loop), display_date(2, loop)]

loop.run_until_complete(asyncio.gather(*tasks))# 阻塞直到所有的tasks完成

loop.close()

asyncio模块的其他方法

开启事件循环有两种方法,一种方法就是通过调用run_until_complete,另外一种就是调用run_forever。run_until_complete内置add_done_callback,使用run_forever的好处是可以通过自己自定义add_done_callback,具体差异请看下面两个例子。

run_until_complete()

importasyncio

asyncdef slow_operation(future):

awaitasyncio.sleep(1)

future.set_result('Future is done!')

loop = asyncio.get_event_loop()

future = asyncio.Future()

asyncio.ensure_future(slow_operation(future))

print(loop.is_running())# False

loop.run_until_complete(future)

print(future.result())

loop.close()

run_forever()

run_forever相比run_until_complete的优势是添加了一个add_done_callback,可以让我们在task(future)完成的时候调用相应的方法进行后续处理。

importasyncio

asyncdef slow_operation(future):

awaitasyncio.sleep(1)

future.set_result('Future is done!')

def got_result(future):

print(future.result())

loop.stop()

loop = asyncio.get_event_loop()

future = asyncio.Future()

asyncio.ensure_future(slow_operation(future))

future.add_done_callback(got_result)try:

loop.run_forever()finally:

loop.close()

这里还要注意一点,即使你调用了协程方法,但是如果事件循环没有开启,协程也不会执行,参考官方文档的描述,我刚被坑过。

Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There aretwobasic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using theensure_future()function or theAbstractEventLoop.create_task()method. Coroutines (and tasks) can only run when the event loop is running.

Call

call_soon()

importasyncio

def hello_world(loop):

print('Hello World')

loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()

loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()

loop.run_forever()

loop.close()

下面是运行结果,我们可以通过call_soon提前注册我们的task,并且也可以根据返回的Handle进行cancel。

Hello World

call_later()

importasyncioimportdatetime

def display_date(end_time, loop):

print(datetime.datetime.now())

if(loop.time() +1.0) < end_time:

loop.call_later(1, display_date, end_time, loop)

else:

loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()

end_time = loop.time() +5.0

loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()

loop.run_forever()

loop.close()

改动一下上面的例子我们来看一下call_later的用法,注意这里并没有像上面那样使用while循环进行操作,我们可以通过call_later来设置每隔1秒去调用display_date()方法。

2016-12-24 19:17:13.421649

2016-12-24 19:17:14.422933

2016-12-24 19:17:15.424315

2016-12-24 19:17:16.425571

2016-12-24 19:17:17.426874

Chain coroutines

importasyncio

asyncdef compute(x, y):

print("Compute %s + %s ..."% (x, y))

awaitasyncio.sleep(1.0)# 协程compute不会继续往下面执行,直到协程sleep返回结果

returnx + y

asyncdef print_sum(x, y):

result =awaitcompute(x, y)# 协程print_sum不会继续往下执行,直到协程compute返回结果

print("%s + %s = %s"% (x, y, result))

loop = asyncio.get_event_loop()

loop.run_until_complete(print_sum(1,2))

loop.close()

下面是输出结果

ziwenxie :: ~ » python chain.py

Compute 1 + 2 ...

1 + 2 = 3

如何将同步的代码改成异步

结合上面提到的内容下面来小结一下如何将同步的代码改成异步

同步模型

def handle(id):

subject = get_subject_from_db(id)# 1

buyinfo = get_buyinfo(id)# 2

change = process(subject, buyinfo)

notify_change(change)

flush_cache(id)

上面是一个典型的同步编程模型,每个步骤必须建立在上一个步骤完成的前提,但是注意到步骤1和步骤2之间并没有任何的关系,所以可以将这两个IO型改成异步的,让两者可以并发进行。

异步模型

# 先要将get_subject_from_db, get_buyinfo, process, notify_change修改成协程函数/方法importasyncio

def handle(id):

subject = asyncio.ensure_future(get_subject_from_db(id))# 1

buyinfo = asyncio.ensure_future(get_buyinfo(id))# 2

results = asyncio.gather(subject, buyinfo)

change =awaitprocess(results)

awaitnotify_change(change)

loop.call_soon(flush_cache(id))

使用ensure_future, loop.crate_task, Task可以将协程包装成一个Future对象,这里我们选择ensure_future。

Queue

在asyncio使用Queue来模拟生产者-消费者模式:

importasyncioimportrandom

asyncdef produce(queue, n):

forxinrange(n):

# produce an item

print('producing {}/{}'.format(x, n))

# simulate i/o operation using sleep

awaitasyncio.sleep(random.random())

item = str(x)

# put the item in the queue

awaitqueue.put(item)

asyncdef consume(queue):

whileTrue:

# wait for an item from the producer

item =awaitqueue.get()

# process the item

print('consuming {}...'.format(item))

# simulate i/o operation using sleep

awaitasyncio.sleep(random.random())

# Notify the queue that the item has been processed

queue.task_done()

asyncdef run(n):

queue = asyncio.Queue()

# schedule the consumer

consumer = asyncio.ensure_future(consume(queue))

# run the producer and wait for completion

awaitproduce(queue, n)

# wait until the consumer has processed all items

awaitqueue.join()

# the consumer is still awaiting for an item, cancel it

consumer.cancel()

loop = asyncio.get_event_loop()

loop.run_until_complete(run(10))

loop.close()

实战

by the way:在asyncio中使用requests没有任何意义,requests是基于同步实现的,目前也没有要支持asyncio的动向,如果要充分发回异步的威力,应该使用aiohttp。而且也要合理使用concurrent.futures模块提供的线程池/进程池。

Asyncio+Aiohttp

importaiohttpimportasyncioimporttime

NUMBERS = range(12)

URL ='http://httpbin.org/get?a={}'

asyncdef fetch_async(a):

asyncwithaiohttp.request('GET', URL.format(a))asr:

data =awaitr.json()

returndata['args']['a']

start = time.time()

event_loop = asyncio.get_event_loop()

tasks = [fetch_async(num)fornuminNUMBERS]

results = event_loop.run_until_complete(asyncio.gather(*tasks))

fornum, resultinzip(NUMBERS, results):

print('fetch({}) = {}'.format(num, result))

print('Use asyncio+aiohttp cost: {}'.format(time.time() - start))

下面是运行结果:

ziwenxie :: ~ » python example1.py

fetch(0) =0

fetch(1) =1

fetch(2) =2

fetch(3) =3

fetch(4) =4

fetch(5) =5

fetch(6) =6

fetch(7) =7

fetch(8) =8

fetch(9) =9

fetch(10) =10

fetch(11) =11

Use asyncio+aiohttp cost:0.8980867862701416

Requests+Pool

如果使用传统的Requests和ThreadPool/ProcessPool方式的话,由于多线程/多进程之间切换的开销速度会慢了许多。

importrequestsimporttimefromconcurrent.futuresimportThreadPoolExecutor

NUMBERS = range(12)

URL ='http://httpbin.org/get?a={}'

def fetch(a):

r = requests.get(URL.format(a))

returnr.json()['args']['a']

start = time.time()withThreadPoolExecutor(max_workers=3)asexecutor:

fornum, resultinzip(NUMBERS, executor.map(fetch, NUMBERS)):

print('fetch({}) = {}'.format(num, result))

print('Use requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

线程池的执行结果:

ziwenxie :: ~ » python example2.py

fetch(0) =0

fetch(1) =1

fetch(2) =2

fetch(3) =3

fetch(4) =4

fetch(5) =5

fetch(6) =6

fetch(7) =7

fetch(8) =8

fetch(9) =9

fetch(10) =10

fetch(11) =11

Use requests+ThreadPoolExecutor cost:3.356502056121826

进程池的执行结果:

fetch(0) = 0

fetch(1) = 1

fetch(2) = 2

fetch(3) = 3

fetch(4) = 4

fetch(5) = 5

fetch(6) = 6

fetch(7) = 7

fetch(8) = 8

fetch(9) = 9

fetch(10) = 10

fetch(11) = 11

Use requests+ProcessPoolExecutor cost: 3.2979931831359863

Asyncio+Requests+Pool

虽然上面提到requests不支持异步,但是在某些情形需要控制event loop中运行在单独的线程/进程中的function会阻塞直到这些function返回结果,这个时候可以结合run_in_executor()和wait()来进行控制。

p.s:下面这个例子在处理纯IO任务的时候并没有太多的意义,只是为了理解如何在不支持异步的模块中引入异步的概念。

importasyncioimportrequestsimporttimefromconcurrent.futuresimportThreadPoolExecutor

NUMBERS = range(12)

URL ='http://httpbin.org/get?a={}'

def fetch(a):

r = requests.get(URL.format(a))

returnr.json()['args']['a']

asyncdef run_scraper_tasks(executor):

loop = asyncio.get_event_loop()

blocking_tasks = []

fornuminNUMBERS:

task = loop.run_in_executor(executor, fetch, num)

task.__num = num

blocking_tasks.append(task)

completed, pending =awaitasyncio.wait(blocking_tasks)

results = {t.__num: t.result()fortincompleted}

fornum, resultinsorted(results.items(), key=lambdax: x[0]):

print('fetch({}) = {}'.format(num, result))

start = time.time()

executor = ThreadPoolExecutor(3)

event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(

run_scraper_tasks(executor)

)

print('Use asyncio+requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

结果可想而知与requests+ThreadPoolExecutor执行速度上并没有太多的差别,因为我们的IO任务还是放在对应的子线程中去处理的,只是这里通过wait引入了异步的概念,但是在某些场景可以取得更大自由度程度的控制。

fetch(0) =0

fetch(1) =1

fetch(2) =2

fetch(3) =3

fetch(4) =4

fetch(5) =5

fetch(6) =6

fetch(7) =7

fetch(8) =8

fetch(9) =9

fetch(10) =10

fetch(11) =11

Use asyncio+requests+ThreadPoolExecutor cost:3.614989995956421

Semaphore

爬虫一次性的产生过多的请求账号/IP很快就会被封掉,可以考虑使用Semaphore控制同时的并发量,与我们熟悉的threading模块中的Semaphore(信号量)用法类似。

importaiohttpimportasyncio

NUMBERS = range(12)

URL ='http://httpbin.org/get?a={}'

sema = asyncio.Semaphore(3)

asyncdef fetch_async(a):

asyncwithaiohttp.request('GET', URL.format(a))asr:

data =awaitr.json()

returndata['args']['a']

asyncdef print_result(a):

with(awaitsema):

r =awaitfetch_async(a)

print('fetch({}) = {}'.format(a, r))

loop = asyncio.get_event_loop()

f = asyncio.wait([print_result(num)fornuminNUMBERS])

loop.run_until_complete(f)

可以到后台看到并发受到了信号量的限制,同一时刻一般只处理三个请求。

References

DOCUMENTATION OF ASYNCIO1

DOCUMENTATION OF ASYNCIO2

COROUTINES AND ASYNC/AWAIT

GOLD-XITU1

GOLD-XITU2

STACKOVERFLOW

PyMOTW-3

500LINES

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 197,368评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,941评论 2 374
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 144,369评论 0 326
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,848评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,719评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,505评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,904评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,528评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,819评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,848评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,652评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,468评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,912评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,095评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,389评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,906评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,120评论 2 339

推荐阅读更多精彩内容