Skip to content

Instantly share code, notes, and snippets.

@stoensin
Created January 11, 2020 03:16
Show Gist options
  • Save stoensin/f24179551dc92e73e4f4b71a8147a623 to your computer and use it in GitHub Desktop.
Save stoensin/f24179551dc92e73e4f4b71a8147a623 to your computer and use it in GitHub Desktop.
PriorityQueue基于 最小堆 算法的,添加和移除一个元素的耗时都是log2(n)
from Queue import PriorityQueue
from datetime import datetime
import threading
class Delayed(object):
# 返回:计划执行时间
# 单位: datetime
def plan_time(self):
pass
def total_seconds(td):
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
class DelayQueue(PriorityQueue):
def __init__(self, maxsize):
self.queue = []
# 如果任务没有到达执行时间,则消费者必须等待在此condition上
self.lock = threading.Lock()
self.can_done = threading.Condition(self.lock)
def put_task(self, task):
self.put((task.plan_time, task))
# 检索并移除此队列的头部,如果此队列不存在未到期延迟的元素,则等待它
def take_task(self):
self.can_done.acquire()
try:
task = self.peek()
delta = total_seconds(task.plan_time - datetime.now())
while delta > 0:
self.can_done.wait(delta)
task = self.peek()
delta = total_seconds(task.plan_time - datetime.now())
item = self.get()
self.can_done.notify_all()
return item[1]
finally:
self.can_done.release()
def peek(self):
self.not_empty.acquire()
try:
while not self._qsize():
self.not_empty.wait()
return self.queue[0][1]
finally:
self.not_empty.release()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment