Created
August 30, 2016 17:16
-
-
Save squeaky-pl/7d05ffff36ec384c6215c1b5352ef3b8 to your computer and use it in GitHub Desktop.
test.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio as aio | |
import signal | |
import os | |
from functools import partial | |
from datetime import datetime, timedelta | |
initial_alerts = { | |
1: datetime(2015, 12, 12, 12, 13, 4), | |
2: datetime(2015, 8, 16, 12, 14, 5) | |
} | |
def sleep_calculator(created_at): | |
while True: | |
now = datetime.now() | |
next_send = now.replace(hour=created_at.hour, minute=created_at.minute, second=created_at.second) | |
if next_send <= now: | |
next_send += timedelta(days=1) | |
yield (next_send - now).total_seconds() | |
async def send_alert(id): | |
print('I am sending alert', id) | |
async def wait_for_alert(id, created_at): | |
print('Starting the wait for', id) | |
for delay in sleep_calculator(created_at): | |
print('Will wait', delay, 'for', id) | |
try: | |
# This is cancellable | |
await aio.sleep(delay) | |
except aio.CancelledError: | |
print('Terminating coroutine for', id) | |
return | |
# This is not cancellable | |
await aio.shield(send_alert(id)) | |
def handle_terminate(terminating): | |
print('Shutting down') | |
terminating.set() | |
def handle_line(tasks, line): | |
cmd, *params = line.split() | |
if cmd == 'add': | |
id = int(params[0]) | |
created_at = datetime(*map(int, params[1:])) | |
tasks[id] = aio.ensure_future(wait_for_alert(id, created_at)) | |
elif cmd == 'del': | |
id = int(params[0]) | |
tasks.pop(id).cancel() | |
else: | |
print('What did you mean?') | |
async def handle_client(clients, tasks, reader, writer): | |
clients.append(aio.Task.current_task()) | |
print('New client connected') | |
while True: | |
try: | |
line = (await reader.readline()).decode('ascii').strip() | |
# the process is shutting down but we hace clients, disconnect them | |
except aio.CancelledError: | |
writer.write('Goodbye\n'.encode('ascii')) | |
writer.write_eof() | |
break | |
if not line: | |
break | |
print('got', line) | |
handle_line(tasks, line) | |
print('Client disconnected') | |
async def run(terminating): | |
# initial tasks | |
tasks = { | |
id: aio.ensure_future(wait_for_alert(id, created_at)) | |
for id, created_at in initial_alerts.items()} | |
clients = [] | |
# start control server | |
bound_handle_client = partial(handle_client, clients, tasks) | |
print('ncat 127.0.0.1 9999') | |
server = await aio.start_server(bound_handle_client, host='127.0.0.1', port=9999) | |
await terminating.wait() | |
server.close() | |
# stop all alert tasks | |
for task in tasks.values(): | |
task.cancel() | |
# disconnect all clients | |
for client in clients: | |
client.cancel() | |
await aio.gather(*[*tasks.values(), *clients, server.wait_closed()]) | |
def main(): | |
print('kill -TERM', os.getpid()) | |
loop = aio.get_event_loop() | |
terminating = aio.Event() | |
loop.add_signal_handler(signal.SIGTERM, handle_terminate, terminating) | |
loop.run_until_complete(run(terminating)) | |
loop.close() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment