Skip to content

Instantly share code, notes, and snippets.

@xiazhibin
Created March 29, 2018 13:46
Show Gist options
  • Save xiazhibin/873c9978d168abaed3f6a20cc7b1229f to your computer and use it in GitHub Desktop.
Save xiazhibin/873c9978d168abaed3f6a20cc7b1229f to your computer and use it in GitHub Desktop.
simple tcpserver
import selectors
import errno
import sys
class IOLoop(object):
@classmethod
def instance(cls):
if not hasattr(cls, '_instance'):
cls._instance = IOLoop()
return cls._instance
def __init__(self):
self.stlc = selectors.DefaultSelector()
self._handlers = {}
def add_handler(self, fd, events, handler):
try:
self._handlers[fd] = handler
self.stlc.register(fd, events)
except Exception as e:
print(e)
def start(self):
while True:
event_pair = self.stlc.select(0.1)
for key, events in event_pair:
try:
fd = key.fileobj
self._handlers[fd](fd, events)
except (OSError, IOError) as e:
if e.args[0] == errno.EPIPE:
pass
else:
print(e)
pass
except Exception as ee:
print(ee)
def remove_handler(self, fd):
rv = self._handlers.pop(fd, None)
if rv:
self.stlc.unregister(fd)
def update_handler(self, fd, events):
self.stlc.modify(fd, events)
import collections
import numbers
import selectors
import socket
import sys
import errno
from ioloop import IOLoop
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE)
def _double_prefix(deque):
new_len = max(len(deque[0]) * 2,
(len(deque[0]) + len(deque[1])))
_merge_prefix(deque, new_len)
def _merge_prefix(deque, size):
if len(deque) == 1 and len(deque[0]) <= size:
return
prefix = []
remaining = size
while deque and remaining > 0:
chunk = deque.popleft()
if len(chunk) > remaining:
deque.appendleft(chunk[remaining:])
chunk = chunk[:remaining]
prefix.append(chunk)
remaining -= len(chunk)
if prefix:
deque.appendleft(type(prefix[0])().join(prefix))
if not deque:
deque.appendleft(b"")
class IOStream(object):
def __init__(self, sock):
self.max_buffer_size = 3 * 1024
self.sock = sock
self._read_buffer = collections.deque()
self._write_buffer = collections.deque()
self._read_buffer_size = 0
self._streaming_callback = None
self._read_callback = None
self._close_callback = None
self._closed = False
self.error = None
self._read_until_close = False
self.read_chunk_size = 1024
self._read_bytes = 0
self._pending_callbacks = 0
self._write_buffer_frozen = False
self._write_callback = None
self._state = None
self.io_loop = IOLoop.instance()
def _handle_events(self, fd, events):
if self.closed():
print("Got events for closed stream %d", fd)
return
try:
if events & selectors.EVENT_READ:
self._handle_read()
if self.closed():
return
if events & selectors.EVENT_WRITE:
self._handle_write()
if self.closed():
return
except Exception:
self.close(exc_info=True)
raise
def _handle_write(self):
while self._write_buffer:
try:
if not self._write_buffer_frozen:
_merge_prefix(self._write_buffer, 128 * 1024)
num_bytes = self.write_to_fd(self._write_buffer[0])
if num_bytes == 0:
self._write_buffer_frozen = True
break
self._write_buffer_frozen = False
_merge_prefix(self._write_buffer, num_bytes)
self._write_buffer.popleft()
except (socket.error, IOError, OSError) as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
self._write_buffer_frozen = True
break
else:
if e.args[0] not in _ERRNO_CONNRESET:
self.close(exc_info=True)
return
if not self._write_buffer and self._write_callback:
callback = self._write_callback
self._write_callback = None
self._run_callback(callback)
def close(self, exc_info=False):
if not self.closed():
if exc_info:
if not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
if any(exc_info):
self.error = exc_info[1]
if self._read_until_close:
if self._streaming_callback is not None and self._read_buffer_size:
self._run_callback(self._streaming_callback, self._consume(self._read_buffer_size))
callback = self._read_callback
self._read_callback = None
self._run_callback(callback, self._consume(self._read_buffer_size))
if self._state is not None:
self._state = None
self.io_loop.remove_handler(self.fileno())
self.close_fd()
self._closed = True
self._maybe_run_close_callback()
def close_fd(self):
self.sock.close()
self.sock = None
def _handle_read(self):
try:
try:
self._pending_callbacks += 1
while not self.closed():
if self._read_to_buffer() == 0:
break
finally:
self._pending_callbacks -= 1
except Exception:
self.close(exc_info=True)
return
if self._read_from_buffer():
return
else:
self._maybe_run_close_callback()
def _maybe_run_close_callback(self):
if self.closed() and self._pending_callbacks == 0:
if self._close_callback is not None:
cb = self._close_callback
self._close_callback = None
self._run_callback(cb)
self._read_callback = self._write_callback = None
self._write_buffer = None
def fileno(self):
return self.sock.fileno()
def _read_from_buffer(self):
if self._streaming_callback is not None and self._read_buffer_size:
bytes_to_consume = self._read_buffer_size
if self._read_bytes is not None:
bytes_to_consume = min(self._read_bytes, bytes_to_consume)
self._read_bytes -= bytes_to_consume
self._run_callback(self._streaming_callback,
self._consume(bytes_to_consume))
if self._read_bytes is not None and self._read_buffer_size >= self._read_bytes:
num_bytes = self._read_bytes
callback = self._read_callback
self._read_callback = None
self._streaming_callback = None
self._read_bytes = None
self._run_callback(callback, self._consume(num_bytes))
return True
return False
def _add_io_state(self, state):
if self.closed():
return
if self._state is None:
self._state = state
self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
elif not self._state & state:
self._state = self._state | state
self.io_loop.update_handler(self.fileno(), self._state)
def read_from_fd(self):
try:
chunk = self.sock.recv(self.read_chunk_size)
except socket.error as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
return None
else:
raise
if not chunk:
self.close()
return None
return chunk
def write_to_fd(self, data):
return self.sock.send(data)
def _read_to_buffer(self):
try:
chunk = self.read_from_fd()
except (socket.error, IOError, OSError) as e:
if e.args[0] in _ERRNO_CONNRESET:
self.close(exc_info=True)
return
self.close(exc_info=True)
raise
if chunk is None:
return 0
self._read_buffer.append(chunk)
self._read_buffer_size += len(chunk)
if self._read_buffer_size >= self.max_buffer_size:
self.close()
raise IOError("Reached maximum read buffer size")
return len(chunk)
def _try_inline_read(self):
if self._read_from_buffer():
return
self._check_closed()
try:
try:
self._pending_callbacks += 1
while not self.closed():
if self._read_to_buffer() == 0:
break
finally:
self._pending_callbacks -= 1
except Exception:
raise
if self._read_from_buffer():
return
self._maybe_add_error_listener()
def _maybe_add_error_listener(self):
if self._state is None and self._pending_callbacks == 0:
if not self.closed():
self._add_io_state(selectors.EVENT_READ)
def _check_closed(self):
if self.closed():
raise Exception("Stream is closed")
def closed(self):
return self._closed
def _run_callback(self, callback, *args, **kwargs):
try:
callback(*args, **kwargs)
except:
self.close()
raise
def _add_io_state(self, state):
if self._state is None:
self._state = selectors.EVENT_READ
IOLoop.instance().add_handler(self.fileno(), self._state, self._handle_events)
elif not self._state & state:
self._state = self._state | state
self.io_loop.update_handler(self.fileno(), self._state)
IOLoop.instance().add_handler(self.fileno(), self._state, self._handle_events)
def read_bytes(self, num_bytes, callback, streaming_callback=None):
self._set_read_callback(callback)
assert isinstance(num_bytes, numbers.Integral)
self._read_bytes = num_bytes
self._streaming_callback = streaming_callback
self._try_inline_read()
def write(self, data, callback=None):
assert isinstance(data, bytes)
self._check_closed()
if data:
WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
if len(data) > WRITE_BUFFER_CHUNK_SIZE:
for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
else:
self._write_buffer.append(data)
self._write_callback = callback
self._handle_write()
if self._write_buffer:
self._add_io_state(self.io_loop.WRITE)
self._maybe_add_error_listener()
def _consume(self, loc):
if loc == 0:
return b""
_merge_prefix(self._read_buffer, loc)
self._read_buffer_size -= loc
return self._read_buffer.popleft()
def set_close_callback(self, callback):
self._close_callback = callback
def _set_read_callback(self, callback):
assert not self._read_callback, "Already reading"
self._read_callback = callback
def reading(self):
return self._read_callback is not None
def writing(self):
return bool(self._write_buffer)
import selectors
import socket
from ioloop import IOLoop
from iostream import IOStream
class Server(object):
def __init__(self):
self.server_sock = None
def start(self):
address = ('127.0.0.1', 5055)
self.server_sock = socket.socket()
self.server_sock.setblocking(0)
self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_sock.bind(address)
self.server_sock.listen(128)
IOLoop.instance().add_handler(self.server_sock.fileno(), selectors.EVENT_READ, self.on_accept)
def on_accept(self, fd, events):
try:
conn, address = self.server_sock.accept()
except OSError as e:
print(e)
else:
stream = IOStream(conn)
self.handle_steam(stream)
def handle_steam(self, stream):
raise NotImplementedError
def close(self):
if self.server_sock:
self.server_sock.close()
self.server_sock = None
IOLoop.instance().remove_handler(self.server_sock)
class EchoServer(Server):
def onclose(self):
print('close')
def handle_steam(self, stream):
self.stream = stream
stream.set_close_callback(self.onclose)
stream.read_bytes(4, self.on_read_len)
def on_read_len(self, data):
len_of_data = int.from_bytes(data, byteorder='big')
self.stream.read_bytes(len_of_data, self.on_read_data)
def on_read_data(self, data):
print(data)
self.stream.read_bytes(4, self.on_read_len)
self.stream.write(data)
def on_close(self):
print('closed')
server = EchoServer()
server.start()
IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment