Gunicorn介绍
简介
- Gunicorn 是一个被广泛使用的高性能的Python WSGI UNIX HTTP服务器。
- 由于源码调用了fcntl、fork等接口,因此只能跑在类Unix系统上
Gunicorn 设计思想
- Gunicorn基于pre-fork的思想,有一个master进程,由它来管理一组worker进程,worker进程负责对来自客户端的请求进行响应。gunicorn依靠操作系统来提供负载均衡。
- pre-fork服务器会预先开启一定数量(数量由参数决定)的worker进程,在用户请求到来时服务器并不需要等待新的进程启动,因而能够以更快的速度应付多用户请求。
Python网络IO基础知识
源码解析
目录结构
│ arbiter.py # master进程,gunicorn中称为arbiter
│ config.py # 配置相关,可以通过命令行参数或配置文件来配置gunicorn
│ debug.py
│ errors.py
│ glogging.py
│ pidfile.py
│ reloader.py
│ sock.py
│ systemd.py
│ util.py
│ __init__.py
│
├─app
│ base.py # 入口基类
│ pasterapp.py
│ wsgiapp.py # 程序入口
│ __init__.py
│
├─http # 处理客户端发送过来的http请求
│ body.py
│ errors.py
│ message.py
│ parser.py
│ unreader.py
│ wsgi.py
│ __init__.py
│
├─instrument
│ statsd.py
│ __init__.py
│
├─workers # Gunicorn中worker有sync, gthread, gevent等模式可选
base.py
base_async.py
geventlet.py
ggevent.py
gthread.py
gtornado.py
sync.py
workertmp.py
__init__.py
从入口开始
程序入口wsgiapp.py的run()将会运行基类base.py中BaseApplication的run():
# base.py
def run(self):
# Application entry point: build an Arbiter for this application and run it.
try:
Arbiter(self).run()
except RuntimeError as e:
# Report the failure on stderr and exit with status 1.
print("\nError: %s\n" % e, file=sys.stderr)
sys.stderr.flush()
sys.exit(1)
即运行arbiter.py的run():
# arbiter.py
def run(self):
"Main master loop."
# Calls start() once, then loops: with no queued signal it calls sleep(),
# murder_workers() and manage_workers(); otherwise it dispatches the signal
# to the matching handle_<name>() method and then calls wakeup().
self.start() # initialise the arbiter: register signal handlers and create the listening sockets
util._setproctitle("master [%s]" % self.proc_name)
try:
self.manage_workers() # manage the worker processes
while True:
self.maybe_promote_master()
# Oldest queued signal first; None when the queue is empty.
sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
if sig is None:
self.sleep()
self.murder_workers()
self.manage_workers()
continue
if sig not in self.SIG_NAMES:
self.log.info("Ignoring unknown signal: %s", sig)
continue
signame = self.SIG_NAMES.get(sig)
# Handlers are looked up by name: handle_<signame>.
handler = getattr(self, "handle_%s" % signame, None)
if not handler:
self.log.error("Unhandled signal: %s", signame)
continue
self.log.info("Handling signal: %s", signame)
handler()
self.wakeup()
except (StopIteration, KeyboardInterrupt):
self.halt()
except HaltServer as inst:
self.halt(reason=inst.reason, exit_status=inst.exit_status)
except SystemExit:
raise
except Exception:
# Any other error: log it, call stop(False), remove the pidfile and exit with -1.
self.log.info("Unhandled exception in main loop",
exc_info=True)
self.stop(False)
if self.pidfile is not None:
self.pidfile.unlink()
sys.exit(-1)
首先运行start()
# arbiter.py
def start(self):
"""\
Initialize the arbiter. Start listening and set pidfile if needed.
"""
self.log.info("Starting gunicorn %s", __version__)
# GUNICORN_PID in the environment gives the pid of another master.
# NOTE(review): presumably exported by the old master during a re-exec — confirm.
if 'GUNICORN_PID' in os.environ:
self.master_pid = int(os.environ.get('GUNICORN_PID'))
self.proc_name = self.proc_name + ".2"
self.master_name = "Master.2"
self.pid = os.getpid()
if self.cfg.pidfile is not None:
pidname = self.cfg.pidfile
# A master started alongside another one uses a ".2" pidfile name.
if self.master_pid != 0:
pidname += ".2"
self.pidfile = Pidfile(pidname)
self.pidfile.create(self.pid)
self.cfg.on_starting(self)
self.init_signals()
# Create the listeners only if none exist yet.
if not self.LISTENERS:
fds = None
listen_fds = systemd.listen_fds()
if listen_fds:
# Descriptors passed by systemd: a contiguous range starting at SD_LISTEN_FDS_START.
self.systemd = True
fds = range(systemd.SD_LISTEN_FDS_START,
systemd.SD_LISTEN_FDS_START + listen_fds)
elif self.master_pid:
# Descriptors listed in GUNICORN_FD as comma-separated integers.
fds = []
for fd in os.environ.pop('GUNICORN_FD').split(','):
fds.append(int(fd))
self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
listeners_str = ",".join([str(l) for l in self.LISTENERS])
self.log.debug("Arbiter booted")
self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
self.log.info("Using worker: %s", self.cfg.worker_class_str)
systemd.sd_notify("READY=1\nSTATUS=Gunicorn arbiter booted", self.log)
# check worker class requirements
if hasattr(self.worker_class, "check_config"):
self.worker_class.check_config(self.cfg, self.log)
self.cfg.when_ready(self)
其中比较重要的是init_signals()和create_sockets()
# arbiter.py
def init_signals(self):
"""\
Initialize master signal handling. Most of the signals
are queued. Child signals only wake up the master.
"""
# close old PIPE
for p in self.PIPE:
os.close(p)
# initialize the pipe
# Both ends are set non-blocking and close-on-exec.
self.PIPE = pair = os.pipe()
for p in pair:
util.set_non_blocking(p)
util.close_on_exec(p)
self.log.close_on_exec()
# initialize all signals
for s in self.SIGNALS:
signal.signal(s, self.signal) # register self.signal as the handler for this signal
# SIGCHLD gets its own handler instead of going through self.signal.
signal.signal(signal.SIGCHLD, self.handle_chld)
def signal(self, sig, frame):
# Signal handler installed by init_signals(): the signal is appended to
# SIG_QUEUE only while fewer than five signals are pending.
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
self.wakeup()
其中SIGNALS在arbiter.py中的定义是:
# Signals routed through the arbiter's signal queue; each name is resolved to
# the corresponding signal.SIG<NAME> constant.
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
- HUP,重新加载配置,用新配置启动新的worker进程,并平滑关闭旧的worker进程
- QUIT/INT,快速关闭
- TERM,优雅关闭,它会等待worker进程处理完当前请求(最长等待graceful_timeout)后关闭
- TTIN,增加一个worker进程
- TTOU,减少一个worker进程
- USR1,重新打开master和worker的日志文件
- USR2,在线升级Gunicorn:启动新的master和worker进程
- WINCH,当Gunicorn以守护进程方式运行时,正常关闭所有worker进程,保持主控master进程的运行
signal()中通过向pipe里写入数据来唤醒Arbiter:
def wakeup(self):
    """Wake the arbiter up by writing a single byte to its pipe.

    EAGAIN and EINTR raised by the write are ignored; any other IOError
    propagates to the caller.
    """
    tolerated = (errno.EAGAIN, errno.EINTR)
    try:
        os.write(self.PIPE[1], b'.')
    except IOError as err:
        if err.errno in tolerated:
            return
        raise
也就是说,产生上述9种信号之一时,信号会被放入SIG_QUEUE,arbiter进程随之被唤醒。
回到arbiter.py中的start(),接下来看其中调用的create_sockets()。它会根据配置中的地址和端口号创建socket,并对其进行监听:
def create_sockets(conf, log, fds=None):
"""
Create a new socket for the configured addresses or file descriptors.
If a configured address is a tuple then a TCP socket is created.
If it is a string, a Unix socket is created. Otherwise, a TypeError is
raised.
"""
listeners = []
# get it only once
addr = conf.address
# Integer entries are already-open file descriptors; everything else is an address to bind.
fdaddr = [bind for bind in addr if isinstance(bind, int)]
if fds:
fdaddr += list(fds)
laddr = [bind for bind in addr if not isinstance(bind, int)]
# check ssl config early to raise the error on startup
# only the certfile is needed since it can contains the keyfile
if conf.certfile and not os.path.exists(conf.certfile):
raise ValueError('certfile "%s" does not exist' % conf.certfile)
if conf.keyfile and not os.path.exists(conf.keyfile):
raise ValueError('keyfile "%s" does not exist' % conf.keyfile)
# sockets are already bound
if fdaddr:
for fd in fdaddr:
# A temporary socket object is built only to read the bound name,
# which in turn selects the listener class.
sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
sock_name = sock.getsockname()
sock_type = _sock_type(sock_name)
listener = sock_type(sock_name, conf, log, fd=fd)
listeners.append(listener)
return listeners
# no sockets is bound, first initialization of gunicorn in this env.
for addr in laddr:
sock_type = _sock_type(addr)
sock = None
# Up to five attempts per address.
for i in range(5):
try:
# create the socket and bind it to the address
sock = sock_type(addr, conf, log)
except socket.error as e:
if e.args[0] == errno.EADDRINUSE:
log.error("Connection in use: %s", str(addr))
if e.args[0] == errno.EADDRNOTAVAIL:
log.error("Invalid address: %s", str(addr))
# NOTE(review): i ranges over 0..4, so this condition is always true and
# the code also sleeps after the final failed attempt — confirm intent.
if i < 5:
msg = "connection to {addr} failed: {error}"
log.debug(msg.format(addr=str(addr), error=str(e)))
log.error("Retrying in 1 second.")
time.sleep(1)
else:
break
if sock is None:
log.error("Can't connect to %s", str(addr))
sys.exit(1)
listeners.append(sock)
return listeners
其中_sock_type()会根据地址的类型返回对应的socket类:
def _sock_type(addr):
if isinstance(addr, tuple):
if util.is_ipv6(addr[0]):
sock_type = TCP6Socket
else:
sock_type = TCPSocket
elif isinstance(addr, (str, bytes)):
sock_type = UnixSocket
else:
raise TypeError("Unable to create socket from: %r" % addr)
return sock_type
class TCP6Socket(TCPSocket):
# TCP listener using the AF_INET6 address family.
FAMILY = socket.AF_INET6
def __str__(self):
# getsockname() is unpacked as a 4-tuple; only host and port are used.
# NOTE(review): the scheme is always "http" here, whereas TCPSocket.__str__
# switches to "https" when conf.is_ssl is set — confirm this is intended.
(host, port, _, _) = self.sock.getsockname()
return "http://[%s]:%d" % (host, port)
class TCPSocket(BaseSocket):
# TCP listener using the AF_INET address family.
FAMILY = socket.AF_INET
def __str__(self):
# Render the bound address as a URL; the scheme follows conf.is_ssl.
if self.conf.is_ssl:
scheme = "https"
else:
scheme = "http"
addr = self.sock.getsockname()
return "%s://%s:%d" % (scheme, addr[0], addr[1])
def set_options(self, sock, bound=False):
# Set TCP_NODELAY, then apply the options of the base class.
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
return super().set_options(sock, bound=bound)
class BaseSocket(object):
# Wraps a listening socket: creates a new one or adopts an existing file
# descriptor, applies socket options and starts listening.
def __init__(self, address, conf, log, fd=None):
self.log = log
self.conf = conf
self.cfg_addr = address
if fd is None:
# No descriptor supplied: create a fresh socket that still has to be bound.
sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
bound = False
else:
# socket.fromfd() duplicates the descriptor, so the original fd is closed afterwards.
sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)
os.close(fd)
bound = True
self.sock = self.set_options(sock, bound=bound)
def __str__(self):
return "<socket %d>" % self.sock.fileno()
def __getattr__(self, name):
# Attributes not found on the wrapper are looked up on the underlying socket.
return getattr(self.sock, name)
def set_options(self, sock, bound=False):
# Configure the socket, bind it unless it is already bound, and listen.
# Returns the configured socket.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if (self.conf.reuse_port
and hasattr(socket, 'SO_REUSEPORT')): # pragma: no cover
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except socket.error as err:
# ENOPROTOOPT / EINVAL are tolerated; anything else is re-raised.
if err.errno not in (errno.ENOPROTOOPT, errno.EINVAL):
raise
if not bound:
self.bind(sock)
sock.setblocking(0)
# make sure that the socket can be inherited
if hasattr(sock, "set_inheritable"):
sock.set_inheritable(True)
sock.listen(self.conf.backlog)
return sock
def bind(self, sock):
sock.bind(self.cfg_addr)
def close(self):
# Close the underlying socket once; later calls return immediately.
if self.sock is None:
return
try:
self.sock.close()
except socket.error as e:
self.log.info("Error while closing socket %s", str(e))
self.sock = None
再回到arbiter.py中的run()。start()之后会先执行一次manage_workers(),然后进入主循环;当没有待处理的信号时,主循环会依次执行sleep()、murder_workers()和manage_workers()。先看murder_workers():
# arbiter.py
def murder_workers(self):
"""\
Kill unused/idle workers
"""
# A falsy timeout disables the check entirely.
if not self.timeout:
return
# Iterate over a snapshot of the worker table.
# NOTE(review): presumably because kill_worker() may modify self.WORKERS — confirm.
workers = list(self.WORKERS.items())
for (pid, worker) in workers:
try:
# The worker's tmp file was updated within the timeout: leave it alone.
if time.time() - worker.tmp.last_update() <= self.timeout:
continue
except (OSError, ValueError):
continue
# First time over the limit: log, mark as aborted and send SIGABRT.
# A worker already marked as aborted gets SIGKILL.
if not worker.aborted:
self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
worker.aborted = True
self.kill_worker(pid, signal.SIGABRT)
else:
self.kill_worker(pid, signal.SIGKILL)
def last_update(self):
    """Return the st_ctime of the worker's temporary file."""
    descriptor = self._tmp.fileno()
    file_status = os.fstat(descriptor)
    return file_status.st_ctime
arbiter是通过检查临时文件的最近状态变更时间(st_ctime)来判断每个worker是否timeout的。Worker修改这个文件的逻辑后面会进行介绍。若超过设定的timeout,arbiter会先向这个worker发送SIGABRT;如果它已经被标记为aborted却仍然超时,则发送SIGKILL将其kill掉。
arbiter接着manage_workers():
# arbiter.py
def manage_workers(self):
"""\
Maintain the number of workers by spawning or killing
as required.
"""
# Too few workers: spawn more.
if len(self.WORKERS) < self.num_workers:
self.spawn_workers()
# Too many workers: send SIGTERM starting with the lowest age value.
workers = self.WORKERS.items()
workers = sorted(workers, key=lambda w: w[1].age)
while len(workers) > self.num_workers:
(pid, _) = workers.pop(0)
self.kill_worker(pid, signal.SIGTERM)
# Log the worker count only when it differs from the last logged value.
active_worker_count = len(workers)
if self._last_logged_active_worker_count != active_worker_count:
self._last_logged_active_worker_count = active_worker_count
self.log.debug("{0} workers".format(active_worker_count),
extra={"metric": "gunicorn.workers",
"value": active_worker_count,
"mtype": "gauge"})
manage_workers()的功能是使worker进程的数量与配置中的worker数量保持一致:少于配置时调用spawn_workers(),多于配置时对最旧的worker执行kill_worker(pid, signal.SIGTERM)。spawn_workers()会逐个调用下面的spawn_worker()来创建worker:
def spawn_worker(self):
    """Fork one worker process and register it with the arbiter.

    In the parent the new worker is stored in ``self.WORKERS`` under its
    pid and that pid is returned.  The child never returns from this
    method: it runs ``worker.init_process()`` and always leaves through
    ``sys.exit``.

    Returns:
        The pid of the forked worker (parent process only).
    """
    self.worker_age += 1
    # Instantiate the configured worker class; the listeners are handed
    # over to the worker.
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                               self.app, self.timeout / 2.0,
                               self.cfg, self.log)
    self.cfg.pre_fork(self, worker)
    # os.fork() returns 0 in the child and the child's pid in the parent;
    # both processes continue from here.
    pid = os.fork()
    if pid != 0:
        worker.pid = pid
        # Parent: remember the worker by pid so it can be managed later.
        self.WORKERS[pid] = worker
        return pid

    # Do not inherit the temporary files of other workers
    for sibling in self.WORKERS.values():
        sibling.tmp.close()

    # Process Child
    worker.pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        # Fixed: this line referenced the undefined name ``workerx``, whose
        # NameError made every freshly forked worker fail to boot.
        self.log.info("Booting worker with pid: %s", worker.pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    except AppImportError as e:
        self.log.debug("Exception while loading the application",
                       exc_info=True)
        print("%s" % e, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(self.APP_LOAD_ERROR)
    except BaseException:
        # Deliberately catches everything: the child must always end in
        # sys.exit() and never fall back into the arbiter's code.
        self.log.exception("Exception in worker process")
        if not worker.booted:
            sys.exit(self.WORKER_BOOT_ERROR)
        sys.exit(-1)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker.pid)
        try:
            worker.tmp.close()
            self.cfg.worker_exit(self, worker)
        except BaseException:
            # Cleanup is best effort; failures are only logged.
            self.log.warning("Exception during worker exit:\n%s",
                             traceback.format_exc())
worker有sync、gthread、eventlet、gevent 和 tornado五种类型。默认的是sync。就先看sync的代码。
继续看worker.init_process()函数,SyncWorker没有override 基类的这个方法,所以会运行基类的方法:
# workers/base.py
def init_process(self):
"""\
If you override this method in a subclass, the last statement
in the function should be to call this method with
super().init_process() so that the ``run()`` loop is initiated.
"""
# set environment' variables
if self.cfg.env:
for k, v in self.cfg.env.items():
os.environ[k] = v
util.set_owner_process(self.cfg.uid, self.cfg.gid,
initgroups=self.cfg.initgroups)
# Reseed the random number generator
util.seed()
# For waking ourselves up
self.PIPE = os.pipe()
for p in self.PIPE:
util.set_non_blocking(p)
util.close_on_exec(p)
# Prevent fd inheritance
for s in self.sockets:
util.close_on_exec(s)
util.close_on_exec(self.tmp.fileno())
# Descriptors watched by the worker: the listening sockets plus the read end of the pipe.
self.wait_fds = self.sockets + [self.PIPE[0]]
self.log.close_on_exec()
self.init_signals() # register the signal handlers
self.load_wsgi() # load the WSGI application
# start the reloader
if self.cfg.reload:
# Callback passed to the reloader: mark the worker as not alive, write to
# the pipe, call the worker_int hook and exit.
def changed(fname):
self.log.info("Worker reloading: %s modified", fname)
self.alive = False
os.write(self.PIPE[1], b"1")
self.cfg.worker_int(self)
time.sleep(0.1)
sys.exit(0)
reloader_cls = reloader_engines[self.cfg.reload_engine]
self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
callback=changed)
self.reloader.start()
self.cfg.post_worker_init(self)
# Enter main run loop
self.booted = True
self.run()
最后会运行run():
# workers/sync.py
def run(self):
    """Start the sync worker's serving loop.

    Falls back to a 0.5 second timeout when none is configured so the loop
    does not spin, resets every listener to non-blocking mode (the blocking
    status appears to be lost after the fork in the arbiter), then serves
    through the single-listener or multi-listener loop.
    """
    poll_timeout = self.timeout or 0.5
    for listener in self.sockets:
        listener.setblocking(0)
    serve = self.run_for_multiple if len(self.sockets) > 1 else self.run_for_one
    serve(poll_timeout)
以绑定单端口的情况为例:
# workers/sync.py
def run_for_one(self, timeout):
# Serving loop for a worker with exactly one listening socket.
listener = self.sockets[0]
while self.alive:
self.notify() # heartbeat to the master: touches the worker's temporary file
# Accept a connection. If we get an error telling us
# that no connection is waiting we fall down to the
# select which is where we'll wait for a bit for new
# workers to come give us some love.
try:
self.accept(listener)
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
except EnvironmentError as e:
# EAGAIN / ECONNABORTED / EWOULDBLOCK are tolerated; anything else is re-raised.
if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
raise
if not self.is_parent_alive():
return
try:
self.wait(timeout)
except StopWaiting:
return
# workertmp.py
def notify(self):
    """Heartbeat: flip the spinner bit and apply it as the tmp file's mode."""
    flipped = (self.spinner + 1) % 2
    self.spinner = flipped
    os.fchmod(self._tmp.fileno(), flipped)
从run_for_one()中我们得知,worker只要活着,就会不断地accept就绪的请求,即self.accept(listener):
# workers/sync.py
def accept(self, listener):
    """Accept one pending connection on *listener* and handle it."""
    conn, peer = listener.accept()
    # Blocking mode: this worker stays on the connection until the request
    # has been handled.
    conn.setblocking(1)
    util.close_on_exec(conn)
    self.handle(listener, conn, peer)
