Skip to content

Instantly share code, notes, and snippets.

@trecouvr
Created March 28, 2014 11:17
Show Gist options
  • Save trecouvr/9830473 to your computer and use it in GitHub Desktop.
Save trecouvr/9830473 to your computer and use it in GitHub Desktop.
import logging
import traceback
import asyncio
logger = logging.getLogger(__name__)
class QuitCommand(Exception):
pass
class CommandServer(asyncio.Protocol):
def __init__(self, commands, logger=logger):
"""
Args:
commands (dict): command_name/command_callback pairs
"""
self.commands = commands
self.logger = logger
@asyncio.coroutine
def handle_connection(self, client_reader, client_writer):
peername = client_writer.transport.get_extra_info('peername')
self.logger.info('Connection from "{}"'.format(peername))
try:
while True:
request = yield from client_reader.readline()
response = self.handle_message(request)
serialized_response = self.serialize_response(response)
client_writer.write(serialized_response + b'\n')
except QuitCommand:
self.logger.info("%r quit", peername)
pass
client_writer.close()
def handle_message(self, message):
command, args, kwargs = self.unserialize_request(message)
self.logger.debug('command=%s, args=%s, kwargs=%s', command, args, kwargs)
if command == 'quit':
raise QuitCommand()
elif command in self.commands:
try:
return str(self.commands[command](*args, **kwargs))
except:
return traceback.format_exc()
else:
self.logger.warning('command %r does not exist', command)
return 'Command "%s" does not exist' % command
def unserialize_request(self, request):
line = request.decode('utf8').strip()
splitted_line = tuple(x.strip() for x in line.split(' '))
command, tail = splitted_line[0], splitted_line[1:]
args = []
kwargs = {}
for arg in tail:
if '=' in arg:
key, value = arg.split('=', 1)
kwargs[key] = value
else:
args.append(arg)
return command, args, kwargs
def serialize_response(self, response):
return str(response).encode('utf8')
if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
loop = asyncio.get_event_loop()
commands = {
'echo': lambda m: m,
}
command_server = CommandServer(commands)
coro = asyncio.start_server(lambda r,w: command_server.handle_connection(r, w), host=None, port=8888)
coro = asyncio.start_unix_server(lambda r,w: command_server.handle_connection(r, w), path='server.sock')
server = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
print("exit")
finally:
server.close()
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment