Last active
September 21, 2024 10:36
-
-
Save infinex/3f2c06bc562a37b38aa8d774ad24a2c0 to your computer and use it in GitHub Desktop.
python utils
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from datetime import datetime, time
from time import sleep
from typing import List

# APScheduler Related Libraries
from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.jobstores.base import ConflictingIdError, JobLookupError
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# FastAPI and Pydantic Related Libraries
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
# Global Variables

# Root logging at INFO, plus this module's own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The FastAPI application that every route below is registered on.
app = FastAPI(
    title="Fastapi scheduler",
    version="2020.11.1",
    description="An Example for Running Scheduling",
)

# Placeholder for the scheduler; the startup handler assigns the real object.
Schedule = None
def report_error(event):
    """Print the exception attached to *event*, if there is one.

    Registered on the scheduler as the listener for job-error events.
    Events whose ``exception`` attribute is falsy are ignored.
    """
    failure = event.exception
    if not failure:
        return
    print(failure)
@app.on_event("startup")
async def load_schedule_or_create_blank():
    """
    Instantiate the global Schedule object and load existing jobs from SQLite.

    Jobs live in the ``jobs.sqlite`` SQLAlchemy job store, which allows for
    persistent schedules across server restarts. Any failure is logged with
    its traceback and swallowed, so this handler never raises.
    """
    global Schedule
    try:
        jobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        Schedule = AsyncIOScheduler(jobstores=jobstores)
        # Surface exceptions raised inside scheduled jobs.
        Schedule.add_listener(report_error, EVENT_JOB_ERROR)
        Schedule.start()
        logger.info("Created Schedule Object")
    except Exception:
        # `except Exception` (not a bare `except`) lets SystemExit and
        # KeyboardInterrupt propagate; logger.exception keeps the traceback
        # so the cause of the failure is not lost.
        logger.exception("Unable to Create Schedule Object")
@app.on_event("shutdown")
async def pickle_schedule():
    """
    Shut down the scheduler on application shutdown to avoid orphan jobs.

    Does nothing except log when there is no running scheduler to stop.
    """
    # The startup handler swallows its own errors, so Schedule may still be
    # None or may hold a scheduler that was never started; calling shutdown()
    # in either state would raise during application shutdown.
    if Schedule is None or not Schedule.running:
        logger.info("No running Schedule to disable")
        return
    Schedule.shutdown()
    logger.info("Disabled Schedule")
# Response body of the /process_job/ endpoint: a timestamp and the job's name.
# (Documented with comments on purpose: a class docstring would be emitted
# into the generated schema.)
class TaskResponse(BaseModel):
    date: datetime = Field(
        title="Date in ISO-8601 Format",
        description="Date time",
    )
    job_name: str = Field(
        title="Job Name",
        description="Name of Job",
    )

    class Config:
        # Sample payload attached to the model's schema.
        schema_extra = {
            "example": {
                "date": "2021-01-12T18:08:34",
                "job_name": "Name of a Job",
            },
        }
# One entry of the schedule listing: job id, trigger and next run time, all
# carried as strings.
# (Documented with comments on purpose: a class docstring would be emitted
# into the generated schema.)
class CurrentScheduledJob(BaseModel):
    job_id: str = Field(
        title="The Job ID in APScheduler",
        description="The Job ID in APScheduler",
    )
    run_frequency: str = Field(
        title="The Job Interval in APScheduler",
        description="The Job Interval in APScheduler",
    )
    next_run: str = Field(
        title="Next Scheduled Run for the Job",
        description="Next Scheduled Run for the Job",
    )

    class Config:
        # Sample payload attached to the model's schema.
        schema_extra = {
            "example": {
                "job_id": "My Job Id",
                "run_frequency": "interval[0:05:00]",
                "next_run": "2020-11-10 22:12:09.397935+10:00",
            },
        }
# Response body of the schedule listing endpoint: every job, wrapped under
# a single "jobs" key.
class CurrentScheduledJobsResponse(BaseModel):
    jobs: List[CurrentScheduledJob]
# Response body shared by the job create and delete endpoints: the job id and
# whether that job is scheduled after the call.
# (Documented with comments on purpose: a class docstring would be emitted
# into the generated schema.)
class JobCreateDeleteResponse(BaseModel):
    # Title/description previously read "scheduler"; "scheduled" is meant.
    scheduled: bool = Field(title="Whether the job was scheduled or not",
                            description="Whether the job was scheduled or not")
    job_id: str = Field(title="The Job ID in APScheduler",
                        description="The Job ID in APScheduler")

    class Config:
        # Sample payload attached to the model's schema.
        schema_extra = {
            'example': {
                "scheduled": True,
                "job_id": "My Job Id"
            }
        }
# Exposed as an HTTP endpoint and also used as the callable that scheduled
# jobs execute. Returns the current local time together with the job id it
# was given. (No docstring on purpose: FastAPI would publish it as the
# endpoint description.)
@app.post("/process_job/", response_model=TaskResponse, tags=["tasks"])
def process_job(job_id="A Job Id"):
    return {"date": datetime.now(), "job_name": job_id}
@app.get(
    "/schedule/show_schedules/",
    response_model=CurrentScheduledJobsResponse,
    tags=["schedule"],
)
async def get_scheduled_syncs():
    """
    Will provide a list of currently Scheduled Tasks
    """
    # Each job is flattened to plain strings: its id, its trigger and its
    # next run time.
    listing = [
        {
            "job_id": str(entry.id),
            "run_frequency": str(entry.trigger),
            "next_run": str(entry.next_run_time),
        }
        for entry in Schedule.get_jobs()
    ]
    return {"jobs": listing}
@app.post("/schedule/job/", response_model=JobCreateDeleteResponse,
          tags=["schedule"])
async def add_daily_job_to_scheduler(hour: int = 12,
                                     job_id="An Job Id"):
    """
    Add a New Job to a Schedule
    """
    # Parameters:
    #   hour   - hour of the day (0-23) at which the cron job fires.
    #   job_id - id given to the scheduler job; also passed to process_job.
    # Returns {"scheduled": True, "job_id": <id of the created job>}.
    # Raises HTTPException 422 for an out-of-range hour and 409 when a job
    # with the same id already exists.

    # Reject hours a cron trigger cannot represent, rather than letting the
    # scheduler's error surface as an unhandled 500.
    if not 0 <= hour <= 23:
        raise HTTPException(status_code=422,
                            detail="hour must be between 0 and 23")
    try:
        job = Schedule.add_job(process_job, 'cron',
                               hour=hour, id=job_id,
                               args=[job_id])
    except ConflictingIdError as exc:
        # Job ids must be unique within the job store.
        raise HTTPException(
            status_code=409,
            detail=f"A job with id '{job_id}' is already scheduled",
        ) from exc
    return {"scheduled": True, "job_id": job.id}
@app.delete("/schedule/job/", response_model=JobCreateDeleteResponse,
            tags=["schedule"])
async def remove_job_from_scheduler(job_id):
    """
    Remove a Job from a Schedule
    """
    # Parameters:
    #   job_id - id of the scheduler job to remove.
    # Returns {"scheduled": False, "job_id": job_id}.
    # Raises HTTPException 404 when no job with that id exists.
    try:
        Schedule.remove_job(job_id)
    except JobLookupError as exc:
        # An unknown id is a client error, not a server failure.
        raise HTTPException(
            status_code=404,
            detail=f"No job with id '{job_id}' was found",
        ) from exc
    return {"scheduled": False, "job_id": job_id}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
monkeypatch | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment