Skip to content

Instantly share code, notes, and snippets.

@momijiame
Created July 4, 2016 03:07
Show Gist options
  • Save momijiame/dbf82058cae56000775f6f024c9de9aa to your computer and use it in GitHub Desktop.
Save momijiame/dbf82058cae56000775f6f024c9de9aa to your computer and use it in GitHub Desktop.
特定の時刻にジョブを実行する
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from abc import ABCMeta
from abc import abstractmethod
from datetime import datetime
from threading import Thread
import threading
import time
import traceback

from future.utils import with_metaclass
class Job(object):
    """A function call scheduled for a specific point in time.

    Iterating a job yields ``target_time, function, args, kwargs`` in that
    order, so a job can be unpacked like a 4-tuple.
    """

    def __init__(self, target_time, function, *args, **kwargs):
        """Record what to call and when.

        Args:
            target_time: Moment at which the call is due. ``execute()``
                subtracts the current ``datetime`` from it, so it is
                expected to be a ``datetime``.
            function: Callable to invoke when the job is due.
            *args: Positional arguments forwarded to ``function``.
            **kwargs: Keyword arguments forwarded to ``function``.
        """
        self.target_time = target_time
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def __iter__(self):
        """Iterate over ``(target_time, function, args, kwargs)``."""
        return iter(
            (self.target_time, self.function, self.args, self.kwargs)
        )

    def __repr__(self):
        """Return a debugging representation listing all four fields."""
        return (
            '{}(target_time={!r}, function={!r}, args={!r}, kwargs={!r})'
        ).format(
            type(self).__name__,
            self.target_time,
            self.function,
            self.args,
            self.kwargs,
        )
class JobConsumer(with_metaclass(ABCMeta)):
    """Abstract base for objects that consume jobs from a queue."""

    def __init__(self, job_q):
        """Remember the queue that jobs will be taken from.

        Args:
            job_q: Queue object holding the jobs to consume.
        """
        self.job_q = job_q

    @abstractmethod
    def start(self):
        """Begin consuming jobs; concrete subclasses must implement this."""
class ThreadingConsumer(JobConsumer):
    """Job consumer backed by a pool of ``JobWorker`` threads."""

    def __init__(self, job_q, workers=1):
        """Create the worker threads without starting them.

        Args:
            job_q: Queue shared by every worker in the pool.
            workers: Number of ``JobWorker`` threads to create.
        """
        super(ThreadingConsumer, self).__init__(job_q)
        self._workers = [JobWorker(self.job_q) for _ in range(workers)]

    def start(self):
        """Start every worker thread in the pool."""
        for thread in self._workers:
            thread.start()
class JobWorker(Thread):
    """Thread that takes jobs off a queue and runs them one at a time.

    A job that raises is reported on stderr and skipped, so a single bad
    job does not take the worker down. Putting ``None`` on the queue tells
    the worker that receives it to leave its loop.
    """

    def __init__(self, job_q):
        """Bind the worker to its queue; the thread is not started here.

        Args:
            job_q: Queue-like object whose ``get()`` returns the next job,
                or ``None`` to stop this worker.
        """
        super(JobWorker, self).__init__()
        self.job_q = job_q

    def run(self):
        """Run queued jobs until a ``None`` sentinel is received."""
        while True:
            job = self.job_q.get()
            if job is None:
                # Shutdown sentinel: gives callers a way to end the loop.
                break
            try:
                execute(job)
            except Exception:
                # An uncaught exception would end run() and silently
                # shrink the pool; report it and keep serving the queue.
                traceback.print_exc()
def execute(job):
    """Wait until the job's target time, then call its function.

    If the target time has already passed, the function is called
    immediately.

    Args:
        job: A ``Job``, or any iterable yielding exactly
            ``(target_time, function, args, kwargs)``.

    Returns:
        Whatever ``function(*args, **kwargs)`` returns.
    """
    target_time, function, args, kwargs = job
    # Read the clock in the target's own timezone: subtracting a naive
    # "now" from an aware target raises TypeError. For a naive target
    # tzinfo is None, which yields the usual naive local time.
    now = datetime.now(target_time.tzinfo)
    delay = (target_time - now).total_seconds()
    if delay > 0:
        time.sleep(delay)
    return function(*args, **kwargs)
def print_date():
    """Print the calling thread's name followed by the current time."""
    thread_name = threading.current_thread().name
    timestamp = datetime.now()
    print('{}: {}'.format(thread_name, timestamp))
def main():
    """Demo: start two worker threads and queue five timed print jobs.

    The jobs are due 2, 2, 3, 4 and 4 seconds after they are queued. Each
    one prints the name of the thread that ran it and the current time.
    The workers keep waiting on the queue after the last job, so the
    process does not exit on its own.
    """
    from datetime import timedelta
    from queue import Queue

    job_q = Queue()
    ThreadingConsumer(job_q, workers=2).start()

    for delay in (2, 2, 3, 4, 4):
        due = datetime.now() + timedelta(seconds=delay)
        job_q.put(Job(due, print_date))
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment