Add project files.

eviled 2025-07-13 22:10:11 -04:00
commit eeda32a9b2
1735 changed files with 700598 additions and 0 deletions

gunicorn/workers/__init__.py
@@ -0,0 +1,14 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
# supported gunicorn workers.
SUPPORTED_WORKERS = {
    "sync": "gunicorn.workers.sync.SyncWorker",
    "eventlet": "gunicorn.workers.geventlet.EventletWorker",
    "gevent": "gunicorn.workers.ggevent.GeventWorker",
    "gevent_wsgi": "gunicorn.workers.ggevent.GeventPyWSGIWorker",
    "gevent_pywsgi": "gunicorn.workers.ggevent.GeventPyWSGIWorker",
    "tornado": "gunicorn.workers.gtornado.TornadoWorker",
    "gthread": "gunicorn.workers.gthread.ThreadWorker",
}
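
Each value in this table is a dotted import path, which lets the arbiter resolve the configured worker class lazily. A minimal sketch of that resolution using only the standard library (gunicorn routes this through its own config/util helpers, so the function name here is illustrative):

import importlib

def load_worker_class(dotted_path):
    # split "package.module.Class" into module path and attribute name
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

# e.g. load_worker_class(SUPPORTED_WORKERS["gthread"]) -> ThreadWorker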

gunicorn/workers/base.py
@@ -0,0 +1,287 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import io
import os
import signal
import sys
import time
import traceback
from datetime import datetime
from random import randint
from ssl import SSLError

from gunicorn import util
from gunicorn.http.errors import (
    ForbiddenProxyRequest, InvalidHeader,
    InvalidHeaderName, InvalidHTTPVersion,
    InvalidProxyLine, InvalidRequestLine,
    InvalidRequestMethod, InvalidSchemeHeaders,
    LimitRequestHeaders, LimitRequestLine,
    UnsupportedTransferCoding,
    ConfigurationProblem, ObsoleteFolding,
)
from gunicorn.http.wsgi import Response, default_environ
from gunicorn.reloader import reloader_engines
from gunicorn.workers.workertmp import WorkerTmp


class Worker:

    SIGNALS = [getattr(signal, "SIG%s" % x) for x in (
        "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()
    )]

    PIPE = []

    def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
        """\
        This is called pre-fork so it shouldn't do anything to the
        current process. If there's a need to make process wide
        changes you'll want to do that in ``self.init_process()``.
        """
        self.age = age
        self.pid = "[booting]"
        self.ppid = ppid
        self.sockets = sockets
        self.app = app
        self.timeout = timeout
        self.cfg = cfg
        self.booted = False
        self.aborted = False
        self.reloader = None

        self.nr = 0

        if cfg.max_requests > 0:
            jitter = randint(0, cfg.max_requests_jitter)
            self.max_requests = cfg.max_requests + jitter
        else:
            self.max_requests = sys.maxsize

        self.alive = True
        self.log = log
        self.tmp = WorkerTmp(cfg)

    def __str__(self):
        return "<Worker %s>" % self.pid

    def notify(self):
        """\
        Your worker subclass must arrange to have this method called
        once every ``self.timeout`` seconds. If you fail in accomplishing
        this task, the master process will murder your workers.
        """
        self.tmp.notify()

    def run(self):
        """\
        This is the mainloop of a worker process. You should override
        this method in a subclass to provide the intended behaviour
        for your particular evil schemes.
        """
        raise NotImplementedError()

    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super().init_process() so that the ``run()`` loop is initiated.
        """

        # set environment variables
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v

        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)

        # Reseed the random number generator
        util.seed()

        # For waking ourselves up
        self.PIPE = os.pipe()
        for p in self.PIPE:
            util.set_non_blocking(p)
            util.close_on_exec(p)

        # Prevent fd inheritance
        for s in self.sockets:
            util.close_on_exec(s)
        util.close_on_exec(self.tmp.fileno())

        self.wait_fds = self.sockets + [self.PIPE[0]]

        self.log.close_on_exec()

        self.init_signals()

        # start the reloader
        if self.cfg.reload:
            def changed(fname):
                self.log.info("Worker reloading: %s modified", fname)
                self.alive = False
                os.write(self.PIPE[1], b"1")
                self.cfg.worker_int(self)
                time.sleep(0.1)
                sys.exit(0)

            reloader_cls = reloader_engines[self.cfg.reload_engine]
            self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                         callback=changed)

        self.load_wsgi()
        if self.reloader:
            self.reloader.start()

        self.cfg.post_worker_init(self)

        # Enter main run loop
        self.booted = True
        self.run()

    def load_wsgi(self):
        try:
            self.wsgi = self.app.wsgi()
        except SyntaxError as e:
            if not self.cfg.reload:
                raise

            self.log.exception(e)

            # fix from PR #1228
            # storing the traceback into exc_tb will create a circular reference.
            # per https://docs.python.org/2/library/sys.html#sys.exc_info warning,
            # delete the traceback after use.
            try:
                _, exc_val, exc_tb = sys.exc_info()
                self.reloader.add_extra_file(exc_val.filename)

                tb_string = io.StringIO()
                traceback.print_tb(exc_tb, file=tb_string)
                self.wsgi = util.make_fail_app(tb_string.getvalue())
            finally:
                del exc_tb

    def init_signals(self):
        # reset signaling
        for s in self.SIGNALS:
            signal.signal(s, signal.SIG_DFL)
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)

        if hasattr(signal, 'set_wakeup_fd'):
            signal.set_wakeup_fd(self.PIPE[1])

    def handle_usr1(self, sig, frame):
        self.log.reopen_files()

    def handle_exit(self, sig, frame):
        self.alive = False

    def handle_quit(self, sig, frame):
        self.alive = False
        # worker_int callback
        self.cfg.worker_int(self)
        time.sleep(0.1)
        sys.exit(0)

    def handle_abort(self, sig, frame):
        self.alive = False
        self.cfg.worker_abort(self)
        sys.exit(1)

    def handle_error(self, req, client, addr, exc):
        request_start = datetime.now()
        addr = addr or ('', -1)  # unix socket case
        if isinstance(exc, (
                InvalidRequestLine, InvalidRequestMethod,
                InvalidHTTPVersion, InvalidHeader, InvalidHeaderName,
                LimitRequestLine, LimitRequestHeaders,
                InvalidProxyLine, ForbiddenProxyRequest,
                InvalidSchemeHeaders, UnsupportedTransferCoding,
                ConfigurationProblem, ObsoleteFolding,
                SSLError,
        )):

            status_int = 400
            reason = "Bad Request"

            if isinstance(exc, InvalidRequestLine):
                mesg = "Invalid Request Line '%s'" % str(exc)
            elif isinstance(exc, InvalidRequestMethod):
                mesg = "Invalid Method '%s'" % str(exc)
            elif isinstance(exc, InvalidHTTPVersion):
                mesg = "Invalid HTTP Version '%s'" % str(exc)
            elif isinstance(exc, UnsupportedTransferCoding):
                mesg = "%s" % str(exc)
                status_int = 501
            elif isinstance(exc, ConfigurationProblem):
                mesg = "%s" % str(exc)
                status_int = 500
            elif isinstance(exc, ObsoleteFolding):
                mesg = "%s" % str(exc)
            elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
                mesg = "%s" % str(exc)
                if not req and hasattr(exc, "req"):
                    req = exc.req  # for access log
            elif isinstance(exc, LimitRequestLine):
                mesg = "%s" % str(exc)
            elif isinstance(exc, LimitRequestHeaders):
                reason = "Request Header Fields Too Large"
                mesg = "Error parsing headers: '%s'" % str(exc)
                status_int = 431
            elif isinstance(exc, InvalidProxyLine):
                mesg = "'%s'" % str(exc)
            elif isinstance(exc, ForbiddenProxyRequest):
                reason = "Forbidden"
                mesg = "Request forbidden"
                status_int = 403
            elif isinstance(exc, InvalidSchemeHeaders):
                mesg = "%s" % str(exc)
            elif isinstance(exc, SSLError):
                reason = "Forbidden"
                mesg = "'%s'" % str(exc)
                status_int = 403

            msg = "Invalid request from ip={ip}: {error}"
            self.log.warning(msg.format(ip=addr[0], error=str(exc)))
        else:
            if hasattr(req, "uri"):
                self.log.exception("Error handling request %s", req.uri)
            else:
                self.log.exception("Error handling request (no URI read)")
            status_int = 500
            reason = "Internal Server Error"
            mesg = ""

        if req is not None:
            request_time = datetime.now() - request_start
            environ = default_environ(req, client, self.cfg)
            environ['REMOTE_ADDR'] = addr[0]
            environ['REMOTE_PORT'] = str(addr[1])
            resp = Response(req, client, self.cfg)
            resp.status = "%s %s" % (status_int, reason)
            resp.response_length = len(mesg)
            self.log.access(resp, req, environ, request_time)

        try:
            util.write_error(client, status_int, reason, mesg)
        except Exception:
            self.log.debug("Failed to send error message.")

    def handle_winch(self, sig, fname):
        # Ignore SIGWINCH in worker. Fixes a crash on OpenBSD.
        self.log.debug("worker: SIGWINCH ignored.")

gunicorn/workers/base_async.py
@@ -0,0 +1,147 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

from datetime import datetime
import errno
import socket
import ssl
import sys

from gunicorn import http
from gunicorn.http import wsgi
from gunicorn import util
from gunicorn.workers import base

ALREADY_HANDLED = object()


class AsyncWorker(base.Worker):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.worker_connections = self.cfg.worker_connections

    def timeout_ctx(self):
        raise NotImplementedError()

    def is_already_handled(self, respiter):
        # some workers will need to overload this function to raise a StopIteration
        return respiter == ALREADY_HANDLED

    def handle(self, listener, client, addr):
        req = None
        try:
            parser = http.RequestParser(self.cfg, client, addr)
            try:
                listener_name = listener.getsockname()
                if not self.cfg.keepalive:
                    req = next(parser)
                    self.handle_request(listener_name, req, client, addr)
                else:
                    # keepalive loop
                    proxy_protocol_info = {}
                    while True:
                        req = None
                        with self.timeout_ctx():
                            req = next(parser)
                        if not req:
                            break
                        if req.proxy_protocol_info:
                            proxy_protocol_info = req.proxy_protocol_info
                        else:
                            req.proxy_protocol_info = proxy_protocol_info
                        self.handle_request(listener_name, req, client, addr)
            except http.errors.NoMoreData as e:
                self.log.debug("Ignored premature client disconnection. %s", e)
            except StopIteration as e:
                self.log.debug("Closing connection. %s", e)
            except ssl.SSLError:
                # pass to next try-except level
                util.reraise(*sys.exc_info())
            except OSError:
                # pass to next try-except level
                util.reraise(*sys.exc_info())
            except Exception as e:
                self.handle_error(req, client, addr, e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                client.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, client, addr, e)
        except OSError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                elif e.errno == errno.ENOTCONN:
                    self.log.debug("Ignoring socket not connected")
                else:
                    self.log.debug("Ignoring EPIPE")
        except BaseException as e:
            self.handle_error(req, client, addr, e)
        finally:
            util.close(client)

    def handle_request(self, listener_name, req, sock, addr):
        request_start = datetime.now()
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            resp, environ = wsgi.create(req, sock, addr,
                                        listener_name, self.cfg)
            environ["wsgi.multithread"] = True
            self.nr += 1
            if self.nr >= self.max_requests:
                if self.alive:
                    self.log.info("Autorestarting worker after current request.")
                self.alive = False

            if not self.alive or not self.cfg.keepalive:
                resp.force_close()

            respiter = self.wsgi(environ, resp.start_response)
            if self.is_already_handled(respiter):
                return False
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
            finally:
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
                if hasattr(respiter, "close"):
                    respiter.close()
            if resp.should_close():
                raise StopIteration()
        except StopIteration:
            raise
        except OSError:
            # If the original exception was a socket.error we delegate
            # handling it to the caller (where handle() might ignore it)
            util.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    sock.shutdown(socket.SHUT_RDWR)
                    sock.close()
                except OSError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")
        return True
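
timeout_ctx() is the only abstract piece the keepalive loop relies on: it must return a context manager that cancels the blocking next(parser) after cfg.keepalive seconds. The two subclasses later in this commit fulfil it with their event loop's timeout object, passing False so expiry ends the with-block silently instead of propagating:

# from geventlet.py below:
def timeout_ctx(self):
    return eventlet.Timeout(self.cfg.keepalive or None, False)

# from ggevent.py below:
def timeout_ctx(self):
    return gevent.Timeout(self.cfg.keepalive, False)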

gunicorn/workers/geventlet.py
@@ -0,0 +1,186 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

from functools import partial
import sys

try:
    import eventlet
except ImportError:
    raise RuntimeError("eventlet worker requires eventlet 0.24.1 or higher")
else:
    from packaging.version import parse as parse_version
    if parse_version(eventlet.__version__) < parse_version('0.24.1'):
        raise RuntimeError("eventlet worker requires eventlet 0.24.1 or higher")

from eventlet import hubs, greenthread
from eventlet.greenio import GreenSocket
import eventlet.wsgi
import greenlet

from gunicorn.workers.base_async import AsyncWorker
from gunicorn.sock import ssl_wrap_socket

# ALREADY_HANDLED is removed in eventlet 0.30.3+; it is now
# `WSGI_LOCAL.already_handled: bool`
# https://github.com/eventlet/eventlet/pull/544
EVENTLET_WSGI_LOCAL = getattr(eventlet.wsgi, "WSGI_LOCAL", None)
EVENTLET_ALREADY_HANDLED = getattr(eventlet.wsgi, "ALREADY_HANDLED", None)


def _eventlet_socket_sendfile(self, file, offset=0, count=None):
    # Based on the implementation in gevent which in turn is slightly
    # modified from the standard library implementation.
    if self.gettimeout() == 0:
        raise ValueError("non-blocking sockets are not supported")
    if offset:
        file.seek(offset)
    blocksize = min(count, 8192) if count else 8192
    total_sent = 0
    # localize variable access to minimize overhead
    file_read = file.read
    sock_send = self.send
    try:
        while True:
            if count:
                blocksize = min(count - total_sent, blocksize)
                if blocksize <= 0:
                    break
            data = memoryview(file_read(blocksize))
            if not data:
                break  # EOF
            while True:
                try:
                    sent = sock_send(data)
                except BlockingIOError:
                    continue
                else:
                    total_sent += sent
                    if sent < len(data):
                        data = data[sent:]
                    else:
                        break
        return total_sent
    finally:
        if total_sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + total_sent)


def _eventlet_serve(sock, handle, concurrency):
    """
    Serve requests forever.

    This code is nearly identical to ``eventlet.convenience.serve`` except
    that it attempts to join the pool at the end, which allows for gunicorn
    graceful shutdowns.
    """
    pool = eventlet.greenpool.GreenPool(concurrency)
    server_gt = eventlet.greenthread.getcurrent()

    while True:
        try:
            conn, addr = sock.accept()
            gt = pool.spawn(handle, conn, addr)
            gt.link(_eventlet_stop, server_gt, conn)
            conn, addr, gt = None, None, None
        except eventlet.StopServe:
            sock.close()
            pool.waitall()
            return


def _eventlet_stop(client, server, conn):
    """
    Stop a greenlet handling a request and close its connection.

    This code is lifted from eventlet so as not to depend on undocumented
    functions in the library.
    """
    try:
        try:
            client.wait()
        finally:
            conn.close()
    except greenlet.GreenletExit:
        pass
    except Exception:
        greenthread.kill(server, *sys.exc_info())


def patch_sendfile():
    # As of eventlet 0.25.1, GreenSocket.sendfile doesn't exist,
    # meaning the native implementations of socket.sendfile will be used.
    # If os.sendfile exists, it will attempt to use that, failing explicitly
    # if the socket is in non-blocking mode, which the underlying
    # socket object /is/. Even the regular _sendfile_use_send will
    # fail in that way; plus, it would use the underlying socket.send which isn't
    # properly cooperative. So we have to monkey-patch a working socket.sendfile()
    # into GreenSocket; in this method, `self.send` will be the GreenSocket's
    # send method which is properly cooperative.
    if not hasattr(GreenSocket, 'sendfile'):
        GreenSocket.sendfile = _eventlet_socket_sendfile


class EventletWorker(AsyncWorker):

    def patch(self):
        hubs.use_hub()
        eventlet.monkey_patch()
        patch_sendfile()

    def is_already_handled(self, respiter):
        # eventlet >= 0.30.3
        if getattr(EVENTLET_WSGI_LOCAL, "already_handled", None):
            raise StopIteration()
        # eventlet < 0.30.3
        if respiter == EVENTLET_ALREADY_HANDLED:
            raise StopIteration()
        return super().is_already_handled(respiter)

    def init_process(self):
        self.patch()
        super().init_process()

    def handle_quit(self, sig, frame):
        eventlet.spawn(super().handle_quit, sig, frame)

    def handle_usr1(self, sig, frame):
        eventlet.spawn(super().handle_usr1, sig, frame)

    def timeout_ctx(self):
        return eventlet.Timeout(self.cfg.keepalive or None, False)

    def handle(self, listener, client, addr):
        if self.cfg.is_ssl:
            client = ssl_wrap_socket(client, self.cfg)
        super().handle(listener, client, addr)

    def run(self):
        acceptors = []
        for sock in self.sockets:
            gsock = GreenSocket(sock)
            gsock.setblocking(1)
            hfun = partial(self.handle, gsock)
            acceptor = eventlet.spawn(_eventlet_serve, gsock, hfun,
                                      self.worker_connections)
            acceptors.append(acceptor)
            eventlet.sleep(0.0)

        while self.alive:
            self.notify()
            eventlet.sleep(1.0)

        self.notify()
        t = None
        try:
            with eventlet.Timeout(self.cfg.graceful_timeout) as t:
                for a in acceptors:
                    a.kill(eventlet.StopServe())
                for a in acceptors:
                    a.wait()
        except eventlet.Timeout as te:
            if te != t:
                raise
            for a in acceptors:
                a.kill()
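
Selecting this worker goes through the SUPPORTED_WORKERS table at the start of this commit. A typical invocation, assuming a module myapp exposing a WSGI callable app (both names hypothetical):

gunicorn -k eventlet --worker-connections 1000 myapp:app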

gunicorn/workers/ggevent.py
@@ -0,0 +1,193 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import os
import sys
from datetime import datetime
from functools import partial
import time

try:
    import gevent
except ImportError:
    raise RuntimeError("gevent worker requires gevent 1.4 or higher")
else:
    from packaging.version import parse as parse_version
    if parse_version(gevent.__version__) < parse_version('1.4'):
        raise RuntimeError("gevent worker requires gevent 1.4 or higher")

from gevent.pool import Pool
from gevent.server import StreamServer
from gevent import hub, monkey, socket, pywsgi

import gunicorn
from gunicorn.http.wsgi import base_environ
from gunicorn.sock import ssl_context
from gunicorn.workers.base_async import AsyncWorker

VERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__)


class GeventWorker(AsyncWorker):

    server_class = None
    wsgi_handler = None

    def patch(self):
        monkey.patch_all()

        # patch sockets
        sockets = []
        for s in self.sockets:
            sockets.append(socket.socket(s.FAMILY, socket.SOCK_STREAM,
                                         fileno=s.sock.fileno()))
        self.sockets = sockets

    def notify(self):
        super().notify()
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            sys.exit(0)

    def timeout_ctx(self):
        return gevent.Timeout(self.cfg.keepalive, False)

    def run(self):
        servers = []
        ssl_args = {}

        if self.cfg.is_ssl:
            ssl_args = {"ssl_context": ssl_context(self.cfg)}

        for s in self.sockets:
            s.setblocking(1)
            pool = Pool(self.worker_connections)
            if self.server_class is not None:
                environ = base_environ(self.cfg)
                environ.update({
                    "wsgi.multithread": True,
                    "SERVER_SOFTWARE": VERSION,
                })
                server = self.server_class(
                    s, application=self.wsgi, spawn=pool, log=self.log,
                    handler_class=self.wsgi_handler, environ=environ,
                    **ssl_args)
            else:
                hfun = partial(self.handle, s)
                server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)
                if self.cfg.workers > 1:
                    server.max_accept = 1

            server.start()
            servers.append(server)

        while self.alive:
            self.notify()
            gevent.sleep(1.0)

        try:
            # Stop accepting requests
            for server in servers:
                if hasattr(server, 'close'):  # gevent 1.0
                    server.close()
                if hasattr(server, 'kill'):  # gevent < 1.0
                    server.kill()

            # Handle current requests until graceful_timeout
            ts = time.time()
            while time.time() - ts <= self.cfg.graceful_timeout:
                accepting = 0
                for server in servers:
                    if server.pool.free_count() != server.pool.size:
                        accepting += 1

                # if no server is accepting a connection, we can exit
                if not accepting:
                    return

                self.notify()
                gevent.sleep(1.0)

            # Force kill all active handlers
            self.log.warning("Worker graceful timeout (pid:%s)", self.pid)
            for server in servers:
                server.stop(timeout=1)
        except Exception:
            pass

    def handle(self, listener, client, addr):
        # Connected socket timeout defaults to socket.getdefaulttimeout().
        # This forces to blocking mode.
        client.setblocking(1)
        super().handle(listener, client, addr)

    def handle_request(self, listener_name, req, sock, addr):
        try:
            super().handle_request(listener_name, req, sock, addr)
        except gevent.GreenletExit:
            pass
        except SystemExit:
            pass

    def handle_quit(self, sig, frame):
        # Move this out of the signal handler so we can use
        # blocking calls. See #1126
        gevent.spawn(super().handle_quit, sig, frame)

    def handle_usr1(self, sig, frame):
        # Make the gevent workers handle the usr1 signal
        # by deferring to a new greenlet. See #1645
        gevent.spawn(super().handle_usr1, sig, frame)

    def init_process(self):
        self.patch()
        hub.reinit()
        super().init_process()


class GeventResponse:

    status = None
    headers = None
    sent = None

    def __init__(self, status, headers, clength):
        self.status = status
        self.headers = headers
        self.sent = clength


class PyWSGIHandler(pywsgi.WSGIHandler):

    def log_request(self):
        start = datetime.fromtimestamp(self.time_start)
        finish = datetime.fromtimestamp(self.time_finish)
        response_time = finish - start
        resp_headers = getattr(self, 'response_headers', {})
        # Status is expected to be a string but is encoded to bytes in gevent
        # for PY3, except when it isn't, because gevent uses hardcoded strings
        # for network errors.
        status = self.status.decode() if isinstance(self.status, bytes) else self.status
        resp = GeventResponse(status, resp_headers, self.response_length)
        if hasattr(self, 'headers'):
            req_headers = self.headers.items()
        else:
            req_headers = []
        self.server.log.access(resp, req_headers, self.environ, response_time)

    def get_environ(self):
        env = super().get_environ()
        env['gunicorn.sock'] = self.socket
        env['RAW_URI'] = self.path
        return env


class PyWSGIServer(pywsgi.WSGIServer):
    pass


class GeventPyWSGIWorker(GeventWorker):
    "The Gevent StreamServer based workers."
    server_class = PyWSGIServer
    wsgi_handler = PyWSGIHandler
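
GeventPyWSGIWorker shows the extension seam: run() consults the server_class and wsgi_handler class attributes, so a variant only needs to override those two. A hypothetical sketch following the same pattern (the extra environ key is purely illustrative):

class TaggingHandler(PyWSGIHandler):
    def get_environ(self):
        env = super().get_environ()
        env['x.handled_by'] = 'gevent-pywsgi'  # illustrative extra key
        return env

class TaggingWorker(GeventPyWSGIWorker):
    wsgi_handler = TaggingHandler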

gunicorn/workers/gthread.py
@@ -0,0 +1,372 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

# design:
# A threaded worker accepts connections in the main loop, accepted
# connections are added to the thread pool as a connection job.
# Keepalive connections are put back in the loop waiting for an event.
# If no event happens after the keepalive timeout, the connection is
# closed.
# pylint: disable=no-else-break

from concurrent import futures
import errno
import os
import selectors
import socket
import ssl
import sys
import time
from collections import deque
from datetime import datetime
from functools import partial
from threading import RLock

from . import base
from .. import http
from .. import util
from .. import sock
from ..http import wsgi


class TConn:

    def __init__(self, cfg, sock, client, server):
        self.cfg = cfg
        self.sock = sock
        self.client = client
        self.server = server

        self.timeout = None
        self.parser = None
        self.initialized = False

        # set the socket to non blocking
        self.sock.setblocking(False)

    def init(self):
        self.initialized = True
        self.sock.setblocking(True)

        if self.parser is None:
            # wrap the socket if needed
            if self.cfg.is_ssl:
                self.sock = sock.ssl_wrap_socket(self.sock, self.cfg)
            # initialize the parser
            self.parser = http.RequestParser(self.cfg, self.sock, self.client)

    def set_timeout(self):
        # set the timeout
        self.timeout = time.time() + self.cfg.keepalive

    def close(self):
        util.close(self.sock)


class ThreadWorker(base.Worker):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.worker_connections = self.cfg.worker_connections
        self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
        # initialise the pool
        self.tpool = None
        self.poller = None
        self._lock = None
        self.futures = deque()
        self._keep = deque()
        self.nr_conns = 0

    @classmethod
    def check_config(cls, cfg, log):
        max_keepalived = cfg.worker_connections - cfg.threads
        if max_keepalived <= 0 and cfg.keepalive:
            log.warning("No keepalived connections can be handled. " +
                        "Check the number of worker connections and threads.")

    def init_process(self):
        self.tpool = self.get_thread_pool()
        self.poller = selectors.DefaultSelector()
        self._lock = RLock()
        super().init_process()

    def get_thread_pool(self):
        """Override this method to customize how the thread pool is created"""
        return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)

    def handle_quit(self, sig, frame):
        self.alive = False
        # worker_int callback
        self.cfg.worker_int(self)
        self.tpool.shutdown(False)
        time.sleep(0.1)
        sys.exit(0)

    def _wrap_future(self, fs, conn):
        fs.conn = conn
        self.futures.append(fs)
        fs.add_done_callback(self.finish_request)

    def enqueue_req(self, conn):
        conn.init()
        # submit the connection to a worker
        fs = self.tpool.submit(self.handle, conn)
        self._wrap_future(fs, conn)

    def accept(self, server, listener):
        try:
            sock, client = listener.accept()
            # initialize the connection object
            conn = TConn(self.cfg, sock, client, server)

            self.nr_conns += 1
            # wait until socket is readable
            with self._lock:
                self.poller.register(conn.sock, selectors.EVENT_READ,
                                     partial(self.on_client_socket_readable, conn))
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                               errno.EWOULDBLOCK):
                raise

    def on_client_socket_readable(self, conn, client):
        with self._lock:
            # unregister the client from the poller
            self.poller.unregister(client)

            if conn.initialized:
                # remove the connection from keepalive
                try:
                    self._keep.remove(conn)
                except ValueError:
                    # race condition
                    return

        # submit the connection to a worker
        self.enqueue_req(conn)

    def murder_keepalived(self):
        now = time.time()
        while True:
            with self._lock:
                try:
                    # remove the connection from the queue
                    conn = self._keep.popleft()
                except IndexError:
                    break

            delta = conn.timeout - now
            if delta > 0:
                # add the connection back to the queue
                with self._lock:
                    self._keep.appendleft(conn)
                break
            else:
                self.nr_conns -= 1
                # remove the socket from the poller
                with self._lock:
                    try:
                        self.poller.unregister(conn.sock)
                    except OSError as e:
                        if e.errno != errno.EBADF:
                            raise
                    except KeyError:
                        # already removed by the system, continue
                        pass
                    except ValueError:
                        # already removed by the system, continue
                        pass

                # close the socket
                conn.close()

    def is_parent_alive(self):
        # If our parent changed then we shut down.
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            return False
        return True

    def run(self):
        # init listeners, add them to the event loop
        for sock in self.sockets:
            sock.setblocking(False)
            # a race condition during graceful shutdown may make the listener
            # name unavailable in the request handler so capture it once here
            server = sock.getsockname()
            acceptor = partial(self.accept, server)
            self.poller.register(sock, selectors.EVENT_READ, acceptor)

        while self.alive:
            # notify the arbiter we are alive
            self.notify()

            # can we accept more connections?
            if self.nr_conns < self.worker_connections:
                # wait for an event
                events = self.poller.select(1.0)
                for key, _ in events:
                    callback = key.data
                    callback(key.fileobj)

                # check (but do not wait) for finished requests
                result = futures.wait(self.futures, timeout=0,
                                      return_when=futures.FIRST_COMPLETED)
            else:
                # wait for a request to finish
                result = futures.wait(self.futures, timeout=1.0,
                                      return_when=futures.FIRST_COMPLETED)

            # clean up finished requests
            for fut in result.done:
                self.futures.remove(fut)

            if not self.is_parent_alive():
                break

            # handle keepalive timeouts
            self.murder_keepalived()

        self.tpool.shutdown(False)
        self.poller.close()

        for s in self.sockets:
            s.close()

        futures.wait(self.futures, timeout=self.cfg.graceful_timeout)

    def finish_request(self, fs):
        if fs.cancelled():
            self.nr_conns -= 1
            fs.conn.close()
            return

        try:
            (keepalive, conn) = fs.result()
            # if the connection should be kept alive, add it
            # to the eventloop and record it
            if keepalive and self.alive:
                # flag the socket as non blocking
                conn.sock.setblocking(False)

                # register the connection
                conn.set_timeout()
                with self._lock:
                    self._keep.append(conn)

                    # add the socket to the event loop
                    self.poller.register(conn.sock, selectors.EVENT_READ,
                                         partial(self.on_client_socket_readable, conn))
            else:
                self.nr_conns -= 1
                conn.close()
        except Exception:
            # an exception happened, make sure to close the
            # socket.
            self.nr_conns -= 1
            fs.conn.close()

    def handle(self, conn):
        keepalive = False
        req = None
        try:
            req = next(conn.parser)
            if not req:
                return (False, conn)

            # handle the request
            keepalive = self.handle_request(req, conn)
            if keepalive:
                return (keepalive, conn)
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                conn.sock.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, conn.sock, conn.client, e)
        except OSError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                elif e.errno == errno.ENOTCONN:
                    self.log.debug("Ignoring socket not connected")
                else:
                    self.log.debug("Ignoring connection epipe")
        except Exception as e:
            self.handle_error(req, conn.sock, conn.client, e)

        return (False, conn)

    def handle_request(self, req, conn):
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, conn.sock, conn.client,
                                        conn.server, self.cfg)
            environ["wsgi.multithread"] = True
            self.nr += 1
            if self.nr >= self.max_requests:
                if self.alive:
                    self.log.info("Autorestarting worker after current request.")
                self.alive = False
                resp.force_close()

            if not self.alive or not self.cfg.keepalive:
                resp.force_close()
            elif len(self._keep) >= self.max_keepalived:
                resp.force_close()

            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)

                resp.close()
            finally:
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
                if hasattr(respiter, "close"):
                    respiter.close()

            if resp.should_close():
                self.log.debug("Closing connection.")
                return False
        except OSError:
            # pass to next try-except level
            util.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    conn.sock.shutdown(socket.SHUT_RDWR)
                    conn.sock.close()
                except OSError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")

        return True
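
check_config() above guards a simple invariant: keepalive slots are whatever worker_connections leaves over once every thread could be busy. A quick worked example using gunicorn's stock defaults (shown for illustration):

# defaults: worker_connections=1000, threads=1
max_keepalived = 1000 - 1     # 999 idle sockets may wait in self._keep
# with --threads 1000 --worker-connections 1000:
max_keepalived = 1000 - 1000  # 0: keepalive is effectively disabled and
                              # check_config() logs the warning above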

gunicorn/workers/gtornado.py
@@ -0,0 +1,166 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import os
import sys

try:
    import tornado
except ImportError:
    raise RuntimeError("You need tornado installed to use this worker.")
import tornado.web
import tornado.httpserver
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.wsgi import WSGIContainer

from gunicorn.workers.base import Worker
from gunicorn import __version__ as gversion
from gunicorn.sock import ssl_context

# Tornado 5.0 updated its IOLoop, and the `io_loop` arguments to many
# Tornado functions have been removed in Tornado 5.0. Also, it no
# longer stores PeriodicCallbacks in ioloop._callbacks. Instead we store
# them on our side, and use stop() on them when stopping the worker.
# See https://www.tornadoweb.org/en/stable/releases/v5.0.0.html#backwards-compatibility-notes
# for more details.
TORNADO5 = tornado.version_info >= (5, 0, 0)


class TornadoWorker(Worker):

    @classmethod
    def setup(cls):
        web = sys.modules.pop("tornado.web")
        old_clear = web.RequestHandler.clear

        def clear(self):
            old_clear(self)
            if "Gunicorn" not in self._headers["Server"]:
                self._headers["Server"] += " (Gunicorn/%s)" % gversion
        web.RequestHandler.clear = clear
        sys.modules["tornado.web"] = web

    def handle_exit(self, sig, frame):
        if self.alive:
            super().handle_exit(sig, frame)

    def handle_request(self):
        self.nr += 1
        if self.alive and self.nr >= self.max_requests:
            self.log.info("Autorestarting worker after current request.")
            self.alive = False

    def watchdog(self):
        if self.alive:
            self.notify()

        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            self.alive = False

    def heartbeat(self):
        if not self.alive:
            if self.server_alive:
                if hasattr(self, 'server'):
                    try:
                        self.server.stop()
                    except Exception:
                        pass
                self.server_alive = False
            else:
                if TORNADO5:
                    for callback in self.callbacks:
                        callback.stop()
                    self.ioloop.stop()
                else:
                    if not self.ioloop._callbacks:
                        self.ioloop.stop()

    def init_process(self):
        # IOLoop cannot survive a fork or be shared across processes
        # in any way. When multiple processes are being used, each process
        # should create its own IOLoop. We should clear the current IOLoop
        # if one exists before os.fork.
        IOLoop.clear_current()
        super().init_process()

    def run(self):
        self.ioloop = IOLoop.instance()
        self.alive = True
        self.server_alive = False

        if TORNADO5:
            self.callbacks = []
            self.callbacks.append(PeriodicCallback(self.watchdog, 1000))
            self.callbacks.append(PeriodicCallback(self.heartbeat, 1000))
            for callback in self.callbacks:
                callback.start()
        else:
            PeriodicCallback(self.watchdog, 1000, io_loop=self.ioloop).start()
            PeriodicCallback(self.heartbeat, 1000, io_loop=self.ioloop).start()

        # Assume the app is a WSGI callable if it's not an
        # instance of tornado.web.Application or is an
        # instance of tornado.wsgi.WSGIApplication
        app = self.wsgi

        if tornado.version_info[0] < 6:
            if not isinstance(app, tornado.web.Application) or \
                    isinstance(app, tornado.wsgi.WSGIApplication):
                app = WSGIContainer(app)
        elif not isinstance(app, WSGIContainer) and \
                not isinstance(app, tornado.web.Application):
            app = WSGIContainer(app)

        # Monkey-patching HTTPConnection.finish to count the
        # number of requests being handled by Tornado. This
        # will help gunicorn shut down the worker if max_requests
        # is exceeded.
        httpserver = sys.modules["tornado.httpserver"]
        if hasattr(httpserver, 'HTTPConnection'):
            old_connection_finish = httpserver.HTTPConnection.finish

            def finish(other):
                self.handle_request()
                old_connection_finish(other)
            httpserver.HTTPConnection.finish = finish
            sys.modules["tornado.httpserver"] = httpserver

            server_class = tornado.httpserver.HTTPServer
        else:
            class _HTTPServer(tornado.httpserver.HTTPServer):

                def on_close(instance, server_conn):
                    self.handle_request()
                    super().on_close(server_conn)

            server_class = _HTTPServer

        if self.cfg.is_ssl:
            if TORNADO5:
                server = server_class(app, ssl_options=ssl_context(self.cfg))
            else:
                server = server_class(app, io_loop=self.ioloop,
                                      ssl_options=ssl_context(self.cfg))
        else:
            if TORNADO5:
                server = server_class(app)
            else:
                server = server_class(app, io_loop=self.ioloop)

        self.server = server
        self.server_alive = True

        for s in self.sockets:
            s.setblocking(0)
            if hasattr(server, "add_socket"):  # tornado > 2.0
                server.add_socket(s)
            elif hasattr(server, "_sockets"):  # tornado 2.0
                server._sockets[s.fileno()] = s

        server.no_keep_alive = self.cfg.keepalive <= 0
        server.start(num_processes=1)

        self.ioloop.start()
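
As with the other classes, the tornado worker is selected by name through SUPPORTED_WORKERS. A typical invocation, assuming a hypothetical module myapp exposing either a WSGI callable or a tornado.web.Application named app:

gunicorn -k tornado myapp:app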

gunicorn/workers/sync.py
@@ -0,0 +1,209 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
#

from datetime import datetime
import errno
import os
import select
import socket
import ssl
import sys

from gunicorn import http
from gunicorn.http import wsgi
from gunicorn import sock
from gunicorn import util
from gunicorn.workers import base


class StopWaiting(Exception):
    """ exception raised to stop waiting for a connection """


class SyncWorker(base.Worker):

    def accept(self, listener):
        client, addr = listener.accept()
        client.setblocking(1)
        util.close_on_exec(client)
        self.handle(listener, client, addr)

    def wait(self, timeout):
        try:
            self.notify()
            ret = select.select(self.wait_fds, [], [], timeout)
            if ret[0]:
                if self.PIPE[0] in ret[0]:
                    os.read(self.PIPE[0], 1)
                return ret[0]
        except OSError as e:
            if e.args[0] == errno.EINTR:
                return self.sockets
            if e.args[0] == errno.EBADF:
                if self.nr < 0:
                    return self.sockets
                else:
                    raise StopWaiting
            raise

    def is_parent_alive(self):
        # If our parent changed then we shut down.
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            return False
        return True

    def run_for_one(self, timeout):
        listener = self.sockets[0]
        while self.alive:
            self.notify()

            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # workers to come give us some love.
            try:
                self.accept(listener)
                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue
            except OSError as e:
                if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                                   errno.EWOULDBLOCK):
                    raise

            if not self.is_parent_alive():
                return

            try:
                self.wait(timeout)
            except StopWaiting:
                return

    def run_for_multiple(self, timeout):
        while self.alive:
            self.notify()

            try:
                ready = self.wait(timeout)
            except StopWaiting:
                return

            if ready is not None:
                for listener in ready:
                    if listener == self.PIPE[0]:
                        continue

                    try:
                        self.accept(listener)
                    except OSError as e:
                        if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                                           errno.EWOULDBLOCK):
                            raise

            if not self.is_parent_alive():
                return

    def run(self):
        # if no timeout is given the worker will never wait and will
        # use the CPU for nothing. This minimal timeout prevents it.
        timeout = self.timeout or 0.5

        # self.socket appears to lose its blocking status after
        # we fork in the arbiter. Reset it here.
        for s in self.sockets:
            s.setblocking(0)

        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)

    def handle(self, listener, client, addr):
        req = None
        try:
            if self.cfg.is_ssl:
                client = sock.ssl_wrap_socket(client, self.cfg)
            parser = http.RequestParser(self.cfg, client, addr)
            req = next(parser)
            self.handle_request(listener, req, client, addr)
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                client.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, client, addr, e)
        except OSError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                elif e.errno == errno.ENOTCONN:
                    self.log.debug("Ignoring socket not connected")
                else:
                    self.log.debug("Ignoring EPIPE")
        except BaseException as e:
            self.handle_error(req, client, addr, e)
        finally:
            util.close(client)

    def handle_request(self, listener, req, client, addr):
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,
                                        listener.getsockname(), self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
            finally:
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
                if hasattr(respiter, "close"):
                    respiter.close()
        except OSError:
            # pass to next try-except level
            util.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    client.shutdown(socket.SHUT_RDWR)
                    client.close()
                except OSError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")
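
The sync worker is the "sync" entry in SUPPORTED_WORKERS and is gunicorn's default worker class, so it is what runs when no -k flag is given. A typical invocation, again with a hypothetical myapp:app target:

gunicorn -w 4 --timeout 30 myapp:app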

gunicorn/workers/workertmp.py
@@ -0,0 +1,53 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import os
import time
import platform
import tempfile

from gunicorn import util

PLATFORM = platform.system()
IS_CYGWIN = PLATFORM.startswith('CYGWIN')


class WorkerTmp:

    def __init__(self, cfg):
        old_umask = os.umask(cfg.umask)
        fdir = cfg.worker_tmp_dir
        if fdir and not os.path.isdir(fdir):
            raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir)
        fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir)
        os.umask(old_umask)

        # change the owner and group of the file if the worker will run as
        # a different user or group, so that the worker can modify the file
        if cfg.uid != os.geteuid() or cfg.gid != os.getegid():
            util.chown(name, cfg.uid, cfg.gid)

        # unlink the file so we don't leak temporary files
        try:
            if not IS_CYGWIN:
                util.unlink(name)
            # In Python 3.8, open() emits RuntimeWarning if buffering=1 for binary mode.
            # Because we never write to this file, pass 0 to switch buffering off.
            self._tmp = os.fdopen(fd, 'w+b', 0)
        except Exception:
            os.close(fd)
            raise

    def notify(self):
        new_time = time.monotonic()
        os.utime(self._tmp.fileno(), (new_time, new_time))

    def last_update(self):
        return os.fstat(self._tmp.fileno()).st_mtime

    def fileno(self):
        return self._tmp.fileno()

    def close(self):
        return self._tmp.close()
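
notify() stamps the unlinked temp file's timestamps with a time.monotonic() value and last_update() reads it back via fstat, so the master can age-check a worker with no IPC at all. A minimal sketch of that check as a consumer of this class might write it (the timeout argument stands in for the configured worker timeout and is an assumption here):

import time

def worker_is_stale(tmp, timeout):
    # both sides use time.monotonic()-derived values, so the
    # difference is a plain age in seconds since the last notify()
    return time.monotonic() - tmp.last_update() > timeout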