Created
February 1, 2020 11:32
-
-
Save jaycosaur/9359aa8a7e2718308af1d6517f34fbdf to your computer and use it in GitHub Desktop.
Emulation of Ticker class in golang. Emits the time at intervals of 'time_to_wait' seconds on internal queue. Can be stopped by calling 'stop' method.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from queue import Queue, Empty | |
from threading import Thread | |
from typing import Any | |
import time | |
class Ticker: | |
_alive = True | |
_q = Queue() | |
_stop_q = Queue() | |
def __init__(self, time_to_wait: float): | |
self.time_to_wait = time_to_wait | |
self._thread = Thread(target=self.__tick_looper,) | |
self._thread.start() | |
def __tick_looper(self) -> None: | |
while True: | |
try: | |
exit = self._stop_q.get(timeout=self.time_to_wait) | |
if exit: | |
break | |
except Empty: | |
self._q.put(time.time()) | |
def stop(self) -> None: | |
self._stop_q.put(True) | |
self._alive = False | |
def is_stopped(self) -> bool: | |
return not self._alive and self._thread.is_alive() | |
def get(self) -> Any: | |
return self._q.get() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment