Last active
August 11, 2022 09:49
-
-
Save roadsideseb/f54ab060c62103ad3747712a3e45680a to your computer and use it in GitHub Desktop.
A slightly modified version of the 'schedule' Python package that works with asyncio π
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import emoji | |
import schedule | |
import asyncio | |
import inspect | |
from datetime import datetime | |
class Job(schedule.Job): | |
async def run(self): | |
if (inspect.iscoroutine(self.job_func.func) | |
or inspect.iscoroutinefunction(self.job_func.func)): | |
ret = await self.job_func() | |
else: | |
ret = self.job_func() | |
class Scheduler(schedule.Scheduler): | |
async def run_pending(self): | |
runnable_jobs = (job for job in self.jobs if job.should_run) | |
for job in sorted(runnable_jobs): | |
ret = await job.run() | |
if isinstance(ret, schedule.CancelJob) or ret is schedule.CancelJob: | |
self.cancel_job(job) | |
def every(self, interval=1): | |
job = Job(interval) | |
self.jobs.append(job) | |
return job | |
async def show_me_emoji(emoji, count=10): | |
print(' '.join(emoji * count)) | |
def print_sync(message='this is a sync function'): | |
print(message) | |
async def run_scheduler(scheduler): | |
while True: | |
await scheduler.run_pending() | |
await asyncio.sleep(1) | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
sch = Scheduler() | |
sch.every(5).seconds.do(show_me_emoji, 'π') | |
sch.every(5).seconds.do(print_sync) | |
sch.every(5).seconds.do(show_me_emoji, 'πΊπΉπ·πΈ', 3) | |
sch.every(5).seconds.do(print_sync, 'can you believe it') | |
sch.every(5).seconds.do(show_me_emoji, 'π₯') | |
loop.run_until_complete(run_scheduler(sch)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Another fix for when you're instantiating the job in the
every
method. This tells the Job which scheduler to register with (the custom one you've made). Also, I removed theself.jobs.append
line from theevery
method. When you instantiate thejob
with theScheduler
you've just implemented, it will automatically add the job to it's list of jobs to run. Here's the line in the scheduler library that does that.