-
-
Save lwzm/820664528b102634054f9e8b31183fb0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import multiprocessing.connection | |
import os | |
import time | |
import signal | |
def wait(*_):
    """SIGCHLD handler: reap every child that has already exited.

    Accepts (and ignores) the ``(signum, frame)`` arguments that the
    ``signal`` module passes to handlers.  For each reaped child whose wait
    status is non-zero, prints ``pid``, the high byte of the status (exit
    code) and the low byte (terminating signal, including the core flag).

    The reaping is non-blocking: a plain ``os.wait()`` would block inside the
    handler for as long as any *other* child is still running, which stalls
    whatever the process was doing when the signal arrived.
    """
    while True:
        try:
            child, status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            # No child processes exist at all.
            break
        if child == 0:
            # Children exist, but none of them has exited yet.
            break
        if status:
            # Local names deliberately avoid shadowing ``exit`` / ``signal``.
            exit_code, term_signal = status >> 8, status & 0xFF
            print(child, exit_code, term_signal)
def work(r: multiprocessing.connection.Connection, q: multiprocessing.SimpleQueue):
    """Worker loop: fork one short-lived child per message received on ``r``.

    Each message is a sequence whose first element is the request id.  The
    forked child writes ``.tmp/<id>`` (the message followed by 10240 lines of
    80 random ASCII letters) and then puts the message back on ``q`` to signal
    completion.  The worker itself never blocks on a child; finished children
    are reaped by the ``wait`` SIGCHLD handler.

    Args:
        r: Read end of the pipe on which requests arrive.
        q: Queue on which completed messages are reported.

    Returns:
        ``None`` once the sending end of ``r`` has been closed.  Forked
        children never return from this function; they leave via
        ``os._exit``.
    """
    import random
    import string
    import traceback

    print(time.time(), os.getpid(), r, q)
    signal.signal(signal.SIGCHLD, wait)
    while True:
        try:
            msg = r.recv()
        except EOFError:
            # The sender closed its end: no further work can ever arrive.
            return
        msg_id, *_ = msg
        pid = os.fork()
        if pid != 0:
            # Parent: go straight back to waiting for the next message.
            continue

        # Forked child.  It must terminate with os._exit() so that it never
        # unwinds into the parent's call stack (multiprocessing bootstrap,
        # atexit hooks, re-flushing inherited stdio buffers).
        status = 1
        try:
            with open(f".tmp/{msg_id}", "w") as f:
                print(msg, "ok", file=f)
                for _ in range(10240):
                    f.write("".join(random.choices(string.ascii_letters, k=80)))
                    f.write("\n")
            # Report completion only after the file has been closed (flushed).
            q.put(msg)
            status = 0
        except BaseException:
            # os._exit() skips stdio flushing, so write the traceback to the
            # stderr descriptor directly.
            os.write(2, traceback.format_exc().encode())
        finally:
            os._exit(status)
if __name__ == "__main__":
    # Script entry point: hand control to the HTTP front end defined in mpw.
    from mpw import run

    run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
multi processing workers | |
""" | |
import asyncio | |
import collections | |
import multiprocessing | |
import os | |
import traceback | |
from tornado.web import RequestHandler, Application, HTTPError | |
from m import work | |
# Maximum number of requests in flight; overridable via the environment.
CONCURRENT = int(os.getenv("CONCURRENT", 100))
# Extra ids kept in the pool: requests are refused once only this many remain.
RESERVED = 10

# Workers are started with the "spawn" method; the queue created from the same
# context carries completed messages back from them.
mp = multiprocessing.get_context("spawn")
q = mp.SimpleQueue()
class Executors(dict):
    """Lazily populated mapping ``key -> (process, write_connection)``.

    Looking up a key that is not present starts a new worker process running
    ``work`` and stores it together with the write end of its request pipe.
    """

    def __missing__(self, key):
        """Start a worker for ``key``, cache it and return ``(process, writer)``."""
        r, w = mp.Pipe(False)
        p = mp.Process(target=work, args=(r, q))
        p.start()
        # The worker now holds its own copy of the read end.  Close ours
        # explicitly instead of leaving it to garbage collection: as long as
        # this process keeps a reader open, sending to a dead worker cannot
        # raise BrokenPipeError.
        r.close()
        self[key] = p, w
        return self[key]
class Handler(RequestHandler):
    """HTTP handler that forwards each GET to a per-path worker process.

    A request takes a free id from ``_pool``, sends ``(id, 'xxx')`` to the
    worker registered for the URL path, waits for the matching future to be
    resolved by ``complete`` and then replies with the contents of
    ``.tmp/<id>``.
    """

    # Free request ids.  The pool holds CONCURRENT + RESERVED ids and GET is
    # refused once only RESERVED remain.
    _pool = collections.deque(range(CONCURRENT + RESERVED))
    # Request id -> future awaited by the handler serving that id.
    _futures = collections.defaultdict(asyncio.Future)
    # URL path -> (process, write connection), created on first use.
    _executors = Executors()

    @classmethod
    def complete(cls, id, result):
        """Resolve the future waiting on ``id`` and return the id to the pool."""
        cls._futures.pop(id).set_result(result)
        cls._pool.append(id)

    async def get(self, path):
        """Run one job on the worker for ``path`` and return its output file.

        Raises:
            HTTPError: 429 when no request id is available, 503 when the
                message could not be delivered to a live worker.
        """
        if len(self._pool) <= RESERVED:
            raise HTTPError(429)
        req_id = self._pool.popleft()
        msg = req_id, 'xxx'
        for _ in range(3):
            try:
                _, w = self._executors[path]
                w.send(msg)
                break
            except BrokenPipeError:
                # The worker is gone: drop it so the next attempt starts a
                # fresh one.
                traceback.print_exc(2)
                self._executors.pop(path)
        else:
            # Every attempt failed, so nothing will ever resolve a future for
            # this id.  Give the id back and fail the request instead of
            # awaiting forever.
            self._pool.append(req_id)
            raise HTTPError(503)
        result = await self._futures[req_id]
        # print() goes through self.write(), so the result is the first line
        # of the response body.
        print(result, file=self)
        with open(f".tmp/{req_id}", "rb") as f:
            self.write(f.read())
        self.set_header("Content-Type", "text/plain; charset=UTF-8")

    def delete(self, path):
        """Forget and terminate the worker for ``path``; 404 if there is none."""
        try:
            p, _ = self._executors.pop(path)
        except KeyError:
            raise HTTPError(404)
        p.terminate()
# Single catch-all route: everything after the leading slash is handed to
# Handler as its ``path`` argument.
app = Application([(r"/(.*)", Handler)])
def _on_q(fd, events):
    """IOLoop callback: take one finished message off ``q`` and complete it.

    ``fd`` and ``events`` are supplied by the IOLoop and are not used.
    """
    request_id, payload = q.get()
    Handler.complete(request_id, payload)
def run():
    """Parse command-line options, start the HTTP server and run the IOLoop.

    The ``port`` option defaults to 1111.  The read end of the result queue is
    registered with the loop so that ``_on_q`` runs whenever a worker reports
    a finished message.
    """
    from tornado.ioloop import IOLoop
    from tornado.options import define, options, parse_command_line

    define("port", default=1111)
    parse_command_line()
    app.listen(options.port, xheaders=True)

    ioloop = IOLoop.current()
    # NOTE: ``_reader`` is a private attribute of SimpleQueue.
    ioloop.add_handler(q._reader, _on_q, ioloop.READ)
    ioloop.start()
if __name__ == "__main__":
    # Executing this module as a script is a deliberate no-op; the string
    # below is only a marker and is never printed.
    "do not run this directly"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import collections | |
import os | |
import time | |
import json | |
import random | |
import signal | |
import string | |
import multiprocessing | |
from tornado.web import RequestHandler, Application, HTTPError | |
# Maximum number of requests in flight; overridable via the environment.
CONCURRENT = int(os.getenv("CONCURRENT", 90))
# Extra ids kept in the pool: requests are refused once only this many remain.
RESERVED = int(os.getenv("RESERVED", 10))
def test(reader, q):
    """Worker loop: fork one short-lived child per message read from ``reader``.

    Each message is a sequence whose first element is the request id.  The
    forked child writes ``.tmp/<id>`` (the message followed by 1024 lines of
    1024 random ASCII letters) and then puts the message on ``q`` to signal
    completion.

    Args:
        reader: Read end of the pipe on which requests arrive.
        q: Queue on which completed messages are reported.

    Returns:
        ``None`` once the sending end of ``reader`` has been closed.  Forked
        children never return from this function; they leave via
        ``os._exit``.
    """
    import traceback

    print(time.time(), os.getpid(), q)

    def wait(*_):
        """SIGCHLD handler: reap all exited children without blocking.

        A plain ``os.wait()`` would block here while any other child is still
        running, stalling the receive loop below.
        """
        while True:
            try:
                child, _status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                break  # no children exist at all
            if child == 0:
                break  # children exist, but none has exited yet

    signal.signal(signal.SIGCHLD, wait)
    while True:
        try:
            msg = reader.recv()
        except EOFError:
            # The sender closed its end: no further work can ever arrive.
            return
        msg_id, *_ = msg
        pid = os.fork()
        if pid != 0:
            # Parent: go straight back to waiting for the next message.
            continue

        # Forked child.  It must terminate with os._exit() so that it never
        # unwinds into the parent's call stack (multiprocessing bootstrap,
        # atexit hooks, re-flushing inherited stdio buffers).
        status = 1
        try:
            with open(f".tmp/{msg_id}", "w") as f:
                print(msg, "ok", file=f)
                for _ in range(1024):
                    f.write("".join(random.choices(string.ascii_letters, k=1024)))
                    f.write("\n")
            # Report completion only after the file has been closed (flushed).
            q.put(msg)
            status = 0
        except BaseException:
            # os._exit() skips stdio flushing, so write the traceback to the
            # stderr descriptor directly.
            os.write(2, traceback.format_exc().encode())
        finally:
            os._exit(status)
class Workers(dict):
    """Lazily populated mapping ``key -> write_connection`` of worker pipes.

    Looking up a key that is not present starts a new worker process (named
    after the key) running ``test`` and stores the write end of its pipe.
    """

    def __missing__(self, key):
        """Start a worker for ``key``, cache and return its write connection."""
        r, w = mp.Pipe(False)
        p = mp.Process(name=key, target=test, args=(r, q))
        p.start()
        # The worker now holds its own copy of the read end.  Close ours
        # explicitly instead of leaving it to garbage collection: as long as
        # this process keeps a reader open, sending to a dead worker cannot
        # raise BrokenPipeError.
        r.close()
        self[key] = w
        return w
class Handler(RequestHandler):
    """Base handler holding the state shared by all requests.

    Provides the id pool, the per-id futures, the worker registry and the
    ``complete`` hook used to hand a worker's result back to the request
    that is waiting for it.
    """

    # Free request ids: CONCURRENT + RESERVED of them in total.
    _pool = collections.deque(range(CONCURRENT + RESERVED))
    # Request id -> future awaited by the handler serving that id.
    _futures = collections.defaultdict(asyncio.Future)
    # Key -> write connection of the worker for that key, created on demand.
    _workers = Workers()

    def set_default_headers(self):
        """Serve every response as UTF-8 plain text."""
        self.set_header("Content-Type", "text/plain; charset=UTF-8")

    @classmethod
    def complete(cls, id, result):
        """Resolve the future registered for ``id``, then recycle the id."""
        pending = cls._futures.pop(id)
        pending.set_result(result)
        cls._pool.append(id)
class Api(Handler):
    """GET endpoint that runs one job on the worker for the requested path."""

    async def get(self, path):
        """Send ``(id, timestamp)`` to the worker and reply with ``.tmp/<id>``.

        Raises:
            HTTPError: 429 when no request id is available, 503 when the
                message could not be delivered to a live worker.
        """
        if len(self._pool) <= RESERVED:
            raise HTTPError(429)
        req_id = self._pool.popleft()
        msg = req_id, time.time()
        for _ in range(3):
            try:
                self._workers[path].send(msg)
                break
            except BrokenPipeError:
                # The cached worker is gone: forget it so the next attempt
                # starts a fresh one instead of failing forever.
                self._workers.pop(path, None)
        else:
            # Nothing was delivered, so no result will ever arrive for this
            # id.  Give it back to the pool rather than leaking it.
            self._pool.append(req_id)
            raise HTTPError(503)
        result = await self._futures[req_id]
        # print() goes through self.write(), so the result is the first line
        # of the response body.
        print(result, file=self)
        with open(f".tmp/{req_id}", "rb") as f:
            self.write(f.read())
if __name__ == '__main__':
    from tornado.ioloop import IOLoop
    from tornado.options import define, options, parse_command_line

    # Catch-all route: the whole request path is passed to Api.get.
    app = Application([(r"(.*)", Api)])

    # Workers are started with the "spawn" method; the queue created from the
    # same context carries completed messages back from them.
    mp = multiprocessing.get_context('spawn')
    q = mp.SimpleQueue()

    def on_read(fd, events):
        """IOLoop callback: take one finished message off ``q`` and complete it."""
        request_id, payload = q.get()
        Handler.complete(request_id, payload)

    define("port", default=1111)
    define("addr", default="")
    parse_command_line()
    app.listen(options.port, options.addr, xheaders=True)

    ioloop = IOLoop.current()
    # NOTE: ``_reader`` is a private attribute of SimpleQueue.
    ioloop.add_handler(q._reader, on_read, ioloop.READ)
    ioloop.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment