Skip to content

Instantly share code, notes, and snippets.

@infinex
Last active September 21, 2024 10:36
Show Gist options
  • Save infinex/3f2c06bc562a37b38aa8d774ad24a2c0 to your computer and use it in GitHub Desktop.
Save infinex/3f2c06bc562a37b38aa8d774ad24a2c0 to your computer and use it in GitHub Desktop.
python utils
from datetime import datetime, time
# FastAPI and Pydantic Related Libraries
from time import sleep
from apscheduler.events import EVENT_JOB_ERROR
from fastapi import FastAPI
from pydantic import BaseModel, Field
from typing import List
# APScheduler Related Libraries
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import logging
# Global Variables
app = FastAPI(title="Fastapi scheduler", version="2020.11.1",
description="An Example for Running Scheduling")
Schedule = None
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def report_error(event):
if event.exception:
print(event.exception)
@app.on_event("startup")
async def load_schedule_or_create_blank():
"""
Instatialise the Schedule Object as a Global Param and also load existing Schedules from SQLite
This allows for persistent schedules across server restarts.
"""
global Schedule
try:
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
Schedule = AsyncIOScheduler(jobstores=jobstores)
Schedule.add_listener(report_error, EVENT_JOB_ERROR)
Schedule.start()
logger.info("Created Schedule Object")
except:
logger.error("Unable to Create Schedule Object")
@app.on_event("shutdown")
async def pickle_schedule():
"""
An Attempt at Shutting down the schedule to avoid orphan jobs
"""
global Schedule
Schedule.shutdown()
logger.info("Disabled Schedule")
class TaskResponse(BaseModel):
date: datetime = Field(title="Date in ISO-8601 Format",
description="Date time")
job_name: str = Field(title="Job Name", description="Name of Job")
class Config:
schema_extra = {
'example': {
"date": "2021-01-12T18:08:34",
"job_name": "Name of a Job",
}
}
class CurrentScheduledJob(BaseModel):
job_id: str = Field(title="The Job ID in APScheduler",
description="The Job ID in APScheduler")
run_frequency: str = Field(title="The Job Interval in APScheduler",
description="The Job Interval in APScheduler")
next_run: str = Field(title="Next Scheduled Run for the Job",
description="Next Scheduled Run for the Job")
class Config:
schema_extra = {
'example': {
"job_id": "My Job Id",
"run_frequency": "interval[0:05:00]",
"next_run": "2020-11-10 22:12:09.397935+10:00"
}
}
class CurrentScheduledJobsResponse(BaseModel):
jobs: List[CurrentScheduledJob]
class JobCreateDeleteResponse(BaseModel):
scheduled: bool = Field(title="Whether the job was scheduler or not",
description="Whether the job was scheduler or not")
job_id: str = Field(title="The Job ID in APScheduler",
description="The Job ID in APScheduler")
class Config:
schema_extra = {
'example': {
"scheduled": True,
"job_id": "My Job Id"
}
}
@app.post("/process_job/", response_model=TaskResponse,
tags=["tasks"])
def process_job(job_id="A Job Id"):
date = datetime.now()
return {"date": date, "job_name": job_id}
@app.get("/schedule/show_schedules/",
response_model=CurrentScheduledJobsResponse, tags=["schedule"])
async def get_scheduled_syncs():
"""
Will provide a list of currently Scheduled Tasks
"""
schedules = []
for job in Schedule.get_jobs():
schedules.append(
{"job_id": str(job.id), "run_frequency": str(job.trigger),
"next_run": str(job.next_run_time)})
return {"jobs": schedules}
@app.post("/schedule/job/", response_model=JobCreateDeleteResponse,
tags=["schedule"])
async def add_daily_job_to_scheduler(hour: int = 12,
job_id="An Job Id"):
"""
Add a New Job to a Schedule
"""
schedule_ssl_check = Schedule.add_job(process_job, 'cron',
hour=hour, id=job_id,
args=[job_id])
return {"scheduled": True, "job_id": schedule_ssl_check.id}
@app.delete("/schedule/job/", response_model=JobCreateDeleteResponse,
tags=["schedule"])
async def remove_job_from_scheduler(job_id):
"""
Remove a Job from a Schedule
"""
Schedule.remove_job(job_id)
return {"scheduled": False, "job_id": job_id}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment