Created
March 29, 2018 13:46
-
-
Save xiazhibin/873c9978d168abaed3f6a20cc7b1229f to your computer and use it in GitHub Desktop.
simple tcpserver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import selectors | |
import errno | |
import sys | |
class IOLoop(object): | |
@classmethod | |
def instance(cls): | |
if not hasattr(cls, '_instance'): | |
cls._instance = IOLoop() | |
return cls._instance | |
def __init__(self): | |
self.stlc = selectors.DefaultSelector() | |
self._handlers = {} | |
def add_handler(self, fd, events, handler): | |
try: | |
self._handlers[fd] = handler | |
self.stlc.register(fd, events) | |
except Exception as e: | |
print(e) | |
def start(self): | |
while True: | |
event_pair = self.stlc.select(0.1) | |
for key, events in event_pair: | |
try: | |
fd = key.fileobj | |
self._handlers[fd](fd, events) | |
except (OSError, IOError) as e: | |
if e.args[0] == errno.EPIPE: | |
pass | |
else: | |
print(e) | |
pass | |
except Exception as ee: | |
print(ee) | |
def remove_handler(self, fd): | |
rv = self._handlers.pop(fd, None) | |
if rv: | |
self.stlc.unregister(fd) | |
def update_handler(self, fd, events): | |
self.stlc.modify(fd, events) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import numbers | |
import selectors | |
import socket | |
import sys | |
import errno | |
from ioloop import IOLoop | |
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN) | |
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE) | |
def _double_prefix(deque): | |
new_len = max(len(deque[0]) * 2, | |
(len(deque[0]) + len(deque[1]))) | |
_merge_prefix(deque, new_len) | |
def _merge_prefix(deque, size): | |
if len(deque) == 1 and len(deque[0]) <= size: | |
return | |
prefix = [] | |
remaining = size | |
while deque and remaining > 0: | |
chunk = deque.popleft() | |
if len(chunk) > remaining: | |
deque.appendleft(chunk[remaining:]) | |
chunk = chunk[:remaining] | |
prefix.append(chunk) | |
remaining -= len(chunk) | |
if prefix: | |
deque.appendleft(type(prefix[0])().join(prefix)) | |
if not deque: | |
deque.appendleft(b"") | |
class IOStream(object): | |
def __init__(self, sock): | |
self.max_buffer_size = 3 * 1024 | |
self.sock = sock | |
self._read_buffer = collections.deque() | |
self._write_buffer = collections.deque() | |
self._read_buffer_size = 0 | |
self._streaming_callback = None | |
self._read_callback = None | |
self._close_callback = None | |
self._closed = False | |
self.error = None | |
self._read_until_close = False | |
self.read_chunk_size = 1024 | |
self._read_bytes = 0 | |
self._pending_callbacks = 0 | |
self._write_buffer_frozen = False | |
self._write_callback = None | |
self._state = None | |
self.io_loop = IOLoop.instance() | |
def _handle_events(self, fd, events): | |
if self.closed(): | |
print("Got events for closed stream %d", fd) | |
return | |
try: | |
if events & selectors.EVENT_READ: | |
self._handle_read() | |
if self.closed(): | |
return | |
if events & selectors.EVENT_WRITE: | |
self._handle_write() | |
if self.closed(): | |
return | |
except Exception: | |
self.close(exc_info=True) | |
raise | |
def _handle_write(self): | |
while self._write_buffer: | |
try: | |
if not self._write_buffer_frozen: | |
_merge_prefix(self._write_buffer, 128 * 1024) | |
num_bytes = self.write_to_fd(self._write_buffer[0]) | |
if num_bytes == 0: | |
self._write_buffer_frozen = True | |
break | |
self._write_buffer_frozen = False | |
_merge_prefix(self._write_buffer, num_bytes) | |
self._write_buffer.popleft() | |
except (socket.error, IOError, OSError) as e: | |
if e.args[0] in _ERRNO_WOULDBLOCK: | |
self._write_buffer_frozen = True | |
break | |
else: | |
if e.args[0] not in _ERRNO_CONNRESET: | |
self.close(exc_info=True) | |
return | |
if not self._write_buffer and self._write_callback: | |
callback = self._write_callback | |
self._write_callback = None | |
self._run_callback(callback) | |
def close(self, exc_info=False): | |
if not self.closed(): | |
if exc_info: | |
if not isinstance(exc_info, tuple): | |
exc_info = sys.exc_info() | |
if any(exc_info): | |
self.error = exc_info[1] | |
if self._read_until_close: | |
if self._streaming_callback is not None and self._read_buffer_size: | |
self._run_callback(self._streaming_callback, self._consume(self._read_buffer_size)) | |
callback = self._read_callback | |
self._read_callback = None | |
self._run_callback(callback, self._consume(self._read_buffer_size)) | |
if self._state is not None: | |
self._state = None | |
self.io_loop.remove_handler(self.fileno()) | |
self.close_fd() | |
self._closed = True | |
self._maybe_run_close_callback() | |
def close_fd(self): | |
self.sock.close() | |
self.sock = None | |
def _handle_read(self): | |
try: | |
try: | |
self._pending_callbacks += 1 | |
while not self.closed(): | |
if self._read_to_buffer() == 0: | |
break | |
finally: | |
self._pending_callbacks -= 1 | |
except Exception: | |
self.close(exc_info=True) | |
return | |
if self._read_from_buffer(): | |
return | |
else: | |
self._maybe_run_close_callback() | |
def _maybe_run_close_callback(self): | |
if self.closed() and self._pending_callbacks == 0: | |
if self._close_callback is not None: | |
cb = self._close_callback | |
self._close_callback = None | |
self._run_callback(cb) | |
self._read_callback = self._write_callback = None | |
self._write_buffer = None | |
def fileno(self): | |
return self.sock.fileno() | |
def _read_from_buffer(self): | |
if self._streaming_callback is not None and self._read_buffer_size: | |
bytes_to_consume = self._read_buffer_size | |
if self._read_bytes is not None: | |
bytes_to_consume = min(self._read_bytes, bytes_to_consume) | |
self._read_bytes -= bytes_to_consume | |
self._run_callback(self._streaming_callback, | |
self._consume(bytes_to_consume)) | |
if self._read_bytes is not None and self._read_buffer_size >= self._read_bytes: | |
num_bytes = self._read_bytes | |
callback = self._read_callback | |
self._read_callback = None | |
self._streaming_callback = None | |
self._read_bytes = None | |
self._run_callback(callback, self._consume(num_bytes)) | |
return True | |
return False | |
def _add_io_state(self, state): | |
if self.closed(): | |
return | |
if self._state is None: | |
self._state = state | |
self.io_loop.add_handler(self.fileno(), self._handle_events, self._state) | |
elif not self._state & state: | |
self._state = self._state | state | |
self.io_loop.update_handler(self.fileno(), self._state) | |
def read_from_fd(self): | |
try: | |
chunk = self.sock.recv(self.read_chunk_size) | |
except socket.error as e: | |
if e.args[0] in _ERRNO_WOULDBLOCK: | |
return None | |
else: | |
raise | |
if not chunk: | |
self.close() | |
return None | |
return chunk | |
def write_to_fd(self, data): | |
return self.sock.send(data) | |
def _read_to_buffer(self): | |
try: | |
chunk = self.read_from_fd() | |
except (socket.error, IOError, OSError) as e: | |
if e.args[0] in _ERRNO_CONNRESET: | |
self.close(exc_info=True) | |
return | |
self.close(exc_info=True) | |
raise | |
if chunk is None: | |
return 0 | |
self._read_buffer.append(chunk) | |
self._read_buffer_size += len(chunk) | |
if self._read_buffer_size >= self.max_buffer_size: | |
self.close() | |
raise IOError("Reached maximum read buffer size") | |
return len(chunk) | |
def _try_inline_read(self): | |
if self._read_from_buffer(): | |
return | |
self._check_closed() | |
try: | |
try: | |
self._pending_callbacks += 1 | |
while not self.closed(): | |
if self._read_to_buffer() == 0: | |
break | |
finally: | |
self._pending_callbacks -= 1 | |
except Exception: | |
raise | |
if self._read_from_buffer(): | |
return | |
self._maybe_add_error_listener() | |
def _maybe_add_error_listener(self): | |
if self._state is None and self._pending_callbacks == 0: | |
if not self.closed(): | |
self._add_io_state(selectors.EVENT_READ) | |
def _check_closed(self): | |
if self.closed(): | |
raise Exception("Stream is closed") | |
def closed(self): | |
return self._closed | |
def _run_callback(self, callback, *args, **kwargs): | |
try: | |
callback(*args, **kwargs) | |
except: | |
self.close() | |
raise | |
def _add_io_state(self, state): | |
if self._state is None: | |
self._state = selectors.EVENT_READ | |
IOLoop.instance().add_handler(self.fileno(), self._state, self._handle_events) | |
elif not self._state & state: | |
self._state = self._state | state | |
self.io_loop.update_handler(self.fileno(), self._state) | |
IOLoop.instance().add_handler(self.fileno(), self._state, self._handle_events) | |
def read_bytes(self, num_bytes, callback, streaming_callback=None): | |
self._set_read_callback(callback) | |
assert isinstance(num_bytes, numbers.Integral) | |
self._read_bytes = num_bytes | |
self._streaming_callback = streaming_callback | |
self._try_inline_read() | |
def write(self, data, callback=None): | |
assert isinstance(data, bytes) | |
self._check_closed() | |
if data: | |
WRITE_BUFFER_CHUNK_SIZE = 128 * 1024 | |
if len(data) > WRITE_BUFFER_CHUNK_SIZE: | |
for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE): | |
self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE]) | |
else: | |
self._write_buffer.append(data) | |
self._write_callback = callback | |
self._handle_write() | |
if self._write_buffer: | |
self._add_io_state(self.io_loop.WRITE) | |
self._maybe_add_error_listener() | |
def _consume(self, loc): | |
if loc == 0: | |
return b"" | |
_merge_prefix(self._read_buffer, loc) | |
self._read_buffer_size -= loc | |
return self._read_buffer.popleft() | |
def set_close_callback(self, callback): | |
self._close_callback = callback | |
def _set_read_callback(self, callback): | |
assert not self._read_callback, "Already reading" | |
self._read_callback = callback | |
def reading(self): | |
return self._read_callback is not None | |
def writing(self): | |
return bool(self._write_buffer) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import selectors | |
import socket | |
from ioloop import IOLoop | |
from iostream import IOStream | |
class Server(object): | |
def __init__(self): | |
self.server_sock = None | |
def start(self): | |
address = ('127.0.0.1', 5055) | |
self.server_sock = socket.socket() | |
self.server_sock.setblocking(0) | |
self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
self.server_sock.bind(address) | |
self.server_sock.listen(128) | |
IOLoop.instance().add_handler(self.server_sock.fileno(), selectors.EVENT_READ, self.on_accept) | |
def on_accept(self, fd, events): | |
try: | |
conn, address = self.server_sock.accept() | |
except OSError as e: | |
print(e) | |
else: | |
stream = IOStream(conn) | |
self.handle_steam(stream) | |
def handle_steam(self, stream): | |
raise NotImplementedError | |
def close(self): | |
if self.server_sock: | |
self.server_sock.close() | |
self.server_sock = None | |
IOLoop.instance().remove_handler(self.server_sock) | |
class EchoServer(Server): | |
def onclose(self): | |
print('close') | |
def handle_steam(self, stream): | |
self.stream = stream | |
stream.set_close_callback(self.onclose) | |
stream.read_bytes(4, self.on_read_len) | |
def on_read_len(self, data): | |
len_of_data = int.from_bytes(data, byteorder='big') | |
self.stream.read_bytes(len_of_data, self.on_read_data) | |
def on_read_data(self, data): | |
print(data) | |
self.stream.read_bytes(4, self.on_read_len) | |
self.stream.write(data) | |
def on_close(self): | |
print('closed') | |
server = EchoServer() | |
server.start() | |
IOLoop.instance().start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment