为什么用Tornado?
异步编程原理
服务器同时要对许多客户端提供服务,他的性能至关重要。而服务器端的处理流程,只要遇到了I/O操作,往往需要长时间的等待。
当然,我们可以用多线程/多进程达到类似的目的,但线程和进程都是系统控制的,消耗资源较多,而且何时运行,何时挂起不由程序本身做主,调度开销较大。我们希望将多任务流程的调度工作方法放到自己的代码里,精确的控制他的行踪,与线程相似,我们称这种任务流程为协程。
协程实在一个线程之内,无需操作系统参与,由程序自身调度的执行单位。
按照上述模式,在一个进程之内同时处理多个协程,充分利用CPU时间,就是我们需要的异步编程。
底层依赖epoll
在Linux下,底层对一个耗时操作(如网络访问)的处理流程为:
发起访问,将网络连接的文件描述符和期待事件注册到epoll里。当期待事件发生,epoll触发事件处理机制,通过回调函数通知tornado,tornado切换协程。
用生成器实现协程
要实现上述异步的操作,必须判断两个点:
1.等待开始----协程走到这里,去epoll注册,将CPU的控制权让给别的协程。
2.等待结束----协程走到这里,接到epoll回调,开始请求CPU的控制权,执行后续操作。
在Tornado 里,用一个yield语句来表示这两个点。程序开始等待时,向外yield一个Future对象,知道等待结束才回来执行yield的下一条语句。
Tornado异步HTTP服务器
Tornado最大用途当然是HTTP服务器
异步HTTP服务器
#http_0.py
from tornado.ioloop import IOLoop
from tornado import gen,web
class ExampleHandler(web.RequestHandler):
@gen.coroutine
def get(self):
delay = self.get_argument('delay', 5)
yield gen.sleep(int(delay))
self.write({"status":1, "msg":"success"})
self.finish()
# @gen.coroutine
# def post(self):
# pass
application = web.Application([
(r"/example", ExampleHandler),
#(r"/other", OtherHandler),
],autoreload = True)
application.listen(8765)
IOLoop.current().start()
运行以上代码,用浏览器访问http://localhost:8765/example?delay=1
即可在延迟1秒之后得到返回结果{
"status": 1,
"msg": "success"
}。
delay传入是几就延迟几秒,如果没有传入,默认值为5。
在这个例子里,我们用gen.sleep实现延迟。gen.sleep与time.sleep的用法相似,区别在于它是异步的,只阻塞当前协程,等待一段时间,但不影响同一进程内的其他协程的执行。
可以认为gen.sleep是time.sleep的一个异步版本。在tornado开发中,我们经常需要使用某个内含等待的函数的异步版本,以免同步的等待阻塞住整个进程。这些异步的操作都返回Future对象。
检验他能否并发服务
打开两个浏览器(不要使用同一个浏览器的两个标签页),各自在地址栏中输入http://localhost:8765/example?delay=10
尽可能快的在两个浏览器里先后敲回车加载页面,你会发现,两个页面都是在你开始加载的10秒之后返回,两次加载的总用时是10秒稍多,远不到20秒,这说明我们的事例程序虽然只有一个进程,一个线程,却能在第一个请求完成之前开始处理第二个请求。
对代码的详细解说
tornado与webapp一样,用一个继承于web.RequestHandler的类构造处理HTTP访问的handler,不同于Django,flask,bottle等用函数构造handler。在这个类里,我们重载get方法来处理GET请求,也可以重载post, put,delete等其他HTTP方法。
你大概注意到了,重载的方法前都加了@gen.coroutine这样一个装饰器,他的作用就是把一个普通的函数变成一个返回Future对象的函数,即异步函数。
异步函数一定要用yield调用,而且只有在另一个异步函数之内的调用才能起作用。
用self.write输出执行结果。如果输出的是一个字典,tornado会自动把它变成JSON串
正常结束的HTTP调用须以self.finish()结束。但如果是渲染模版或者跳转到其他网站,不应该添加self.finish().
接下来要构造Application对象,然后监听端口,启动消息循环,服务器就能运行起来了。
创建Appliaction对象时可设置这些参数
autoreload:若设为True,在程序运行起来之后,每次编辑代码并保存时,可以自动重新运行
debug:若设制为true,会把运行出错信息打印在屏幕上。
cookie_secret:用于cookie加密的密钥,形如:“ofjf939.m%dw$#3fdn923hrfsp309-[2”。
static_path:静态文件路径。如果与当前文件同一路径,可设为:os.path.dirname(file).
xsrf_cookies:xsrf检测
多进程服务器
前面讨论过,一个典型的单进程,单线程。需访问数据库的异步Tornado服务器每秒可以响应500个请求,在较低配置服务器上实际的负载能力大概也是这个数字。如果需要更高的负载能力,且服务器有多个CPU,应开启多个服务进程。
只要将
application.listen(8765)
改为
from tornado.httpserver import HTTPServer
server = HTTPServer(application)
server.bind(8765)
server.start(4)
就能同时启动4个进程了。
理论上讲,在一个线程里也能运行多个协程,可以作出「多进程 × 多线程 × 多协程」的模式。而实际上,协程可以完全代替线程,「多进程 × 多协程」已经能够充分利用服务器的硬件资源。
流式响应的HTTP服务器
如果响应数据较大,为了节约内存,或者是各部分数据的返回要有一个时间差,我们需要将数据分成多次发送。
#http_stream.py
from tornado.ioloop import IOLoop
from tornado import gen,web
class ExampleHandler(web.RequestHandler):
@gen.coroutine
def get(self):
for _ in range(5):
yield gen.sleep(1)
self.write('zzzzzzzzzzzz<br>')
self.flush()
self.finish()
application = web.Application([
(r"/example", ExampleHandler),
],autoreload = True)
application.listen(8765)
IOLoop.current().start()
演示及解说
与前面的http_0.py相比,明显多了一行self.flush().他的作用是将此前由self.write()写入缓冲区的内容发送出去。在self.finish()结束这次响应之前,可以多次调用self.write和self.flush,逐步发送数据.
运行这段代码,用浏览器访问http://localhost:8765/example
可以看到页面每隔一秒打印一行输出。如果注释掉self.flush()在运行,就会等到5秒时候才将5行输出同时打印出来.
访问数据库
#http_db.py
from tornado.ioloop import IOLoop
from tornado import gen,web
from tornado_mysql import pools
connParam = {'host':'localhost', 'port':3306, 'user':'root', 'passwd':'123456', 'db':'testdb'}
class GetUserHandler(web.RequestHandler):
POOL = pools.Pool(
connParam,
max_idle_connections=1,
max_recycle_sec=3,
)
@gen.coroutine
def get(self):
userid = self.get_argument('id')
cursor = yield self.POOL.execute('select name from user where id = %s', userid)
if cursor.rowcount > 0:
self.write( { "status": 1, "name": cursor.fetchone()[0] } )
else:
self.write( { "status": 0, "name": "" } )
self.finish()
application = web.Application([
(r"/getuser", GetUserHandler),
],autoreload = True)
application.listen(8765)
IOLoop.current().start()
以上代码从mysql库中读取数据。
演示
首先,安装tornado-MySQL
然后,在你的mysql中建立一个名为user的表,至少有两个字段:一个整数型的id,和一个字符串型的name。
编辑http_db.py,将mysql的连接参数填入connParam中。运行http_db.py,用浏览器访问:http://localhost:8765/getuser?id=1
如果你的user表中有id=1的数据,他的name字段是jim,你将看到返回值:{
"status": 1,
"name": "jim"
}
如果没有找到数据,你将看到返回值:{
"status": 0,
"name": ""
}
需要ORM?
很多人喜欢通过ORM访问数据库,而Tornado没有提供异步的ORM工具。
这是访问mysql的例子,如果你要使用postgresql或其他数据库,也需要先安装这些数据库的tornado接口,就像tornado_mysql一样。
访问网路
http_req.py
from tornado.ioloop import IOLoop
from tornado import gen,web
from tornado.httpclient import AsyncHTTPClient
url = 'http://hq.sinajs.cn/list=sz000001'
class GetPageHandler(web.RequestHandler):
@gen.coroutine
def get(self):
client = AsyncHTTPClient()
response = yield client.fetch(url, method='GET')
self.write(response.body.decode('gbk'))
self.finish()
application = web.Application([
(r"/getpage", GetPageHandler),
],autoreload = True)
application.listen(8765)
IOLoop.current().start()
演示
运行http_req.py,用浏览器访问:http://localhost:8765/getpage,即可看到从http://hq.sinajs.cn/list=sz000001抓取的内容。
解说
Tornado有一个AsyncHTTPClient用于访问其他网页,用法比较简单。他在fetch方法中等待远端网页返回内容,因此也要以yield调用。
tornado用户认证
#http_auth.py
from tornado.ioloop import IOLoop
from tornado import gen,web
class LoginHandler(web.RequestHandler):
@gen.coroutine
def get(self):
self.set_secure_cookie('username', 'Jim')
self.write('login ok.')
self.finish()
class LogoutHandler(web.RequestHandler):
@gen.coroutine
def get(self):
self.clear_cookie('username')
self.write('logout ok.')
self.finish()
class WhoHandler(web.RequestHandler):
def get_current_user(self):
if self.get_secure_cookie('username'):
return str(self.get_secure_cookie('username'), encoding = "utf-8")
else:
return 'unknown'
@gen.coroutine
def get(self):
self.write('you are ' + self.current_user)
self.finish()
application = web.Application([
(r"/login", LoginHandler),
(r"/logout", LogoutHandler),
(r"/whoami", WhoHandler),
],autoreload = True,
cookie_secret="feljjfesrh48thfe2qrf3np2zl90bmw")
application.listen(8765)
IOLoop.current().start()
演示
1.运行http_auth.py,用浏览器访问http://localhost:8765/whoami,可以看到输出:you are unknown,说明还没有登录
2.访问http://localhost:8765/login,看到:login ok.登录成功。在访问http://localhost:8765/whoami,这是会看到: you are Jim,说明你已经以Jim身份登录了。
3.访问http://localhost:8765/logout,看到:logout ok,退出登录,在访问http://localhost:8765/whoami,又变回了:you are unknown,说明你已经退出了登录。
解说
你知道服务端通常用 cookie 保存用户的身份信息。login 时创建 cookie;logout 时清除 cookie;需要检查用户身份时,读取 cookie。
在 tornado 里,创建 cookie 通常用 set_secure_cookie,这样创建的 cookie 是加密的。与之对应的读取加密 cookie 的方法是 get_secure_cookie。
为了给 cookie 加密,要在创建 Application 时添加 cookie_secret 属性,这是加密的密钥,它的值用一串乱写的字符就行了。
清除 cookie 用 clear_cookie。
在 RequestHandler 里有一个 get_current_user 方法,它会在 get / post 之前调用,其返回值会赋予 current_user 属性。我们在 WhoHandler 里重载了 get_current_user,后面在 get 里就能直接使用 self.current_user 了。
tornado定时任务
定时任务分两种,一种是每隔一定的时间周期性地执行,另一种是在某个钟点单次执行。
周期性定时任务
#cron_0.py
from tornado import ioloop, gen
@gen.coroutine
def Count():
print("1 second has gone.")
if __name__ == '__main__':
ioloop.PeriodicCallback(Count, 1000).start()
ioloop.IOLoop.current().start()
演示及解说
在启动消息循环之前,用PeriodicCallback设定每1000毫秒执行一次异步函数Count。直接运行,每过一秒会打印一行:1 second has gone.
单次定时任务
#cron_1.py
from tornado import ioloop, gen
from time import time
@gen.coroutine
def Ring():
print('it\'s time to get up')
if __name__ == '__main__':
loop = ioloop.IOLoop.current()
loop.call_at(time() + 5, Ring)
loop.start()
演示及解说
在启动消息循环之前,用call_at设定5秒后执行一次异步函数Ring。如果想在明早9点执行,需要输入明早9点的unix时间戳。
直接运行上述代码,5秒之后会打印一行:it's time to get up.仅此一次。
如果要在一个相对的时间(例如五秒钟后)而不是一个绝对时间(例如八点整)运行定时任务,用 call_later 会比 call_at 更简单一点。可以将
loop.call_at(time() + 5, Ring)
改为
loop.call_later(5,Ring)
执行效果是一样的。
单元测试
#test.py
from tornado.testing import gen_test, AsyncTestCase
from tornado.httpclient import AsyncHTTPClient
import unittest
class MyAsyncTest(AsyncTestCase):
@gen_test
def test_xx(self):
client = AsyncHTTPClient(self.io_loop)
path = 'http://localhost:8765/example?delay=2'
responses = yield [client.fetch(path, method = 'GET') for _ in range(10)]
for response in responses:
print(response.body)
if __name__ == '__main__':
unittest.main()
演示
1.先运行http_0.py,保证通过浏览器访问http://localhost:8765/example?delay=2能得到结果
2.运行test.py,看到如下输出
可见,测试模块在2.027s之内完成了10次请求,每个请求都要延迟2秒返回,因此,这10个请求必定是并行执行的。
解说
与普通的 unittest 用法相似,先定义好基于 AsyncTestCase 的测试类,在执行 unittest.main()时,测试类中所有 以 test 开头的方法都会执行。
注意,测试类中的方法要以 gen_test 装饰。
每次测试整体的用时不能超过 5 秒,超则报错。
tornado异步TCP连接
tornado有TCPClient和TCPServer两个类,可用于实现tcp的客户端和服务端。事实上,这两个类都是对iostream的简单包装。
真正重要的是iostream
iostream是client与server之间的tcp通道,被动等待创建iostream的一方是server,主动找对方创建的一方是client。
在iostream创建之后,client与server的操作再无分别,在任何时候都可以通过iostream.write向对方传送内容,或者通过iostream.read_xx接受对方传来的内容,或者以iostream.close关闭连接。
TCPServer
#tcp_server.py
from tornado import ioloop,gen,iostream
from tornado.tcpserver import TCPServer
class MyTcpServer(TCPServer):
@gen.coroutine
def handle_stream(self,stream,address):
try:
while True:
msg = yield stream.read_bytes(20, partial = True)
print(msg, 'from', address)
yield stream.write(msg[::-1])
if msg == 'over':
stream.close()
except iostream.StreamClosedError:
pass
if __name__ == '__main__':
server = MyTcpServer()
server.listen(8760)
server.start()
ioloop.IOLoop.current().start()
解说
创建一个继承于TCPServer的类的实例,监听端口,启动服务器,启动消息循环,服务器开始运行。
这时,如果有client连接过来,tornado会创建一个iostream,然后调用handle_stream方法,调用时传入的两个参数是iostream和client的地址。
我们示例的功能很简单,每收到一段 20 字符以内的内容,将之反序回传,如果收到 'over',就断开连接。注意,断开连接不用 yield 调用。
无论是谁断开连接,连接双方都会各自触发一个 StreamClosedError。
TCPClient
#tcp_client.py
from tornado import ioloop,gen,iostream
from tornado.tcpclient import TCPClient
@gen.coroutine
def Trans():
straem = yield TCPClient().connect('localhost', 8760)
try:
for msg in ('zzxxc', 'abcde', 'i feel lucky', 'over'):
yield straem.write(msg.encode('utf-8'))
back = yield straem.read_bytes(20, partial = True)
print(back)
except iostream.StreamClosedError:
pass
if __name__ == '__main__':
ioloop.IOLoop.current().run_sync(Trans)
解说
使用TCPClient比TCPServer更简单,无需继承,只要用connect方法连接到server,就会返回iostream对象了。
在本例中,我们像server发送一些字符串,他都会反序发回来,最后发个“over”,让sever断开连接。
值得注意,这段代码与之前的几个例子有个根本的区别,之前都是服务器,被动等待行为发生,而这段代码是一运行就主动发起行为(连接),因此它的运行方式不同于以往,需要我们主动通过 ioloop 的 run_sync 来调用。以往那些实例中的异步处理方法实际是由 Tornado 调用的。在 run_sync 里,tornado 会先启动消息循环,执行目标函数,之后再结束消息循环。
演示
在第一个终端端口运行tcp_server.py,在第二个终端端口运行tcp_client.py,即可看到他们之间的交互和断开的过程。
tornado的ioloop消息循环
tornado的异步功能都是通过ioloop实现的。
前面的每一段示例代码的最后一行都是启动ioloop:
ioloop.IOLoop.current().start()
每个进程都有一个磨人的ioloop,虽然还可以有更多个,通常使用默认的就够了。在上面的这行代码里,我们通过current()获得当前的ioloop,让他start(),ioloop就会一直跑下去,让他run_sync,就会跑起来,执行目标函数,执行完就停止。
一般编程中很少用到ioloop的其他功能,只要简简单单的处理好RequestHandler的get/post方法,调用tornado-Mysql的异步函数访问数据库,返回结果就可以了。
可是,如果有些内含等待(CPU休息)的操作,找不到现成的异步库,只要深入到这个操作的底层,靠ioloop,可以把同步的操作变成异步。
把同步的操作变成异步
内含等待的操作大概有几种情况:
1.为了拖时间而等待
对应前面用过的 gen.sleep,如果没有 gen.sleep 呢?可以很简单地通过也在前面用过的 call_at 或 call_later 来实现这个功能。
2.等待网络返回
这一部分是本章的重点。我们将不再使用 iostream,而是在同步代码的基础上,用 ioloop 自己实现一个与 iostream 同样异步、高效的 tcp client。
同步的代码
#sync_client.py
import socket
from time import time
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 8760))
t0 = time()
for _ in range(1000):
sock.send(b'test message.')
sock.recv(99)
print('time cost', time() - t0)
sock.close()
演示及解说
这是一段十分普通的tcp client代码,当然它是同步的,连上服务器之后,发一条,收一条,重复一千次,看他耗时多少。
服务器端就用我们前面用过的tcp_server.py,在运行sync_client.py,转瞬之间就结束了,输出:time cost 0.15320992469787598
这么快,还需要异步吗?
慢着,快是因为我们服务端的处理极其简单,又放在本地,实际的情况复杂得多。为了模拟真实场景,我们在服务端加一个只有 5ms 的时延。
在
yield stream.write(msg[::-1])
之前加上
yield gen.sleep( 0.005 )
加上时延之后重新启动 tcp_server.py,再运行 sync_client.py,这回就慢多了。输出:time cost 6.7937400341033936
由于时延,client 每次发出消息之后要等 5 ms 以上才能收到回复,时间浪费在 recv 里,一千次当然要五秒多。我们的目标就是通过异步编程优化等待的开销。
再看看异步的代码
#async_client.py
import socket
from time import time
from tornado import ioloop
loop = ioloop.IOLoop.current()
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for _ in range(50)]
[sock.connect(('localhost', 8760)) for sock in socks]
SockD = {sock.fileno(): sock for sock in socks}
t0 = time()
n = 0
def OnEvent(fd, event):
if event == loop.WRITE:
loop.update_handler(fd, loop.READ)
elif event == loop.READ:
sock = SockD[fd]
sock.recv(99)
global n
n += 1
if n >= 1000:
print('time cost ', time() - t0)
sock.close()
loop.remove_handler(fd)
loop.stop()
return
loop.update_handler(fd, loop.WRITE)
sock.send(b'test message.')
for fd, sock in SockD.items():
loop.add_handler(fd, OnEvent, loop.WRITE)
sock.send(b'test message.')
loop.start()
演示及解说
启动加了时延的tcp_server.py,在运行aync_client.py,瞬间结束,输出:time cost 0.27816009521484375
比同步的快四,五十倍。
为什么这么快?因为现在的recv虽然写法与同步版本一样,调用的时机已经不同。
同步的版本,send之后紧接着调用recv,却不知道数据多久才能返回,从调用recv到获得数据之间只能等待。而现在的异步版本,send完成时只是注册了一个读事件,直到真有数据到来时才调用recv,于是recv不用等待,时间就节省下来了。
节省下来的时间给了别的协程。可以看到,我们创建了50个连接来完成这一千次收发,每个连接一个协程,send之后数据未来之际,别的协程可以发送自己的数据。
程序运行步骤
1.创建50个socket对象并全部连接到服务器。
2.为这些socket对象创立文件描述符的索引。后面,在回调函数里,我们要通过文件描述符获取socket对象
3.对每个socket对象注册一个WRITE事件的回调函数,之后发送第一条消息。消息发送完成时,WRITE事件发生,触发i oloop执行刚刚注册的回调函数,传入文件描述符和触发回调的事件,这次的事件是WRITE。接下来我们要等待服务器端返回的消息,因此将注册等待的事件改为READ。
4.对于每个等待READ事件的socket对象,一旦有数据来,又以事件READ触发回调,我们检查一下n的值,未满1000就发下一条数据。发之前要再把注册等待的事件改为WRITE。
5.重复3,4两步,直到发满一千条,取消事件注册,结束。
我们用了 ioloop 的三个方法来实现消息注册与回调:
add_handler:注册一个回调函数到一个文件描述符的指定事件上;
update_handler:改变 add_handler 注册的事件,文件描述符和回调函数不变;
remove_handler:取消事件注册。