Skip to content

Instantly share code, notes, and snippets.

@Disassembler0
Created December 18, 2018 14:14
Show Gist options
  • Save Disassembler0/45f3e5e63326c1c163621a81d4a4e1ae to your computer and use it in GitHub Desktop.
Save Disassembler0/45f3e5e63326c1c163621a81d4a4e1ae to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import asyncio
import os
import signal
class Server:
    """Minimal asyncio server listening on a Unix domain socket.

    Every connection is served exactly once: up to 1024 bytes are read,
    handed to :meth:`handle_message`, and whatever that returns is written
    back before the connection is closed.  Subclasses are expected to
    override :meth:`handle_message`; the default implementation returns
    ``None``, which results in an empty reply.
    """

    def __init__(self, socket_path):
        """Store *socket_path*; nothing is opened until :meth:`bind`."""
        # NOTE(review): ``jobs`` is not used by any method of this class;
        # presumably a registry for subclasses -- confirm with callers.
        self.jobs = {}
        self.socket_path = socket_path

    def bind(self):
        """Create the event loop, install signal handlers and start listening.

        Sets ``self.loop`` and ``self.server``.
        """
        # Remove a stale socket file from a previous run.  EAFP avoids the
        # exists()/unlink() race of checking first.
        try:
            os.unlink(self.socket_path)
        except FileNotFoundError:
            pass
        # A fresh loop per bind(): unbind() closes the loop, and
        # asyncio.get_event_loop() without a running loop is deprecated.
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.add_signal_handler(signal.SIGINT, self.stop)
        self.loop.add_signal_handler(signal.SIGTERM, self.stop)
        # The ``loop=`` argument was removed in Python 3.10; the coroutine
        # picks up the loop it is run on.
        server = asyncio.start_unix_server(
            self.handle_connection, self.socket_path)
        self.server = self.loop.run_until_complete(server)

    def unbind(self):
        """Stop listening, close the event loop and remove the socket file."""
        self.server.close()
        self.loop.run_until_complete(self.server.wait_closed())
        self.loop.close()
        # Python 3.13+ removes the socket file itself when the server is
        # closed, so the file may already be gone.
        try:
            os.unlink(self.socket_path)
        except FileNotFoundError:
            pass

    def run(self):
        """Bind, serve until :meth:`stop` is called, then clean up."""
        self.bind()
        try:
            self.loop.run_forever()
        finally:
            # Release the socket even if the loop exits with an exception.
            self.unbind()

    def stop(self):
        """Ask the event loop to stop; installed as SIGINT/SIGTERM handler."""
        self.loop.stop()

    async def handle_connection(self, reader, writer):
        """Serve one client: read a request, write the reply, close.

        ``reader`` and ``writer`` are the stream objects passed in by
        ``asyncio.start_unix_server``.  Exceptions raised while handling the
        request propagate, but the writer is always closed.
        """
        try:
            data = await reader.read(1024)
            result = self.handle_message(data)
            if result is None:
                # The default handle_message() returns None; reply with
                # nothing instead of failing in bytes(None, 'UTF-8').
                payload = b''
            elif isinstance(result, (bytes, bytearray)):
                payload = bytes(result)
            else:
                payload = bytes(result, 'UTF-8')
            writer.write(payload)
            await writer.drain()
        finally:
            writer.close()

    def handle_message(self, data):
        """Return the reply for the request bytes *data*.

        Override in subclasses.  May return ``str`` (sent UTF-8 encoded),
        ``bytes`` (sent as-is) or ``None`` (empty reply).
        """
        return None
if __name__ == '__main__':
    # Script entry point: serve on the fixed socket path until the event
    # loop is stopped (SIGINT/SIGTERM call Server.stop).
    Server('/tmp/server.sock').run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment