@lwzm · Last active December 18, 2019 09:18
#!/usr/bin/env python3
# m.py: defines work(); mpw.py imports it (from m import work), and running
# this file directly starts the Tornado service via mpw.run().
import multiprocessing.connection
import os
import signal
import time


def wait(*_):
    # SIGCHLD handler: reap every finished child so no zombies accumulate.
    while True:
        try:
            child, status = os.wait()
            if status:
                exitcode, signum = status >> 8, status & 0xFF
                print(child, exitcode, signum)
        except ChildProcessError:
            break


def work(r: multiprocessing.connection.Connection, q: multiprocessing.SimpleQueue):
    import random
    import string

    print(time.time(), os.getpid(), r, q)
    signal.signal(signal.SIGCHLD, wait)
    while True:
        msg = r.recv()
        id, *_ = msg
        pid = os.fork()
        if pid == 0:
            # Forked child: do the job, report back on the queue, then return.
            with open(f".tmp/{id}", "w") as f:
                print(msg, "ok", file=f)
                for i in range(10240):
                    f.write("".join(random.choices(string.ascii_letters, k=80)))
                    f.write("\n")
            #time.sleep(6)
            q.put(msg)
            return


if __name__ == '__main__':
    import mpw
    mpw.run()
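work() expects (id, ...) tuples on its pipe and reports each finished job back on the shared SimpleQueue. A minimal sketch of driving it by hand (not part of the gist; assumed to be saved as a separate script next to m.py) looks like this; mpw.py below does the same wiring once per URL path:

import multiprocessing
import os

from m import work

if __name__ == "__main__":
    os.makedirs(".tmp", exist_ok=True)        # work()'s forked children write .tmp/<id>
    mp = multiprocessing.get_context("spawn")
    q = mp.SimpleQueue()
    r, w = mp.Pipe(False)                     # the worker reads from r, we write to w
    p = mp.Process(target=work, args=(r, q))
    p.start()
    w.send((0, "hello"))                      # messages are (id, ...) tuples
    print(q.get())                            # the forked child echoes the message back when done
    p.terminate()                             # the worker otherwise blocks forever in recv()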
#!/usr/bin/env python3
"""
mpw.py: multi processing workers

imported by m.py, which supplies work() and calls run()
"""
import asyncio
import collections
import multiprocessing
import os
import traceback

from tornado.web import RequestHandler, Application, HTTPError

from m import work

CONCURRENT = int(os.environ.get("CONCURRENT", 100))
RESERVED = 10

mp = multiprocessing.get_context("spawn")
q = mp.SimpleQueue()


class Executors(dict):
    """Lazily spawn one worker process per key (URL path)."""

    def __missing__(self, key):
        r, w = mp.Pipe(False)  # worker reads from r, the handler writes to w
        p = mp.Process(target=work, args=(r, q))
        p.start()
        self[key] = p, w
        return self[key]


class Handler(RequestHandler):
    _pool = collections.deque(range(CONCURRENT + RESERVED))  # free request ids
    _futures = collections.defaultdict(asyncio.Future)
    _executors = Executors()

    @classmethod
    def complete(cls, id, result):
        cls._futures.pop(id).set_result(result)
        cls._pool.append(id)  # recycle the id

    async def get(self, path):
        if len(self._pool) <= RESERVED:
            raise HTTPError(429)  # too many requests already in flight
        id = self._pool.popleft()
        msg = id, 'xxx'
        for _ in range(3):  # retry if the worker's pipe is broken
            try:
                _, w = self._executors[path]
                w.send(msg)
                break
            except BrokenPipeError:
                traceback.print_exc(2)
                self._executors.pop(path)
        result = await self._futures[id]
        print(result, file=self)  # RequestHandler.write() accepts str, so the handler works as a print target
        with open(f".tmp/{id}", "rb") as f:
            self.write(f.read())
        self.set_header("Content-Type", "text/plain; charset=UTF-8")

    def delete(self, path):
        try:
            p, _ = self._executors.pop(path)
        except KeyError:
            raise HTTPError(404)
        p.terminate()


app = Application([
    (r"/(.*)", Handler),
])


def _on_q(fd, events):
    # Called by the IOLoop whenever the workers' SimpleQueue becomes readable.
    id, result = q.get()
    Handler.complete(id, result)


def run():
    from tornado.options import define, parse_command_line, options
    from tornado.ioloop import IOLoop
    define("port", default=1111)
    parse_command_line()
    app.listen(options.port, xheaders=True)
    ioloop = IOLoop.current()
    # Watch the read end of the SimpleQueue's underlying pipe (a private attribute).
    ioloop.add_handler(q._reader, _on_q, ioloop.READ)
    ioloop.start()


if __name__ == '__main__':
    "do not run this directly"
#!/usr/bin/env python3
# A self-contained, single-file variant of the same idea as m.py + mpw.py.
import asyncio
import collections
import multiprocessing
import os
import random
import signal
import string
import time

from tornado.web import RequestHandler, Application, HTTPError

CONCURRENT = int(os.environ.get("CONCURRENT", 90))
RESERVED = int(os.environ.get("RESERVED", 10))


def test(reader, q):
    print(time.time(), os.getpid(), q)

    def wait(*_):
        # SIGCHLD handler: reap finished children so no zombies accumulate.
        while True:
            try:
                child, status = os.wait()
                #print(child, status)
            except ChildProcessError:
                break

    signal.signal(signal.SIGCHLD, wait)
    while True:
        msg = reader.recv()
        id, *_ = msg
        pid = os.fork()
        if pid == 0:
            # Forked child: do the job, report back on the queue, then return.
            with open(f".tmp/{id}", "w") as f:
                print(msg, "ok", file=f)
                for i in range(1024):
                    f.write("".join(random.choices(string.ascii_letters, k=1024)))
                    f.write("\n")
            #time.sleep(6)
            q.put(msg)
            return


class Workers(dict):
    """Lazily spawn one worker process per key (URL path)."""

    def __missing__(self, key):
        r, w = mp.Pipe(False)
        p = mp.Process(name=key, target=test, args=(r, q))
        p.start()
        self[key] = w
        return w


class Handler(RequestHandler):
    _pool = collections.deque(range(CONCURRENT + RESERVED))  # free request ids
    _futures = collections.defaultdict(asyncio.Future)
    _workers = Workers()

    def set_default_headers(self):
        self.set_header("Content-Type", "text/plain; charset=UTF-8")

    @classmethod
    def complete(cls, id, result):
        cls._futures.pop(id).set_result(result)
        cls._pool.append(id)


class Api(Handler):
    async def get(self, path):
        if len(self._pool) <= RESERVED:
            raise HTTPError(429)
        id = self._pool.popleft()
        msg = id, time.time()
        self._workers[path].send(msg)
        result = await self._futures[id]
        print(result, file=self)
        with open(f".tmp/{id}", "rb") as f:
            self.write(f.read())


if __name__ == '__main__':
    app = Application([
        (r"(.*)", Api),
    ])
    mp = multiprocessing.get_context('spawn')
    q = mp.SimpleQueue()
    #f = open('multi.log', 'a')

    def on_read(fd, events):
        #print(fd, events, q.get(), flush=True)
        id, result = q.get()
        Handler.complete(id, result)

    from tornado.options import define, parse_command_line, options
    define("port", default=1111)
    define("addr", default="")
    parse_command_line()
    app.listen(options.port, options.addr, xheaders=True)
    from tornado.ioloop import IOLoop
    ioloop = IOLoop.current()
    ioloop.add_handler(q._reader, on_read, ioloop.READ)
    ioloop.start()