Last active
June 4, 2023 21:22
-
-
Save Matjaz-B/d1c632371851df8399e6341272dcbfaa to your computer and use it in GitHub Desktop.
click clean-up after abort
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import click | |
async def do_some_work(duration): | |
print("Function performs some initialization, runs some command, waits and then stops") | |
## some init functions | |
## start operation | |
try: | |
print("Working...") | |
await asyncio.sleep(duration) | |
print("Work done") | |
# except click.Abort: | |
except Exception as e: | |
print(f"do_some_work exception {e}") | |
## clean up - stop operation | |
print("Clean up done") | |
@click.command() | |
@click.option('--duration', type=click.IntRange(1, 1000), default=10, help='Duration in [s]') | |
def worker_cli(duration): | |
print(f'Will work for {duration} [s]') | |
try: | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
loop.run_until_complete(do_some_work(duration)) | |
except Exception as e: | |
print(f"worker_cli exception {e}") | |
if __name__ == "__main__": | |
worker_cli() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import click | |
class Runner: | |
def __init__(self, delay): | |
print("init called") | |
self.delay = delay | |
def __del__(self): | |
print("deleted") | |
def __enter__(self): | |
print("Enter contex") | |
self.init = 1 | |
return self | |
def __exit__(self, exc_type, exc_value, exc_tb): | |
print("\r\n*** Cleaning up ***\r\n") | |
def action(self, a): | |
print(f'much action: {a*2}') | |
print(f'delaying for {self.delay}') | |
time.sleep(self.delay) | |
@click.command() | |
@click.option('--duration', type=click.IntRange(1, 1000), default=10, help='Duration in [s]') | |
@click.pass_context | |
def worker_contex(ctx, duration): | |
print(f'Will work for {duration} [s]') | |
obj = ctx.with_resource(Runner(duration)) | |
obj.action(1) | |
@click.command() | |
@click.option('--duration', type=click.IntRange(1, 1000), default=10, help='Duration in [s]') | |
@click.pass_context | |
def worker_cli(ctx, duration): | |
@ctx.call_on_close | |
def completed(): | |
print("*** Cleaning up ***") | |
print(f'Will work for {duration} [s]') | |
print(f"Delaying for {duration}") | |
time.sleep(duration) | |
if __name__ == "__main__": | |
# example with adding a callback... | |
worker_cli() | |
# example with contex | |
# worker_contex() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment