contextvars
顾名思义就是 context variables,直译「上下文变量」。
什么是上下文
高中英语课让我影响最深刻的就是「上下文」这个词,每次做完型阅读的时候老师总是强调要根据上下文选出最合适的词。上下文能够给这个空词位提供某些额外的信息,有了这些信息,这个空位就能选出一个最合适的词。
Flask 的设计中就包含了 Context。这个设计有什么用呢?简单地说:可以在一些场景下隐式地传递变量
在 Django 和 Sanic 中是这样传递 Request 对象的:
# Django
from django.http import HttpResponse
def index(request):
text = request.GET.get('text')
return HttpResponse(f'Text is {text}')
# Sanic
from sanic import response
app = Sanic()
@app.route('/')
async def index(request):
text = request.args.get('text')
return response.text(f'Text is {text}')
而在 Flask 里面是这样的:
from flask import Flask, request
app = Flask(__name__)
@app.route('/')
def index():
text = request.args.get('text')
return f'Text is {text}'
Django、Sanic 和 Flask 的区别:在 Flask 中,request 是 import 进来使用的(不需要就不用 import),和视图解耦了。这种设计下,不需要像 Django/Sanic 那样把参数传来传去。
通过查看源码,Flask 是通过实现了一个类似 ThreadLocal 的一个 Local 类来实现 context 的传递的。
Python 本身的 ThreadLocal 是线程安全的,但是 Flask 的 Local 类支持 Greenlet 的协程。
同样类似的功能,在 Tornado 中为了配合自己的异步特性,Tornado 也实现了一个类似的 StackContext 来管理上下文。
先看一下线程库中 ThreadLocal 的例子:
import random
import threading
local_data = threading.local()
def show():
name = threading.current_thread().getName()
try:
val = local_data.value
except AttributeError:
print(f'Thread {name}: No value yet')
else:
print(f'Thread {name}: {val}')
def worker():
show()
local_data.value = random.randint(1, 100)
show()
for i in range(2):
t = threading.Thread(target=worker)
t.start()
Thread Thread-1: No value yet
Thread Thread-1: 55
Thread Thread-2: No value yet
Thread Thread-2: 48
可见不同线程中的状态是相互独立的。
Flask里,请求 Context 在内部作为一个栈来维护(应用 Context 在另外一个栈)。每个访问 Flask 的请求,会绑定到当前的 Context,等请求结束后再销毁。维护的过程由框架实现,开发者不需要关心,你只需要用 flask.request 就可以了,这样就提高了接口的可读性和扩展性。
Tornado中,如果通过 add_callback 等在 ioloop 的添加了异步回调函数,当回调函数抛出异常时,无法通过在调用 add_callback 处捕捉到该异常:
def callback():
raise Exception('in callback')
def func():
ioloop.add_callback(callback)
ioloop = tornado.ioloop.IOLoop.instance()
try:
func()
except:
print('catch the Exception')
ioloop.start()
虽然是 func 导致了 callback 的调用,进而导致异常的抛出。但实际上 callback 在 func 中被 add_callback 后并没有立即执行,只是被放到 IOLoop 中,等到适当的时机执行。当 callback 执行时,它的环境早已不是“调用时”(func)的环境,因此对 func 的 try...except 无法捕捉到异常,自然“catch the Exception”也就不会被打印。
Tornado 通过 StackContext 实现对“调用时”上下文环境的保存,其内部也是通过栈来维护不同上下文的信息。
contextvars
随着 pyton 的不断发展,异步特性逐渐加强,threading.local
针对线程的隔离效果很好,但是协程的出现我们需要一个能在协程中也能有此特性,contextvars
就提供了一组接口,可以用于管理、存储、访问局部的 context 状态:
import asyncio
import contextvars
# 申明Context变量
request_id = contextvars.ContextVar('Id of request')
async def get():
# Get Value
print(f'Request ID (Inner): {request_id.get()}')
async def new_coro(req_id):
# Set Value
request_id.set(req_id)
await get()
print(f'Request ID (Outer): {request_id.get()}')
async def main():
tasks = []
for req_id in range(1, 5):
tasks.append(asyncio.create_task(new_coro(req_id)))
await asyncio.gather(*tasks)
asyncio.run(main())
Request ID (Inner): 1
Request ID (Outer): 1
Request ID (Inner): 2
Request ID (Outer): 2
Request ID (Inner): 3
Request ID (Outer): 3
Request ID (Inner): 4
Request ID (Outer): 4
可见不听协程间的数据状态互不影响。
注意:这个模块不仅仅给 aio 加入 Context 的支持,也用来替代 threading.local()
。
contextvars
实现了 PEP 567, 如果在 Python3.6 想使用可以用 MagicStack/contextvars
这个向后移植库,它和标准库都是同一个作者写的,可以放心使用。用之前需要安装它:
pip install contextvars
内存泄漏和上下文清理
根据 Python 文档,contextvars
对象会持有变量值的强引用,所以如果没有适当清理,会导致内存漏泄。我们使用以下代码演示这种问题:
import asyncio
import contextvars
from unittest import TestCase
import weakref
obj_context = contextvars.ContextVar('obj')
obj_ref_dict = {}
class A(object):
def __init__(self, x):
self.x = x
def __repr__(self):
return '<A|x: %d>' % self.x
async def inner(x):
obj = A(x)
obj_context.set(obj)
obj_ref_dict[x] = weakref.ref(obj)
async def outer(i):
await inner(i)
print('obj: %s in outer request-%d from obj_ref_dict' % (obj_ref_dict[i](), i))
async def dispatcher():
await asyncio.gather(*[
outer(i) for i in range(0, 10)
])
for i in range(0, 10):
print('obj-%d: %s in obj_ref_dict' % (i, obj_ref_dict[i]()))
class ContextTest(TestCase):
def test(self):
asyncio.run(dispatcher())
obj: <A|x: 0> in outer request-0 from obj_ref_dict
obj: <A|x: 1> in outer request-1 from obj_ref_dict
obj: <A|x: 2> in outer request-2 from obj_ref_dict
obj: <A|x: 3> in outer request-3 from obj_ref_dict
obj: <A|x: 4> in outer request-4 from obj_ref_dict
obj: <A|x: 5> in outer request-5 from obj_ref_dict
obj: <A|x: 6> in outer request-6 from obj_ref_dict
obj: <A|x: 7> in outer request-7 from obj_ref_dict
obj: <A|x: 8> in outer request-8 from obj_ref_dict
obj: <A|x: 9> in outer request-9 from obj_ref_dict
obj-0: <A|x: 0> in obj_ref_dict
obj-1: <A|x: 1> in obj_ref_dict
obj-2: <A|x: 2> in obj_ref_dict
obj-3: <A|x: 3> in obj_ref_dict
obj-4: <A|x: 4> in obj_ref_dict
obj-5: <A|x: 5> in obj_ref_dict
obj-6: <A|x: 6> in obj_ref_dict
obj-7: <A|x: 7> in obj_ref_dict
obj-8: <A|x: 8> in obj_ref_dict
obj-9: <A|x: 9> in obj_ref_dict
inner
方法在调用栈的最内部设置了上下文变量 obj_context
,设置上下文的同时也将保存在上下文的对象 A
的实例保存到一个弱引用中,以便后续通过弱引用来检查对象实例是否被回收。
可以看到,无论是在 outer
中,还是在 dispatcher
中,所有 inner
方法保存的上下文变量都没有被回收,所以我们必须在使用完上下文变量后显式地清理上下文,否则会导致内存泄漏。
这里,我们在 inner
方法的最后,将 obj_context
设置为 None
,就可以保证不会因为上下文而导致内存不会被回收:
async def inner(x):
obj = A(x)
obj_context.set(obj)
obj_ref_dict[x] = weakref.ref(obj)
obj_context.set(None)
再次执行代码:
obj: None in outer request-0 from obj_ref_dict
obj: None in outer request-1 from obj_ref_dict
obj: None in outer request-2 from obj_ref_dict
obj: None in outer request-3 from obj_ref_dict
obj: None in outer request-4 from obj_ref_dict
obj: None in outer request-5 from obj_ref_dict
obj: None in outer request-6 from obj_ref_dict
obj: None in outer request-7 from obj_ref_dict
obj: None in outer request-8 from obj_ref_dict
obj: None in outer request-9 from obj_ref_dict
obj-0: None in obj_ref_dict
obj-1: None in obj_ref_dict
obj-2: None in obj_ref_dict
obj-3: None in obj_ref_dict
obj-4: None in obj_ref_dict
obj-5: None in obj_ref_dict
obj-6: None in obj_ref_dict
obj-7: None in obj_ref_dict
obj-8: None in obj_ref_dict
obj-9: None in obj_ref_dict
可以看到,当 outer
和 dispatcher
尝试通过弱引用来访问曾经保存在上下文的对象实例的时候,这些对象都已经被回收了。