handle(listener, client, addr)中会对来自client请求进行处理:
# workers/sync.py
def handle(self, listener, client, addr):
# Parse a single request from the client connection and handle it.
# The client socket is always closed on the way out.
req = None
try:
if self.cfg.is_ssl:
client = ssl.wrap_socket(client, server_side=True,
**self.cfg.ssl_options)
parser = http.RequestParser(self.cfg, client)
req = next(parser)
self.handle_request(listener, req, client, addr)
except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e)
except StopIteration as e:
self.log.debug("Closing connection. %s", e)
except ssl.SSLError as e:
if e.args[0] == ssl.SSL_ERROR_EOF:
self.log.debug("ssl connection closed")
client.close()
else:
self.log.debug("Error processing SSL request.")
self.handle_error(req, client, addr, e)
except EnvironmentError as e:
# EPIPE and ECONNRESET are only logged at debug level; other errno values are logged with a traceback.
if e.errno not in (errno.EPIPE, errno.ECONNRESET):
self.log.exception("Socket error processing request.")
else:
if e.errno == errno.ECONNRESET:
self.log.debug("Ignoring connection reset")
else:
self.log.debug("Ignoring EPIPE")
except Exception as e:
self.handle_error(req, client, addr, e)
finally:
util.close(client)
# workers/sync.py
def handle_request(self, listener, req, client, addr):
# Run one parsed request through the WSGI application and write the response.
# The post_request hook is always called at the end.
environ = {}
resp = None
try:
self.cfg.pre_request(self, req)
request_start = datetime.now()
# build the response object and the WSGI environ from the parsed request
resp, environ = wsgi.create(req, client, addr,
listener.getsockname(), self.cfg)
# Force the connection closed until someone shows
# a buffering proxy that supports Keep-Alive to
# the backend.
resp.force_close()
self.nr += 1
# Request limit reached: clear the alive flag that the serving loop checks.
if self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.")
self.alive = False
# WSGI call: pass environ and start_response to the WSGI application
respiter = self.wsgi(environ, resp.start_response)
try:
if isinstance(respiter, environ['wsgi.file_wrapper']):
resp.write_file(respiter)
else:
for item in respiter:
resp.write(item)
resp.close()
request_time = datetime.now() - request_start
self.log.access(resp, req, environ, request_time)
finally:
if hasattr(respiter, "close"):
respiter.close()
except EnvironmentError:
# pass to next try-except level
util.reraise(*sys.exc_info())
except Exception:
if resp and resp.headers_sent:
# If the requests have already been sent, we should close the
# connection to indicate the error.
self.log.exception("Error handling request")
try:
client.shutdown(socket.SHUT_RDWR)
client.close()
except EnvironmentError:
pass
raise StopIteration()
raise
finally:
try:
self.cfg.post_request(self, req, environ, resp)
except Exception:
self.log.exception("Exception in post_request hook")
如果没有在等待的请求,accept()会抛出EWOULDBLOCK 或 EAGAIN,该异常被except捕获后,会进入到self.wait(timeout),使用select监听wait_fds,直到某个fd就绪或超时:
def wait(self, timeout):
# Block in select() on wait_fds for at most `timeout` seconds.
# Raises StopWaiting on EBADF unless self.nr is negative.
try:
self.notify()
ret = select.select(self.wait_fds, [], [], timeout)
if ret[0]:
# A readable pipe means a wake-up byte was written: consume one byte.
if self.PIPE[0] in ret[0]:
os.read(self.PIPE[0], 1)
return ret[0]
except select.error as e:
if e.args[0] == errno.EINTR:
return self.sockets
if e.args[0] == errno.EBADF:
if self.nr < 0:
return self.sockets
else:
raise StopWaiting
raise
至此sync模式的wsgi server 主要流程就介绍完了。
sync模式中,每个worker进程只有一个线程,阻塞地处理client请求。也就是说一个worker同一时间只能处理一个client的请求。因此效率有限。
Gunicorn中一种更高效的async worker是 gevent worker,它是基于协程的。
Gevent官方文档:http://www.gevent.org/contents.html
GeventWorker继承了AsyncWorker,AsyncWorker继承BaseWorker。 GeventWorker override了init_process():
# workers/ggevent.py
def init_process(self):
# Prepare gevent, then continue with the base class initialisation.
self.patch() # monkey patch: swap the blocking implementations for gevent's cooperative ones
hub.reinit() # prepares the gevent hub for running in the newly forked child; per the original note it must be called in the child after os.fork()
super().init_process()
def patch(self):
# Apply gevent's monkey patches and rebuild the listening sockets.
monkey.patch_all()
# monkey patch sendfile to make it none blocking
patch_sendfile()
# patch sockets
# Each listener is re-created as a socket.socket over the same file descriptor.
# NOTE(review): presumably so that the patched socket class is used — confirm.
sockets = []
for s in self.sockets:
sockets.append(socket.socket(s.FAMILY, socket.SOCK_STREAM,
fileno=s.sock.fileno()))
self.sockets = sockets
super().init_process()调用基类的init_process()。init_process()最后会调用ggevent的run():
# workers/ggevent.py
def run(self):
# Start one gevent server per listening socket, send heartbeats while the
# worker is alive, then shut the servers down within graceful_timeout.
servers = []
ssl_args = {}
if self.cfg.is_ssl:
ssl_args = dict(server_side=True, **self.cfg.ssl_options)
for s in self.sockets:
# setblocking(1) puts the socket in blocking mode.
# NOTE(review): presumably cooperative blocking on gevent's patched socket — confirm.
s.setblocking(1)
# The pool caps the number of concurrent greenlets at worker_connections,
# i.e. the number of connections one worker keeps open at a time
# (default 1000 according to the original note).
pool = Pool(self.worker_connections)
if self.server_class is not None:
environ = base_environ(self.cfg)
environ.update({
"wsgi.multithread": True,
"SERVER_SOFTWARE": VERSION,
})
server = self.server_class(
s, application=self.wsgi, spawn=pool, log=self.log,
handler_class=self.wsgi_handler, environ=environ,
**ssl_args)
else:
# self.handle is the request handling function
hfun = partial(self.handle, s)
# Create a gevent StreamServer: for every incoming connection it spawns
# a greenlet from the pool that runs hfun.
server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)
# start the server
server.start()
servers.append(server)
# heartbeat to the arbiter once per second while the worker is alive
while self.alive:
self.notify()
gevent.sleep(1.0)
try:
# Stop accepting requests
for server in servers:
if hasattr(server, 'close'): # gevent 1.0
server.close()
if hasattr(server, 'kill'): # gevent < 1.0
server.kill()
# Handle current requests until graceful_timeout
ts = time.time()
while time.time() - ts <= self.cfg.graceful_timeout:
accepting = 0
for server in servers:
# A pool whose free count differs from its size still has greenlets in use.
if server.pool.free_count() != server.pool.size:
accepting += 1
# if no server is accepting a connection, we can exit
if not accepting:
return
self.notify()
gevent.sleep(1.0)
# Force kill all active the handlers
self.log.warning("Worker graceful timeout (pid:%s)" % self.pid)
for server in servers:
server.stop(timeout=1)
except:
# Best effort: any error raised during shutdown is swallowed.
pass
def handle(self, listener, client, addr):
# Connected socket timeout defaults to socket.getdefaulttimeout().
# This forces to blocking mode.
# setblocking(1) selects blocking mode, not non-blocking.
client.setblocking(1)
super().handle(listener, client, addr)
handle(listener, client, addr)会运行AsyncWorker的handle():
# workers/base_async.py
def handle(self, listener, client, addr):
# Serve one client connection: a single request when keepalive is disabled,
# otherwise a loop of requests on the same connection.
# The client socket is always closed on the way out.
req = None
try:
parser = http.RequestParser(self.cfg, client)
try:
listener_name = listener.getsockname()
# keepalive: number of seconds to wait for requests on a keep-alive connection
if not self.cfg.keepalive:
req = next(parser)
self.handle_request(listener_name, req, client, addr)
else:
# keepalive loop
proxy_protocol_info = {}
while True:
req = None
with self.timeout_ctx():
req = next(parser)
if not req:
break
# Remember the proxy protocol info of the first request that carries it
# and reuse it for later requests on this connection.
if req.proxy_protocol_info:
proxy_protocol_info = req.proxy_protocol_info
else:
req.proxy_protocol_info = proxy_protocol_info
self.handle_request(listener_name, req, client, addr)
except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e)
except StopIteration as e:
self.log.debug("Closing connection. %s", e)
except ssl.SSLError:
# pass to next try-except level
util.reraise(*sys.exc_info())
except EnvironmentError:
# pass to next try-except level
util.reraise(*sys.exc_info())
except Exception as e:
self.handle_error(req, client, addr, e)
except ssl.SSLError as e:
if e.args[0] == ssl.SSL_ERROR_EOF:
self.log.debug("ssl connection closed")
client.close()
else:
self.log.debug("Error processing SSL request.")
self.handle_error(req, client, addr, e)
except EnvironmentError as e:
# EPIPE and ECONNRESET are only logged at debug level; other errno values are logged with a traceback.
if e.errno not in (errno.EPIPE, errno.ECONNRESET):
self.log.exception("Socket error processing request.")
else:
if e.errno == errno.ECONNRESET:
self.log.debug("Ignoring connection reset")
else:
self.log.debug("Ignoring EPIPE")
except Exception as e:
self.handle_error(req, client, addr, e)
finally:
util.close(client)
# workers/base_async.py
def handle_request(self, listener_name, req, sock, addr):
# Run one parsed request through the WSGI application and write the response.
# Returns False when is_already_handled() reports the response as handled,
# True otherwise; raises StopIteration when the connection has to be closed.
request_start = datetime.now()
environ = {}
resp = None
try:
self.cfg.pre_request(self, req)
resp, environ = wsgi.create(req, sock, addr,
listener_name, self.cfg)
environ["wsgi.multithread"] = True
self.nr += 1
# Request limit reached: close this connection and clear the alive flag.
if self.alive and self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.")
resp.force_close()
self.alive = False
if not self.cfg.keepalive:
resp.force_close()
respiter = self.wsgi(environ, resp.start_response)
if self.is_already_handled(respiter):
return False
try:
if isinstance(respiter, environ['wsgi.file_wrapper']):
resp.write_file(respiter)
else:
for item in respiter:
resp.write(item)
resp.close()
request_time = datetime.now() - request_start
self.log.access(resp, req, environ, request_time)
finally:
if hasattr(respiter, "close"):
respiter.close()
# StopIteration tells the caller's keepalive loop to stop reading requests.
if resp.should_close():
raise StopIteration()
except StopIteration:
raise
except EnvironmentError:
# If the original exception was a socket.error we delegate
# handling it to the caller (where handle() might ignore it)
util.reraise(*sys.exc_info())
except Exception:
if resp and resp.headers_sent:
# If the requests have already been sent, we should close the
# connection to indicate the error.
self.log.exception("Error handling request")
try:
sock.shutdown(socket.SHUT_RDWR)
sock.close()
except EnvironmentError:
pass
raise StopIteration()
raise
finally:
try:
self.cfg.post_request(self, req, environ, resp)
except Exception:
self.log.exception("Exception in post_request hook")
return True