Created
November 7, 2024 12:52
-
-
Save anhtuank7c/3086b21518a1cb1b315eac5cc2b65fa5 to your computer and use it in GitHub Desktop.
Test multiprocess in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures | |
import time | |
import schedule | |
from loguru import logger | |
# Define a sample job function that simulates a long-running process
def my_long_running_task(task_id: int, duration: float = 10) -> str:
    """Simulate a long-running job by sleeping, logging when it starts and ends.

    Args:
        task_id: Identifier included in the log messages and in the result.
        duration: Number of seconds to sleep; defaults to 10 as in the
            original demo. Exposed so callers can shorten the simulation.

    Returns:
        The string ``"Task <task_id> result."``.
    """
    logger.info(f"Task {task_id} started.")
    time.sleep(duration)  # stand-in for real work
    logger.info(f"Task {task_id} completed.")
    return f"Task {task_id} result."
# Wrapper class to manage scheduled jobs and re-runs
class TaskScheduler:
    """Periodically submit tasks to a pool, never overlapping a task with itself.

    Each scheduled task id gets its own recurring ``schedule`` job. When that
    job fires while the previous run of the same id is still executing, the
    new run is skipped rather than queued.
    """

    def __init__(
        self,
        interval_minutes: int = 5,
        *,
        executor: "concurrent.futures.Executor | None" = None,
    ) -> None:
        """Create a scheduler.

        Args:
            interval_minutes: Period between runs of each task, in minutes.
            executor: Pool used to run tasks. When ``None``, the module-level
                global named ``executor`` is looked up each time a task is
                submitted (the behavior of the original script).
        """
        self.interval_minutes = interval_minutes
        self.executor = executor
        # task id -> Future of its most recent submission (None before the first run)
        self.task_states: "dict[int, concurrent.futures.Future | None]" = {}

    def schedule_task(self, task_id: int) -> None:
        """Register ``task_id`` to be run every ``interval_minutes`` minutes."""
        # Nothing has been submitted for this id yet.
        self.task_states[task_id] = None
        # Use minutes, as the parameter name promises (``.seconds`` here made
        # jobs fire 60x more often than requested).
        schedule.every(self.interval_minutes).minutes.do(self._run_task, task_id)

    def _run_task(self, task_id: int) -> None:
        """Submit ``task_id`` unless its previous run is still in flight."""
        previous = self.task_states.get(task_id)
        if previous is not None and not previous.done():
            logger.info(f"Task {task_id} is still running; skipping this schedule.")
            return

        logger.info(f"Scheduling task {task_id}.")
        # Fall back to the module-level ``executor`` for backward compatibility.
        pool = self.executor if self.executor is not None else executor
        future = pool.submit(my_long_running_task, task_id)
        # Without a callback, an exception raised in the worker would stay on
        # the Future and never be reported.
        future.add_done_callback(lambda done: self._report_outcome(task_id, done))
        self.task_states[task_id] = future

    @staticmethod
    def _report_outcome(task_id: int, future: "concurrent.futures.Future") -> None:
        """Log how a finished run of ``task_id`` ended: cancelled, failed, or succeeded."""
        if future.cancelled():
            logger.warning(f"Task {task_id} was cancelled.")
            return
        error = future.exception()
        if error is not None:
            logger.error(f"Task {task_id} failed: {error!r}")
        else:
            logger.debug(f"Task {task_id} returned {future.result()!r}")

    def start(self) -> None:
        """Run the scheduler loop forever, checking for due jobs once per second."""
        while True:
            schedule.run_pending()
            time.sleep(1)  # poll once per second instead of busy-waiting
            logger.debug("checking")
if __name__ == "__main__":
    logger.info("Start")
    # Set up a ProcessPoolExecutor for parallel processing. The module-level
    # name ``executor`` bound here is also read by TaskScheduler._run_task.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        task_scheduler = TaskScheduler(interval_minutes=5)
        # Schedule multiple tasks by ID
        for task_id in (1, 2, 3):
            task_scheduler.schedule_task(task_id=task_id)
        try:
            # start() never returns on its own; Ctrl+C is the way to stop it.
            task_scheduler.start()
        except KeyboardInterrupt:
            # Handled so the ``with`` block shuts the pool down and "End" is
            # actually logged instead of dying with a traceback.
            logger.info("Interrupted; shutting down.")
    logger.info("End")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment