Skip to content

Instantly share code, notes, and snippets.

@anhtuank7c
Created November 7, 2024 12:52
Show Gist options
  • Save anhtuank7c/3086b21518a1cb1b315eac5cc2b65fa5 to your computer and use it in GitHub Desktop.
Save anhtuank7c/3086b21518a1cb1b315eac5cc2b65fa5 to your computer and use it in GitHub Desktop.
Test multiprocess in Python
import concurrent.futures
import time
import schedule
from loguru import logger
# Define a sample job function that simulates a long-running process
def my_long_running_task(task_id: int, duration: float = 10) -> str:
    """Simulate a long-running job by sleeping, then return a result string.

    Args:
        task_id: Identifier used in the log messages and the returned string.
        duration: Number of seconds to sleep in order to simulate work.
            Defaults to 10, the value that used to be hard-coded.

    Returns:
        The string ``"Task <task_id> result."``.
    """
    logger.info(f"Task {task_id} started.")
    time.sleep(duration)  # Stand-in for the real work.
    logger.info(f"Task {task_id} completed.")
    return f"Task {task_id} result."
# Wrapper class to manage scheduled jobs and re-runs
class TaskScheduler:
    """Periodically submit tasks to an executor, skipping overlapping runs.

    Every registered task id is triggered on a fixed interval through the
    ``schedule`` library. A tick submits a new run only when the previous run
    of the same task id has finished; otherwise the tick is skipped.

    NOTE: despite its name, ``interval_minutes`` is applied in *seconds*
    (``schedule.every(n).seconds``). The parameter name is kept so existing
    callers keep working.
    """

    def __init__(
        self,
        interval_minutes: int = 5,
        *,
        executor: "concurrent.futures.Executor | None" = None,
    ) -> None:
        """Create a scheduler.

        Args:
            interval_minutes: Interval between runs of each task. It is
                passed to ``schedule.every(...).seconds``, i.e. interpreted
                as seconds.
            executor: Executor that runs the tasks. When omitted, the
                module-level name ``executor`` is looked up at submit time,
                which is how this class was originally wired.
        """
        self.interval_minutes = interval_minutes
        self.executor = executor
        # Maps task id -> Future of the most recent run (None before the
        # first run has been submitted).
        self.task_states: dict = {}

    def schedule_task(self, task_id: int) -> None:
        """Register *task_id* so that it is run on every interval tick."""
        # No run has been submitted yet for this task.
        self.task_states[task_id] = None
        # Unit is seconds, see the class-level NOTE.
        schedule.every(self.interval_minutes).seconds.do(self._run_task, task_id)

    def _is_running(self, task_id: int) -> bool:
        """Return True if the latest run of *task_id* has not finished yet."""
        latest = self.task_states.get(task_id)
        return latest is not None and not latest.done()

    def _report_previous_failure(self, task_id: int) -> None:
        """Log the exception of the finished previous run, if it raised."""
        latest = self.task_states.get(task_id)
        if latest is None or not latest.done() or latest.cancelled():
            return
        # The future is done, so exception() returns immediately.
        error = latest.exception()
        if error is not None:
            logger.error(f"Task {task_id} failed on its previous run: {error!r}")

    def _resolve_executor(self) -> "concurrent.futures.Executor":
        """Return the injected executor, or the module-level ``executor``."""
        if self.executor is not None:
            return self.executor
        # Backward compatibility: fall back to the global bound by the
        # script entry point.
        return executor

    def _run_task(self, task_id: int) -> None:
        """Submit a new run of *task_id* unless the previous one is active."""
        if self._is_running(task_id):
            logger.info(f"Task {task_id} is still running; skipping this schedule.")
            return
        # Without this check an exception raised inside the worker would be
        # dropped when the Future is overwritten below.
        self._report_previous_failure(task_id)
        logger.info(f"Scheduling task {task_id}.")
        self.task_states[task_id] = self._resolve_executor().submit(
            my_long_running_task, task_id
        )

    def start(self) -> None:
        """Run the scheduling loop forever; this method never returns."""
        while True:
            # Run scheduled jobs
            schedule.run_pending()
            time.sleep(1)  # Check every second to avoid busy waiting
            print("checking")
if __name__ == "__main__":
    logger.info("Start")
    # Set up a ProcessPoolExecutor for parallel processing.
    # The name `executor` must stay bound at module level: TaskScheduler
    # looks it up as a global when it submits a task.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        task_scheduler = TaskScheduler(interval_minutes=5)
        # Schedule multiple tasks by ID
        task_scheduler.schedule_task(task_id=1)
        task_scheduler.schedule_task(task_id=2)
        task_scheduler.schedule_task(task_id=3)
        # Start the scheduler loop. start() loops forever, so the only way
        # out is an interrupt; catch it so the pool is shut down by the
        # `with` block and the final log line is actually reached.
        try:
            task_scheduler.start()
        except KeyboardInterrupt:
            logger.info("Interrupted; shutting down.")
    logger.info("End")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment