Last active
September 21, 2018 09:00
-
-
Save hashbrowncipher/33e7f70c8929d8ba47728800bb30f852 to your computer and use it in GitHub Desktop.
Multiprocess WSGI server using gevent==1.3.6, demonstrating sendfile
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
Copyright 2018 Josh Snyder | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
""" | |
from gevent.monkey import patch_all | |
patch_all() | |
from errno import EAGAIN | |
from os import WIFSIGNALED | |
from socket import SOL_SOCKET | |
from socket import SO_REUSEADDR | |
from socket import SO_REUSEPORT | |
from socket import socket | |
import os | |
import sys | |
from gevent import pywsgi | |
from gevent.socket import wait_write | |
from gevent.queue import Channel | |
import gevent | |
def cooperative_sendfile(out_fh, in_fh, count):
    """Copy ``count`` bytes from ``in_fh`` to ``out_fh`` with ``os.sendfile``.

    ``None`` is passed as the offset, so on Linux the data is read from the
    current file position of ``in_fh`` and that position is advanced.

    Args:
        out_fh: File descriptor (int) to write to.
        in_fh: File descriptor (int) to read from.
        count: Total number of bytes to transfer.

    Raises:
        EOFError: ``os.sendfile`` reported 0 bytes sent while ``count`` bytes
            were still outstanding.
        OSError: Any ``os.sendfile`` failure other than ``EAGAIN``.
    """
    while count > 0:
        try:
            sent = os.sendfile(out_fh, in_fh, None, count)
        except OSError as e:
            # Only the bare name EAGAIN is imported at module level (there is
            # no ``import errno``), so compare against it directly.
            if e.errno != EAGAIN:
                raise
            # The destination cannot accept more data right now: wait until
            # it is writable again (gevent's wait_write), then retry.
            wait_write(out_fh)
            continue
        if sent == 0:
            raise EOFError('Reached EOF with {} remaining bytes'.format(count))
        count -= sent
class FileChunk(object):
    """Stand-in for a chunk of response body that only reports a length.

    It carries no data; ``len()`` of an instance is the size it was built
    with.
    """

    def __init__(self, size: int) -> None:
        # Number of bytes this placeholder represents.
        self._size = size

    def __len__(self) -> int:
        """Return the size given at construction time."""
        return self._size
class FileWrapper(object):
    """WSGI ``wsgi.file_wrapper`` implementation that is sent via sendfile.

    Iterating the wrapper yields a single ``FileChunk`` whose length is the
    file size; the actual bytes are transmitted by ``write()``.
    """

    # Deliberately an attribute set to None rather than a method.
    # NOTE(review): presumably this makes ``hasattr(result, '__len__')`` true
    # so gevent's pywsgi sums ``len(chunk)`` over the iterable to derive
    # Content-Length -- confirm against the gevent version in use.
    __len__ = None

    def __init__(self, fh, block_size=None):
        """Wrap an open file object.

        Args:
            fh: Open file object exposing ``fileno()`` and ``close()``.
            block_size: Optional block-size suggestion, accepted because
                PEP 3333 allows applications to pass one to
                ``wsgi.file_wrapper``. It is ignored: the file is sent with
                sendfile, not read in blocks.
        """
        self._fh = fh
        # Size is taken once, at construction, from the descriptor itself.
        self._size = os.stat(fh.fileno()).st_size

    def __iter__(self):
        """Yield one length-only placeholder covering the whole file."""
        yield FileChunk(self._size)

    def write(self, socket):
        """Send the file's bytes to ``socket`` (anything with ``fileno()``)."""
        cooperative_sendfile(socket.fileno(), self._fh.fileno(), self._size)

    def close(self):
        """Close the wrapped file.

        WSGI servers call ``close()`` on the response iterable when the
        request is finished; without this method the descriptor opened by
        the application would never be released.
        """
        self._fh.close()
class WSGIHandler(pywsgi.WSGIHandler):
    """pywsgi request handler with a special path for ``FileWrapper`` results."""

    def process_result(self):
        """Send the application's result to the client.

        A ``FileWrapper`` result is handed the socket so it can transmit
        itself; every other result goes through the parent implementation.
        """
        if isinstance(self.result, FileWrapper):
            # NOTE(review): writing an empty body presumably makes the parent
            # class emit the status line and headers before the raw file
            # bytes follow -- confirm against gevent's _write_with_headers.
            self._write_with_headers(b'')
            self.result.write(self.socket)
            return None
        return super().process_result()
class WSGIServer(pywsgi.WSGIServer):
    """pywsgi server that uses ``WSGIHandler`` and advertises ``FileWrapper``."""

    handler_class = WSGIHandler

    def set_environ(self, environ=None):
        """Build the base environ, then fill in ``wsgi.file_wrapper`` if unset.

        An existing non-None ``wsgi.file_wrapper`` entry is left untouched.
        """
        super().set_environ(environ)
        wrapper_key = 'wsgi.file_wrapper'
        if self.environ.get(wrapper_key) is None:
            self.environ[wrapper_key] = FileWrapper
def application(environ, start_response):
    """Demo WSGI app: ``/`` serves ``multicore_server.py``, anything else 404s.

    Args:
        environ: WSGI environ dict; only ``PATH_INFO`` is consulted.
        start_response: WSGI ``start_response`` callable.

    Returns:
        A ``FileWrapper`` around the opened file for ``/``, otherwise a
        one-element list holding a small HTML "Not Found" body.
    """
    # Guard clause: everything except the root path is a 404.
    if environ['PATH_INFO'] != '/':
        start_response('404 Not Found', [('Content-Type', 'text/html')])
        return [b'<h1>Not Found</h1>']

    start_response('200 OK', [('Content-Type', 'text/plain')])
    source_file = open('multicore_server.py', 'rb')
    return FileWrapper(source_file)
def listen(address, backlog=32):
    """Return a new TCP socket bound to ``address`` and already listening.

    Design note: the alternative would be to create one listening socket in
    the master and let every forked child inherit it. With that approach
    every process is woken for each incoming connection even though only one
    of them can successfully accept() it. Giving each process its own socket
    with SO_REUSEPORT (done here) avoids that; EPOLLEXCLUSIVE would be
    another way to address it.

    Args:
        address: Address passed to ``bind()``.
        backlog: Backlog passed to ``listen()``.
    """
    listener = socket()
    # Same options, same order as always: SO_REUSEADDR, then SO_REUSEPORT.
    for option in (SO_REUSEADDR, SO_REUSEPORT):
        listener.setsockopt(SOL_SOCKET, option, True)
    listener.bind(address)
    listener.listen(backlog)
    return listener
def serve(identifier, port):
    """Run a ``WSGIServer`` for ``application`` on ``port`` until it stops.

    Args:
        identifier: Accepted for the caller's bookkeeping; not used here.
        port: TCP port to listen on, on all IPv4 interfaces (``0.0.0.0``).
    """
    bind_address = ('0.0.0.0', port)
    WSGIServer(listen(bind_address), application).serve_forever()
# (read_fd, write_fd) pair shared between the master and its children.
# It starts as None and is created by spawn_process on first use.
deathpipe = None


def die_on_eof(fd):
    """Drain ``fd`` until EOF (or an error), then exit the process with 1.

    Intended to run in its own greenlet. gevent's monkey patching does not
    replace ``os.read`` (see the gevent.os documentation), so a plain
    ``os.read`` here would block the whole process, event loop included.
    The descriptor is therefore switched to non-blocking mode and read with
    gevent's cooperative ``nb_read``.

    Args:
        fd: File descriptor to watch.
    """
    # Imported locally so the block does not rely on ``gevent.os`` having
    # been loaded as a side effect of ``import gevent``.
    from gevent.os import make_nonblocking
    from gevent.os import nb_read

    try:
        make_nonblocking(fd)
        while nb_read(fd, 4096):
            pass
    except Exception:
        # Best effort: any ordinary failure while watching the descriptor is
        # treated like EOF. BaseException subclasses (SystemExit,
        # KeyboardInterrupt, GreenletExit) are deliberately not swallowed.
        pass
    sys.exit(1)
def wait_on_process(pid, done_queue):
    """Wait for child ``pid`` to terminate and report it on ``done_queue``.

    Args:
        pid: Process id of the child to wait for.
        done_queue: Object with a ``put()`` method; it receives the
            ``(pid, status)`` tuple returned by ``os.waitpid``.
    """
    exit_info = os.waitpid(pid, 0)
    done_queue.put(exit_info)
def spawn_process(done_queue, target, args=()):
    """Fork a child that runs ``target(*args)``; the parent watches it.

    In the parent, a greenlet is spawned that waits for the child and puts
    its ``(pid, status)`` on ``done_queue``; the function then returns None.

    In the child, stdin is closed, the write end of the shared ``deathpipe``
    is closed and a greenlet is started that reads the read end until EOF,
    after which that greenlet exits the process. The child never returns to
    the caller: once ``target`` returns, the process exits with status 0.
    Otherwise a returning ``target`` would let the child fall back into the
    caller's code and run the master's spawn/supervise loop itself.

    Args:
        done_queue: Queue-like object that receives ``(pid, status)``.
        target: Callable executed in the child.
        args: Positional arguments for ``target``.
    """
    global deathpipe
    if deathpipe is None:
        # Created once, on first use, and inherited by every later child.
        deathpipe = os.pipe()
    read_end, write_end = deathpipe

    pid = os.fork()
    if pid:
        # Parent: reap the child in the background and report its exit.
        gevent.spawn(wait_on_process, pid, done_queue)
        return

    # Child from here on.
    sys.stdin.close()
    os.close(0)
    # Drop our copy of the write end so the read end can reach EOF.
    os.close(write_end)
    gevent.spawn(die_on_eof, read_end)
    target(*args)
    sys.exit(0)
def main(num_workers=20):
    """Start the worker processes and replace each one that exits.

    The port is read from ``sys.argv[1]``.

    Args:
        num_workers: Number of worker processes to start (default 20, the
            value that used to be hard-coded).
    """
    from itertools import count

    port = int(sys.argv[1])
    done_queue = Channel()
    # Each launched process gets a fresh identifier. Previously the closure
    # captured the loop variable, so every relaunch reused the last index.
    identifiers = count()

    def launch_process():
        spawn_process(done_queue, serve, args=(next(identifiers), port))

    for _ in range(num_workers):
        launch_process()

    for pid, status in done_queue:
        if WIFSIGNALED(status):
            # Report the signal number itself, not the raw wait status
            # (which may also carry the core-dump flag).
            print('Process {} exited with signal {}'.format(
                pid, os.WTERMSIG(status)))
        # Replace every worker that exits, however it ended.
        launch_process()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment