Last active
June 12, 2018 17:44
-
-
Save amir734jj/6d08e4c4b5bc036c27b4be38f63b433d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from time import sleep | |
from threading import Thread, Lock | |
from .task_model import Task | |
from typing import List | |
lock = Lock() | |
class TaskManager: | |
def __init__(self): | |
self.tasks = list() | |
self.tasks.append(Task("Init", "print('Heartbeat ...')")) | |
global lock | |
self.lock: Lock = lock | |
Thread(target=lambda x: self.start_loop(), args=([""])).start() | |
def add_task(self, name: str, task_string: str) -> None: | |
self.tasks.append(Task(name, task_string)) | |
def running_loop(self) -> None: | |
with self.lock: | |
while True: | |
# To prevent infinite loop going wild | |
sleep(1) | |
print(len(self.tasks)) | |
for task in list(self.tasks): | |
task_string = task.script | |
# Safely sandbox the script | |
exec(task_string, {}, {}) | |
sleep(1) | |
def start_loop(self): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
loop.run_until_complete(self.running_loop()) | |
def get_tasks(self) -> List[Task]: | |
return self.tasks | |
def delete_task(self, id_value: str) -> None: | |
with self.lock: | |
for task in self.tasks: | |
if task.id == id_value: | |
self.tasks.remove(task) | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment