Last active
July 9, 2017 21:43
-
-
Save pavelpatrin/9c031af170374452ec7f141bcb945a7d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import select | |
import socket | |
from urllib.parse import urlsplit | |
class Poll:
    """Track read/write interest for sockets on top of ``select.epoll``.

    ``state`` maps a file descriptor to the event mask currently registered
    with epoll, and ``socks`` maps the same descriptor back to its socket
    object so that ``events()`` can hand sockets (not descriptors) to callers.
    """

    ACTION_READ = 'read'
    ACTION_WRITE = 'write'

    # Action name -> poll event flag.
    EVENTS = {
        ACTION_READ: select.POLLIN,
        ACTION_WRITE: select.POLLOUT,
    }
    # Poll event flag -> action name (inverse of EVENTS).
    ACTIONS = {
        value: key for key, value
        in EVENTS.items()
    }

    def __init__(self):
        self.poll = select.epoll()
        self.state = {}
        self.socks = {}

    def watch(self, sock, action):
        """Start reporting ``action`` readiness for ``sock``.

        The new flag is merged into whatever is already registered for the
        descriptor, so watching 'write' does not cancel an earlier 'read'.

        Raises:
            KeyError: if ``action`` is not a key of ``EVENTS``.
        """
        fileno = sock.fileno()
        current = self.state.get(fileno, 0)
        evmask = current | self.EVENTS[action]
        if current:
            # Descriptor is already registered: update its mask.
            self.poll.modify(fileno, evmask)
        else:
            # First interest for this descriptor: register it.
            self.poll.register(fileno, evmask)
        self.state[fileno] = evmask
        self.socks[fileno] = sock

    def unwatch(self, sock, action):
        """Stop reporting ``action`` readiness for ``sock``.

        Only the flag for ``action`` is cleared; other watched actions stay
        registered. The descriptor is unregistered once no flags remain.
        Calling this for a socket that is not watched is a no-op.

        Raises:
            KeyError: if ``action`` is not a key of ``EVENTS``.
        """
        fileno = sock.fileno()
        flag = self.EVENTS[action]
        current = self.state.get(fileno, 0)
        if not current:
            return
        evmask = current & ~flag
        if evmask:
            # Other actions are still watched: modify.
            self.poll.modify(fileno, evmask)
            self.state[fileno] = evmask
            self.socks[fileno] = sock
        else:
            # Nothing left to watch: unregister.
            self.poll.unregister(fileno)
            del self.state[fileno]
            del self.socks[fileno]

    def events(self):
        """Yield ``(sock, action)`` pairs forever as sockets become ready.

        One epoll event can carry several flags at once, so each watched
        action is tested separately. Error and hang-up conditions are
        reported as readiness for every watched action, so that the waiting
        code performs its recv/send and sees the failure there.
        """
        failure = select.POLLERR | select.POLLHUP
        while True:
            for fileno, event in self.poll.poll(1):
                for action, flag in self.EVENTS.items():
                    # Re-read the bookkeeping on every pass: the consumer may
                    # have called unwatch() between two yields.
                    sock = self.socks.get(fileno)
                    watched = self.state.get(fileno, 0)
                    if sock is None or not watched & flag:
                        continue
                    if event & (flag | failure):
                        yield sock, action
class Loop:
    """Minimal event loop driving generator-based coroutines.

    Each task is a generator that yields ``(sock, action)`` pairs to say
    which readiness event it is waiting for, and returns its final result.
    """

    def __init__(self, tasks):
        self.tasks = tasks
        # (sock, action) -> task suspended on that event.
        self.index = {}
        self.poll = Poll()

    def accept(self, sock, action):
        """Return the task waiting on ``(sock, action)``, or None.

        When a task is found, its poll registration and index entry are
        removed; the task re-registers itself on its next yield.
        """
        task = self.index.get((sock, action))
        if task is None:
            return None
        # Clean listeners and state.
        self.poll.unwatch(sock, action)
        del self.index[sock, action]
        return task

    def step(self, task):
        """Resume ``task`` once.

        Returns:
            A ``(task, finished, result)`` tuple. ``result`` is the
            coroutine's return value when ``finished`` is True, else None.
        """
        try:
            # Continue coroutine.
            sock, action = task.send(None)
        except StopIteration as e:
            # Coroutine is finished.
            return task, True, e.value
        else:
            # Plan execution for future.
            self.poll.watch(sock, action)
            self.index[sock, action] = task
            # Coroutine is not finished.
            return task, False, None

    def run(self):
        """Run every task to completion.

        Returns:
            A dict mapping each task object to its return value.
        """
        # Materialize once so one-shot iterables are not consumed twice.
        tasks = list(self.tasks)
        pending = set(tasks)
        results = {}
        # Init coroutines. A task may finish without ever yielding, so the
        # outcome of this first step has to be recorded as well.
        for task in tasks:
            _, ready, result = self.step(task)
            if ready:
                pending.discard(task)
                results[task] = result
        # Nothing is waiting on I/O: do not enter the endless event stream.
        if not pending:
            return results
        # Handle events.
        for sock, action in self.poll.events():
            task = self.accept(sock, action)
            if task is None:
                continue
            _, ready, result = self.step(task)
            if not ready:
                continue
            pending.discard(task)
            results[task] = result
            if not pending:
                return results
def async_read(sock, response=None):
    """Suspend until ``sock`` is readable, then return one ``recv`` result.

    Args:
        sock: socket to read from.
        response: unused; kept (now optional) so existing two-argument
            calls keep working while one-argument calls no longer fail.

    Returns:
        The bytes returned by a single ``sock.recv(2 ** 20)`` call.
    """
    yield (sock, Poll.ACTION_READ)
    return sock.recv(2 ** 20)
def async_write(sock, request):
    """Suspend until ``sock`` is writable, then send ``request`` once.

    Returns:
        Whatever ``sock.send(request)`` returns.
    """
    wait_for = (sock, Poll.ACTION_WRITE)
    yield wait_for
    sent = sock.send(request)
    return sent
def task(url):
    """Coroutine: send one HTTP GET for ``url`` and return the first chunk read.

    Yields ``(sock, action)`` pairs (through ``async_write`` / ``async_read``)
    while waiting for the socket. The socket is closed when the coroutine
    finishes or is discarded.

    Returns:
        The bytes returned by the single read performed after the request
        was sent.
    """
    split = urlsplit(url)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setblocking(False)
        try:
            sock.connect((split.hostname, split.port or 80))
        except BlockingIOError:
            pass  # This is ok: the connection completes in the background.
        # An empty path would produce the invalid request line "GET  HTTP/1.1".
        target = split.path or '/'
        if split.query:
            target += '?' + split.query
        request = bytes(
            'GET %s HTTP/1.1\r\n' % target +
            'Host: %s\r\n' % split.hostname +
            'Connection: close\r\n' +
            '\r\n', 'utf-8'
        )
        sent = yield from async_write(sock, request)
        print('Sent %d bytes' % sent)
        # The second argument is passed explicitly so the call matches a
        # two-parameter async_read(sock, response) signature.
        data = yield from async_read(sock, None)
        print('Read %d bytes' % len(data))
        return data
    finally:
        sock.close()
# Demo: fire 100 concurrent requests at the same URL and print each response.
url = 'http://google.com/api/v2/user.json'
loop = Loop([task(url) for _ in range(100)])
results = loop.run()
for result in results.values():
    # Print the individual response, not the whole results mapping.
    print(result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment