Last active
August 6, 2021 01:08
-
-
Save tchar/8ae29db0ce500ca527caaf9dc12ba6de to your computer and use it in GitHub Desktop.
Singleton service sharing resources with a runner thread that supports stopping, waking up
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List | |
from threading import Thread, RLock, Event | |
import time | |
import uuid | |
import logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class ServiceThread(Thread): | |
def __init__(self, lock: RLock, sleep_for: float, data: List): | |
super().__init__() | |
self._lock = lock | |
self._sleep_for = sleep_for | |
self._shared_data = data | |
self._run_count = 0 | |
self._woke_event = Event() | |
self._stopped_event = Event() | |
self._id = uuid.uuid4() | |
self._logger = logger.getChild('Thread (id={})'.format(self.id)) | |
@property | |
def id(self): | |
return str(self._id).split('-')[-1] | |
def _run(self) -> None: | |
# Write run code | |
with self._lock: | |
self._shared_data.append(self._run_count) | |
self._run_count += 1 | |
def run(self) -> None: | |
self._logger.info('Starting thread') | |
while not self._stopped_event.is_set(): | |
self._run() | |
self._logger.info('Sleeping for {:.0f}s'.format(self._sleep_for)) | |
self._woke_event.wait(self._sleep_for) | |
self._logger.info('Woke up') | |
self._woke_event.clear() | |
self._logger.info('Stopping thread') | |
def wake(self) -> None: | |
self._woke_event.set() | |
def stop(self) -> None: | |
self._stopped_event.set() | |
self.wake() | |
class Service: | |
_instance = None | |
def __init__(self): | |
self.run_interval = 1 | |
self._lock = RLock() | |
self._thread = None | |
self._shared_data = ['some', 'data'] | |
@classmethod | |
def get_instance(cls) -> 'Service': | |
if cls._instance is None: | |
cls._instance = Service() | |
return cls._instance | |
@property | |
def is_running(self) -> bool: | |
return self._thread is not None | |
@property | |
def data(self): | |
with self._lock: | |
return self._shared_data.copy() | |
def add_data(self, value): | |
with self._lock: | |
self._shared_data.append(value) | |
def run(self) -> None: | |
if not self.is_running: | |
self.start() | |
return | |
self._thread.wake() | |
def start(self) -> None: | |
if self.is_running: | |
return | |
lock, interval, data = self._lock, self.run_interval, self._shared_data | |
self._thread = ServiceThread(lock, interval, data) | |
self._thread.start() | |
def stop(self) -> None: | |
if not self.is_running: | |
return | |
self._thread.stop() | |
self._thread.join() | |
self._thread = None | |
def main(): | |
try: | |
service = Service.get_instance() | |
service.start() | |
for i in range(10): | |
service.add_data(-i) | |
time.sleep(1) | |
print('Data={}'.format(service.data)) | |
except Exception as e: | |
logger.exception(e) | |
finally: | |
service.stop() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment