Skip to content

Instantly share code, notes, and snippets.

@pavelpatrin
Last active July 9, 2017 21:43
Show Gist options
  • Save pavelpatrin/9c031af170374452ec7f141bcb945a7d to your computer and use it in GitHub Desktop.
Save pavelpatrin/9c031af170374452ec7f141bcb945a7d to your computer and use it in GitHub Desktop.
import select
import socket
from urllib.parse import urlsplit
class Poll:
    """Track which sockets wait for which I/O action on top of ``select.epoll``.

    Each watched socket is keyed by its file descriptor.  ``state`` maps the
    descriptor to the event mask currently registered with epoll and ``socks``
    maps it back to the socket object, so that ``events()`` can hand the
    socket (not the bare descriptor) to the caller.

    ``select.epoll`` is only available on Linux.
    """

    ACTION_READ = 'read'
    ACTION_WRITE = 'write'

    # Action name -> event bit registered with epoll.
    # NOTE: the POLL* constants are passed to epoll; on Linux they have the
    # same numeric values as the matching EPOLL* constants.
    EVENTS = {
        ACTION_READ: select.POLLIN,
        ACTION_WRITE: select.POLLOUT,
    }

    # Event bit -> action name (inverse of EVENTS).
    ACTIONS = {
        value: key for key, value
        in EVENTS.items()
    }

    def __init__(self):
        self.poll = select.epoll()
        self.state = {}  # fileno -> currently registered event mask
        self.socks = {}  # fileno -> socket object

    def watch(self, sock, action):
        """Start reporting ``action`` for ``sock``, keeping actions already watched.

        Raises ``KeyError`` if ``action`` is not one of the ``ACTION_*`` names.
        """
        fileno = sock.fileno()
        current = self.state.get(fileno, 0)
        # Merge with the existing mask so a second action on the same socket
        # does not silently replace the first one.
        evmask = current | self.EVENTS[action]
        if current:
            # Descriptor is already known to epoll: modify.
            self.poll.modify(fileno, evmask)
        else:
            # Nothing registered yet: register.
            self.poll.register(fileno, evmask)
        self.state[fileno] = evmask
        self.socks[fileno] = sock

    def unwatch(self, sock, action):
        """Stop reporting ``action`` for ``sock``.

        Other actions watched on the same socket stay registered; the
        descriptor is unregistered only when no action remains.  Calling this
        for a socket that is not watched does nothing.

        Raises ``KeyError`` if ``action`` is not one of the ``ACTION_*`` names.
        """
        fileno = sock.fileno()
        current = self.state.get(fileno, 0)
        # Clear only the bit of the requested action.
        evmask = current & ~self.EVENTS[action]
        if not current:
            return
        if evmask:
            # Something is still watched: modify.
            self.poll.modify(fileno, evmask)
            self.state[fileno] = evmask
            self.socks[fileno] = sock
        else:
            # Nothing left to watch: unregister.
            self.poll.unregister(fileno)
            del self.state[fileno]
            del self.socks[fileno]

    def events(self):
        """Yield ``(sock, action)`` pairs forever as watched sockets become ready.

        The reported event mask is decoded bit by bit, so a socket that is
        readable and writable at once produces two pairs.  Error and hang-up
        conditions wake every action still watched on the socket, so that the
        waiting code runs its ``recv``/``send`` and observes the failure
        instead of waiting forever.
        """
        failure_bits = select.POLLERR | select.POLLHUP
        while True:
            for fileno, event in self.poll.poll(1):
                failed = event & failure_bits
                for mask, action in self.ACTIONS.items():
                    # Look the socket up again on every pass: the consumer may
                    # have called unwatch()/watch() while we were suspended.
                    sock = self.socks.get(fileno)
                    if sock is None:
                        break
                    if not self.state.get(fileno, 0) & mask:
                        continue
                    if event & mask or failed:
                        yield sock, action
class Loop:
    """Minimal event loop driving generator-based tasks with a ``Poll``.

    A task is a generator that yields ``(sock, action)`` pairs to say what it
    is waiting for and returns its result when done.
    """

    def __init__(self, tasks):
        # Materialise once: run() needs to walk the tasks, and a one-shot
        # iterable would otherwise be exhausted after the first pass.
        self.tasks = list(tasks)
        self.index = {}  # (sock, action) -> task waiting for that event
        self.poll = Poll()

    def accept(self, sock, action):
        """Return the task waiting for ``(sock, action)``, or ``None``.

        When a task is found, its watch and index entry are removed; the task
        registers a new wait on its next ``step()`` if it needs one.
        """
        # Accept task for socket-action.
        task = self.index.get((sock, action))
        if task is None:
            return None
        # Clean listeners and state.
        self.poll.unwatch(sock, action)
        del self.index[sock, action]
        return task

    def step(self, task):
        """Advance ``task`` to its next wait point.

        Returns ``(task, finished, result)``; ``result`` is the generator's
        return value when ``finished`` is true and ``None`` otherwise.
        Exceptions other than ``StopIteration`` raised by the task propagate.
        """
        try:
            # Continue coroutine.
            sock, action = task.send(None)
        except StopIteration as e:
            # Coroutine is finished.
            return task, True, e.value
        else:
            # Plan execution for future.
            self.poll.watch(sock, action)
            self.index[sock, action] = task
            # Coroutine is not finished.
            return task, False, None

    def run(self):
        """Run every task to completion and return ``{task: result}``."""
        pending = set()
        results = {}
        # Init coroutines.  A task may already finish on its first step, so
        # its result has to be recorded here rather than thrown away.
        for task in self.tasks:
            task, ready, result = self.step(task)
            if ready:
                results[task] = result
            else:
                pending.add(task)
        # Nothing is waiting for I/O (no tasks, or all finished immediately):
        # entering the event loop would block forever.
        if not pending:
            return results
        # Handle events.
        for sock, action in self.poll.events():
            task = self.accept(sock, action)
            if task is None:
                continue
            task, ready, result = self.step(task)
            if not ready:
                continue
            pending.discard(task)
            results[task] = result
            if not pending:
                break
        return results
def async_read(sock, response=None):
    """Wait until ``sock`` is readable, then return one ``recv`` of up to 1 MiB.

    Meant to be used with ``yield from`` inside a task: it yields the
    ``(sock, action)`` pair describing the wait and returns the received bytes.

    ``response`` is not used by this function; it defaults to ``None`` so that
    callers may omit it (it used to be required, which made the one-argument
    call ``async_read(sock)`` fail with ``TypeError``).
    """
    yield (sock, Poll.ACTION_READ)
    return sock.recv(2 ** 20)
def async_write(sock, request):
    """Wait until ``sock`` is writable, then send ``request`` once.

    Meant to be used with ``yield from`` inside a task.  Returns whatever
    ``sock.send`` returns, i.e. the number of bytes actually sent.
    """
    wait_for = (sock, Poll.ACTION_WRITE)
    yield wait_for
    sent = sock.send(request)
    return sent
def task(url):
    """Generator task: send an HTTP GET for ``url`` and return the first chunk read.

    Yields ``(sock, action)`` wait requests (via ``async_write`` and
    ``async_read``) and returns the bytes obtained from a single read of the
    response.  The socket is closed when the task finishes or fails.
    """
    split = urlsplit(url)
    # Request target: an empty path must be sent as "/", and the query string
    # belongs to the target as well.
    target = split.path or '/'
    if split.query:
        target += '?' + split.query
    request = bytes(
        'GET %s HTTP/1.1\r\n' % target +
        'Host: %s\r\n' % split.hostname +
        'Connection: close\r\n' +
        '\r\n', 'utf-8'
    )
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setblocking(False)
        try:
            sock.connect((split.hostname, split.port or 80))
        except BlockingIOError:
            # Expected for a non-blocking connect: the connection is still
            # in progress and the socket becomes writable once it is done.
            pass
        sent = yield from async_write(sock, request)
        print('Sent %d bytes' % sent)
        # The second argument is passed explicitly because async_read's
        # signature declares a (unused) ``response`` parameter.
        data = yield from async_read(sock, None)
        print('Read %d bytes' % len(data))
        return data
    finally:
        # Release the descriptor on success, on error and on generator close.
        sock.close()
# Demo: fetch the same URL with 100 concurrent tasks and print each response.
url = 'http://google.com/api/v2/user.json'
loop = Loop([task(url) for _ in range(100)])
results = loop.run()
for result in results.values():
    # Print the individual response, not the whole results mapping.
    print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment