索引
本节内容以日常开发中常见的异步场景为基础, 给出Tornado定义的协程和异步示例, 其中的代码稍加修改就可以用到实际项目中. 另外, 本节内容不会对其中原理做进一步说明, 原理分析将放到下一节.
常用异步应用示例
- 非阻塞 sleep
- 用线程池处理阻塞操作
- 异步HTTP请求
- IOLoop 事件(定时, 回调)
- 长连接输出(RequestHandler.flush)
- 后台定时任务
- 循环/迭代
非阻塞 sleep
# All three handlers below do the same thing: sleep 2 seconds without
# blocking the IOLoop, then respond with "i sleep 2s".
# Recommended form: `tornado.gen.sleep` is tornado.gen's wrapper around the
# IOLoop timeout machinery.
class NonBlockSleep(tornado.web.RequestHandler):
    """Respond after a non-blocking 2-second pause built on ``gen.sleep``."""

    @tornado.gen.coroutine
    def get(self):
        delay_seconds = 2
        # Suspends this coroutine only; the IOLoop keeps serving other requests.
        yield tornado.gen.sleep(delay_seconds)
        self.finish("i sleep 2s")
# Essentially the same as the gen.sleep handler -- a "raw" version of it.
# gen.Task calls add_timeout with an extra `callback` keyword and resumes the
# coroutine when that callback fires.
# NOTE: gen.Task was deprecated in Tornado 5 and removed in Tornado 6; on
# current versions prefer `tornado.gen.sleep`.
class NonBlockSleep(tornado.web.RequestHandler):
    """Respond after a 2-second pause built directly from ``IOLoop.add_timeout``."""

    @tornado.gen.coroutine
    def get(self):
        io_loop = tornado.ioloop.IOLoop.current()
        # add_timeout's deadline is measured on the IOLoop.time() clock, which is
        # not necessarily time.time() (on asyncio-based IOLoops it is monotonic).
        # Using time.time() here could schedule the wake-up years in the future.
        yield tornado.gen.Task(io_loop.add_timeout, io_loop.time() + 2)
        self.finish("i sleep 2s")
# Callback style: schedule a timeout and finish the request from the callback.
# NOTE: @tornado.web.asynchronous was removed in Tornado 6.
class NonBlockSleep(tornado.web.RequestHandler):
    """Respond after 2 seconds using a plain IOLoop timeout callback."""

    @tornado.web.asynchronous
    def get(self):
        # @asynchronous keeps the request open after get() returns, so awake()
        # is responsible for calling finish().
        io_loop = tornado.ioloop.IOLoop.current()
        # The deadline must be on the IOLoop.time() clock, which need not be
        # time.time() (it is monotonic on asyncio-based IOLoops).
        io_loop.add_timeout(io_loop.time() + 2, callback=self.awake)

    def awake(self):
        """Timeout callback: complete the pending request."""
        self.finish("i sleep 2s")
用线程池处理阻塞操作
这里需要用到一个新的包 `futures` (Python 2 下 `concurrent.futures` 的移植版), 通过 `pip install futures` 安装即可; Python 3.2 及以上版本已内置 `concurrent.futures`, 无需安装.
单任务, 无回调, 需要用到阻塞操作结果
两种方式实现非阻塞计算, 完成计算后输出结果 (不需要操作结果时, 把 `yield` 和 `@coroutine` 去掉即可).
# Using executor.submit directly -- the rawer form, without Tornado's wrapper.
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    """Run a CPU-bound loop on a thread pool and respond with its result."""

    # One pool shared by every request.  Previously the ``executor`` property
    # built a brand-new ThreadPoolExecutor(2) on each access: the pools were
    # never shut down and the 2-worker limit never applied across requests.
    # (concurrent.futures.ThreadPoolExecutor and
    # tornado.concurrent.futures.ThreadPoolExecutor are the same class.)
    _shared_executor = tornado.concurrent.futures.ThreadPoolExecutor(2)

    @property
    def executor(self):
        """Return the thread pool shared by all instances of this handler."""
        return self._shared_executor

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        started = time.time()
        # submit() returns a concurrent.futures.Future, which the coroutine
        # yields directly to wait for the worker thread's result.
        result = yield self.executor.submit(self._calculate, 1)
        used_time = time.time() - started
        self.finish('calculate completed, used %.3f s, result is %s' % (used_time, result))

    def _calculate(self, num=0):
        """Blocking work: add 100,000,000 to ``num`` one step at a time."""
        for _ in xrange(100000000):
            num += 1
        return num
# Using run_on_executor -- the recommended approach.
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    """Offload a blocking loop with ``@run_on_executor`` and await its result."""

    # run_on_executor finds the pool through the ``executor`` attribute.
    executor = concurrent.futures.ThreadPoolExecutor(2)

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        t0 = time.time()
        outcome = yield self._calculate(1)
        elapsed = time.time() - t0
        message = 'calculate completed, used %.3f s, result is %s' % (elapsed, outcome)
        self.finish(message)

    @tornado.concurrent.run_on_executor
    def _calculate(self, num=0):
        """Blocking counter loop, executed on ``self.executor``."""
        total = num
        for _ in xrange(100000000):
            total += 1
        return total
单任务, 带回调, 需要用到阻塞操作结果, 蹩脚原始实现
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    """Callback-style offload: submit to a pool, finish from a future callback."""

    # get() relies on self.executor, which this class previously never defined
    # (AttributeError on every request).  One pool is shared by all requests.
    executor = concurrent.futures.ThreadPoolExecutor(2)

    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        future = self.executor.submit(self._calculate, 1)
        # Have the IOLoop call result_callback(future) once the worker is done.
        tornado.ioloop.IOLoop.current().add_future(future, self.result_callback)

    def block_callback(self):
        """Extra callback queued on the IOLoop after the blocking work completes."""
        print('after block func callback')

    def result_callback(self, future):
        """Read the pool's result and finish the request with it."""
        tornado.ioloop.IOLoop.current().add_callback(self.block_callback)
        self.finish('the calculate result is |%s|' % future.result())

    def _calculate(self, num=0):
        """Blocking work: add 100,000,000 to ``num`` one step at a time."""
        for _ in xrange(100000000):
            num += 1
        return num
多任务, 带回调, 需要用到阻塞操作结果
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    """Run two blocking tasks concurrently on a thread pool and wait for both."""

    # Shared by all requests.  The previous ``executor`` property created a new
    # ThreadPoolExecutor(2) on every access, so _calculate and _sleep each got
    # a private pool that was never shut down, and the 2-worker limit had no
    # effect.  run_on_executor looks this attribute up by name.
    executor = concurrent.futures.ThreadPoolExecutor(2)

    @property
    def io_loop(self):
        '''
        IOLoop used by ``run_on_executor`` when a ``callback`` is supplied.

        When a run_on_executor method is called with a callback, ``self.io_loop``
        must exist.  ``run_on_executor`` can also read the IOLoop and executor
        from differently named attributes, e.g.:

            @property
            def my_io_loop(self):
                return tornado.ioloop.IOLoop.current()

            @property
            def my_executor(self):
                return self.application.executor

            @tornado.concurrent.run_on_executor(io_loop='my_io_loop', executor='my_executor')
            def block_func(*args, **kwargs):
                pass

        Pass the callback as an ordinary keyword argument when calling the
        decorated method; ``run_on_executor`` pops it before calling the
        function, so the function itself need not accept it.
        '''
        return tornado.ioloop.IOLoop.current()

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        s = time.time()
        # Yielding a list waits for all of the futures; results come back in
        # the same order as the list.
        calculate_result, sleep_result = yield [
            self._calculate(2, callback=self.executor_callback),
            self._sleep(3),
        ]
        # Equivalent dict form:
        #     multi_task_result = yield {
        #         'calculate': self._calculate(1),
        #         'sleep': self._sleep(3),
        #     }
        #     calculate_result = multi_task_result['calculate']
        #     sleep_result = multi_task_result['sleep']
        print(sleep_result)
        used_time = time.time() - s
        self.finish('calculate and sleep completed used %.3f s, %s, the calculate result is %s' %
                    (used_time, sleep_result, calculate_result))

    def executor_callback(self, future_result):
        """Callback given to _calculate; presumably receives the task's result value."""
        print('future is done, and the result is |%s|.' % future_result)

    @tornado.concurrent.run_on_executor
    def _calculate(self, num=0):
        """Blocking work: add 100,000,000 to ``num`` one step at a time."""
        for _ in xrange(100000000):
            num += 1
        return num

    @tornado.concurrent.run_on_executor
    def _sleep(self, seconds=0):
        """Block a worker thread for ``seconds`` and describe how long it slept."""
        time.sleep(seconds)
        return 'sleep used %s seconds' % seconds
异步HTTP请求
# Asynchronous callback style.
class AsyncFetch(tornado.web.RequestHandler):
    """Fetch a page with AsyncHTTPClient and finish from the fetch callback."""

    @tornado.web.asynchronous
    def get(self):
        tornado.httpclient.AsyncHTTPClient().fetch(
            "http://www.baidu.com", callback=self.on_response)

    def on_response(self, response):
        """Print body, status code, request time and headers, then finish."""
        print('%s %s %s' % (response.body, response.code, response.request_time))
        print(dict(response.headers.items()))
        self.finish('fetch completed')
# Coroutine style.
class AsyncFetch(tornado.web.RequestHandler):
    """Fetch a page inside a coroutine and finish once the response arrives."""

    @tornado.gen.coroutine
    def get(self):
        client = tornado.httpclient.AsyncHTTPClient()
        page = yield client.fetch("http://example.com")
        self.on_response(page)
        self.finish('fetch completed')

    def on_response(self, response):
        """Print the received response object."""
        print(response)
# "Raw" implementation: no decorator, the future is handled by hand.
class AsyncFetch(tornado.web.RequestHandler):
    """Fetch a page via a raw future + add_future and return headers and body."""

    def get(self, *args, **kwargs):
        # Keep the request open after get() returns; on_response must finish it.
        self._auto_finish = False
        # NOTE(review): configure() changes AsyncHTTPClient defaults process-wide
        # and is re-run on every request here; it normally belongs in startup.
        tornado.httpclient.AsyncHTTPClient.configure(
            None,
            defaults=dict(
                user_agent="MyUserAgent"
            ),
            max_clients=20,
        )
        client = tornado.httpclient.AsyncHTTPClient()
        fetch_future = client.fetch('http://www.baidu.com', request_timeout=2)
        # Either line registers a done-callback; Tornado prefers add_future,
        # which runs the callback on the IOLoop.
        tornado.ioloop.IOLoop.current().add_future(fetch_future, callback=self.on_response)
        # fetch_future.add_done_callback(self.on_response)

    def on_response(self, future):
        """Finish with the response headers plus body, or a 502 if the fetch failed."""
        try:
            http_response = future.result()
        except Exception as e:
            # With _auto_finish disabled, an exception escaping this callback
            # would leave the client waiting forever; answer it explicitly.
            print('fetch failed: %r' % (e,))
            self.send_error(502)
            return
        print(http_response)
        result = dict(http_response.headers)
        result.update({'content': http_response.body})
        self.finish(result)
IOLoop 事件(定时, 回调)
class IOLoopCallback(tornado.web.RequestHandler):
    """Demonstrate IOLoop.add_timeout (delayed) and add_callback (next iteration)."""

    def get(self, *args, **kwargs):
        # The request itself finishes when get() returns; the callbacks below
        # only print afterwards.
        print(time.time())
        loop = tornado.ioloop.IOLoop.current()
        # Timed task: hand it to the IOLoop to run 3 seconds from now.
        loop.add_timeout(loop.time() + 3, callback=self.callback_timeout)
        # Plain callback: the IOLoop runs it on its next iteration.
        loop.add_callback(self.callback_next_loop, None)
        # time.sleep blocks the IOLoop, so the 3-second delay above is only a
        # lower bound; if the loop stays blocked, nothing runs on time.
        # time.sleep(4)  # blocking experiment

    def callback_timeout(self):
        """Fired by add_timeout."""
        print('callback_timeout at the time %s' % time.time())

    def callback_next_loop(self, useless=None):
        """Fired by add_callback; the extra argument is ignored."""
        print('callback_next_loop at the time %s' % time.time())
长连接输出(RequestHandler.flush)
class Flush(tornado.web.RequestHandler):
    """Stream part of the page, pause 2 s without blocking, then send the rest."""

    @tornado.gen.coroutine
    def get(self):
        first_chunk, last_chunk = '<h1>sleeping...</h1>', '<h1>awake</h1>'
        self.write(first_chunk)
        self.flush()  # push what has been written so far, before the pause
        yield tornado.gen.sleep(2)
        self.finish(last_chunk)
后台定时任务
方式1:
@tornado.gen.coroutine
def do_something(func_name):
    """Stand-in periodic job: print the caller's name and the Unix time in seconds."""
    # NOTE(review): the " n " in this message looks like a "\n" whose backslash
    # was lost; left unchanged.
    now = int(time.time())
    print('from %s n do_something at %s' % (func_name, now))
@tornado.gen.coroutine
def minute_loop1(interval=1):
    """Run do_something forever, pausing ``interval`` seconds after each run.

    The pause only starts once do_something has finished, so the real period
    is ``interval + n`` seconds, where n is do_something's run time; it drifts
    and is not a strict period.  Despite the name, the demo default is 1
    second; pass ``interval=60`` for a once-a-minute job.
    """
    while True:
        yield do_something(minute_loop1.__name__)
        yield tornado.gen.sleep(interval)  # start the timer and wait for it
@tornado.gen.coroutine
def minute_loop2(interval=2):
    """Run do_something on a fixed ``interval``-second period.

    The timer starts before do_something runs, so while the job takes less
    than ``interval`` seconds each cycle lasts ``interval`` seconds.  A longer
    job stretches that cycle to the job's own run time.  The demo default is
    2 seconds; pass ``interval=60`` for a once-a-minute job.
    """
    while True:
        timer = tornado.gen.sleep(interval)  # start timing now
        yield do_something(minute_loop2.__name__)  # run the job meanwhile
        yield timer  # then wait out whatever is left of the interval
# How to start them: schedule both loops on the current IOLoop.
# spawn_callback returns nothing, so nothing here waits on the loops.
_io_loop = tornado.ioloop.IOLoop.current()
_io_loop.spawn_callback(minute_loop1)
_io_loop.spawn_callback(minute_loop2)
方式2:
# tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop=None)
# Note that `callback_time` is in MILLISECONDS (1000 means once per second),
# not microseconds.  PeriodicCallback is normally given a plain function
# rather than a coroutine.
# If a callback runs longer than `callback_time`, the invocations that fell
# due in the meantime are skipped (not queued up and replayed) so that the
# schedule gets back in step; the next call happens at the next scheduled tick.
COUNT = 0


def periodic_callback_print():
    """Print a call counter on the first three calls; later calls do nothing.

    Each of those first calls blocks for 2 seconds, presumably to be longer
    than the period and show skipped invocations.  Because time.sleep runs on
    the IOLoop thread, it also blocks the whole IOLoop.
    """
    global COUNT
    if COUNT < 3:
        COUNT += 1
        time.sleep(2)
        print('i have been call back %s times and now is %s' % (COUNT, int(time.time())))
ms_loop_time = 1000  # period, in milliseconds
# A PeriodicCallback has to be created first and started afterwards.
periodic_schedules_one = tornado.ioloop.PeriodicCallback(
    periodic_callback_print,
    ms_loop_time,
)
periodic_schedules_one.start()
# Confirm the callback is now scheduled, then stop it again.
assert periodic_schedules_one.is_running()
periodic_schedules_one.stop()
循环/迭代
Python 3.5 之前, 在协程中实现迭代会比较麻烦, 你需要将循环条件与 `yield` 的结果分离. 例如下面这个使用 Motor (异步 MongoDB 驱动) 的例子. 不过在 Python 3.5+ 里面, 新增的 `async for` 可以实现异步迭代.
import motor
db = motor.MotorClient()['test']  # the database named "test"
# Pre-Python 3.5 version: the loop condition is a separately yielded future.
@gen.coroutine
def loop_example(collection):
    """Iterate over every document of ``db.collection`` with an explicit fetch loop.

    NOTE(review): the ``collection`` argument is never used; the query always
    runs against ``db.collection``.  Confirm whether ``collection.find()`` was
    intended.
    """
    cursor = db.collection.find()
    more = yield cursor.fetch_next
    while more:
        doc = cursor.next_object()
        ...
        more = yield cursor.fetch_next
# Python 3.5+ version: `async for` iterates the cursor asynchronously.
async def loop_example(collection):
    """Iterate over every document of ``db.collection`` with ``async for``.

    NOTE(review): the ``collection`` argument is never used; the query always
    targets ``db.collection``.
    """
    async for doc in db.collection.find():
        ...
本节内容就是这些, 下节内容将分析Tornado协程和异步实现的部分源码.
NEXT ===> Tornado应用笔记04-浅析源码