Created
August 11, 2020 07:01
-
-
Save serjflint/efe4f42b5934c2db16c64769ffa7499f to your computer and use it in GitHub Desktop.
Trio multi-thread file update idling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pyinstaller --onefile --noconfirm --hiddenimport=win32timezone ^ | |
--hiddenimport=pkg_resources.py2_warn --hiddenimport=win32serviceutil ^ | |
--noupx --uac-admin --name=AG_Loader cli.py |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import servicemanager | |
import win32serviceutil | |
from logbook import Logger, FileHandler, NTEventLogHandler, INFO, set_datetime_format | |
from loader.loader import main, this_exe_dir, Loader | |
from loader.service import Service | |
log = Logger("cli") | |
set_datetime_format("local") | |
LOG_FORMAT = ( | |
"[{record.time}] {record.level_name}: {record.channel}: " | |
"{record.func_name}({record.lineno}): {record.message}" | |
) | |
if __name__ == "__main__": | |
worker = Loader() | |
worker.init() | |
worker_name = worker.config["Addon"]["name"] | |
Service._svc_name_ = worker_name | |
Service._svc_display_name_ = worker_name | |
Service._svc_description_ = "Служба интеграции данных для ПО АвтоГраф" | |
Service._target = (main, worker) | |
with NTEventLogHandler( | |
worker_name, format_string=LOG_FORMAT, level=INFO | |
).applicationbound(): | |
with FileHandler( | |
this_exe_dir().joinpath("service.log"), | |
format_string=LOG_FORMAT, | |
bubble=True, | |
).applicationbound(): | |
try: | |
if len(sys.argv) == 1: | |
servicemanager.Initialize() | |
servicemanager.PrepareToHostSingle(Service) | |
servicemanager.StartServiceCtrlDispatcher() | |
else: | |
win32serviceutil.HandleCommandLine(Service) | |
except Exception as err: | |
log.exception(err) | |
sys.exit(1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Addon] | |
name = "AG_Loader" | |
debug = false | |
[AutoGRAPH] | |
dir = "temp" | |
carslist = true | |
sbin = true | |
last_id = 675403987 | |
step = 100000 | |
carlist = true | |
lls = false | |
[Sync] | |
wait = 300 | |
attempts = 30 | |
writes = 30 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from datetime import datetime, timedelta | |
import os | |
import re | |
import sys | |
import time | |
from collections import namedtuple | |
from typing import List, Dict, Tuple, Callable | |
import random | |
import faulthandler | |
import toml | |
import trio | |
import trio_mysql.cursors | |
from logbook import ( | |
Logger, | |
set_datetime_format, | |
FileHandler, | |
StderrHandler, | |
NullHandler, | |
INFO, | |
DEBUG, | |
) | |
log = Logger("loader") | |
set_datetime_format("local") | |
Record = namedtuple( | |
"Record", | |
[ | |
"rec_id", | |
"obj_id", | |
"rec_date", | |
"lat", | |
"lon", | |
"d1", | |
"d2", | |
"d3", | |
"d4", | |
"d5", | |
"d6", | |
"an1", | |
"an2", | |
], | |
) | |
Objects = Dict[int, int] | |
Groups = Dict[str, List[int]] | |
Records = Dict[int, Dict[Tuple, List[Record]]] | |
def this_exe_dir(): | |
return trio.Path(os.path.dirname(os.path.realpath(sys.argv[0]))) | |
DEFAULT_LAST_ID = 0 | |
DEFAULT_STEP = 10000 | |
DEFAULT_WORKING_DIR = this_exe_dir() | |
DEFAULT_CARSLIST = True | |
DEFAULT_SBIN = True | |
DEFAULT_LLS = True | |
DEFAULT_WAIT = 300 | |
DEFAULT_ATTEMPTS = 30 | |
DEFAULT_WRITES = 30 | |
ONLINE_READ_TIMEOUT = 10 | |
START_DATETIME = datetime.min | |
CHANNEL_BUFFER_SIZE = 1 | |
LOG_FORMAT = ( | |
"[{record.time}] {record.level_name}: {record.channel}: " | |
"{record.func_name}({record.lineno}): {record.message}" | |
) | |
class Loader: | |
""" | |
Stop Callable is used to check if the main thread is scheduled to stop (Windows service) | |
To start Loader use an example below: | |
if __name__ == "__main__": | |
worker = Loader() | |
worker.init() | |
main(worker) | |
""" | |
def __init__(self, stop: Callable = lambda: False): | |
self.connection: dict = dict() | |
self.config: dict = dict() | |
self.config_path: trio.Path = None | |
self.is_stop: Callable = stop | |
self.objects: Objects = dict() | |
self._initialized: bool = False | |
self._producer_last_id: int = 0 | |
self.open_files: set = set() | |
self.writing_files: set = set() | |
def init(self): | |
if sys.argv[0].endswith(".py"): | |
parser: argparse.ArgumentParser = argparse.ArgumentParser( | |
description="AG_Loader" | |
) | |
parser.add_argument( | |
"config", | |
nargs="?", | |
default="config.toml", | |
help="Path to the configuration file", | |
) | |
args, leftovers = parser.parse_known_args() | |
self.config_path = trio.Path(args.config) | |
else: | |
self.config_path = this_exe_dir().joinpath("config.toml") | |
try: | |
with open(self.config_path, mode="r", encoding="utf8") as stream: | |
self.config = toml.load(stream) | |
except FileNotFoundError as err: | |
log.critical(f'Невозможно открыть "config.toml": {err}') | |
sys.exit(1) | |
if ( | |
self.config.get("Addon", None) is None | |
or self.config["Addon"].get("name", None) is None | |
): | |
log.critical("Обязательно наличие названия службы в конфигурации") | |
sys.exit(1) | |
self._initialized = True | |
async def run(self): | |
if not self._initialized: | |
log.critical("Перед запуском основного цикла необходимо вызвать init()!") | |
sys.exit(1) | |
try: | |
await this_exe_dir().joinpath("SBIN").mkdir(exist_ok=True) | |
except OSError as err: | |
log.critical(f"Невозможно создать папку с данными: {err}") | |
sys.exit(1) | |
if ( | |
self.config.get("SERVER", None) is None | |
or self.config["SERVER"].get("host", None) is None | |
): | |
log.critical("Обязательно наличие удалённого сервера в конфигурации") | |
sys.exit(1) | |
a = self.config["AutoGRAPH"] | |
a["last_id"] = int(a.get("last_id", DEFAULT_LAST_ID)) | |
a["step"] = int(a.get("step", DEFAULT_STEP)) or int(p.step) | |
a["dir"] = a.get("dir", DEFAULT_WORKING_DIR) | |
a["carslist"] = bool(a.get("carslist", DEFAULT_CARSLIST)) | |
a["sbin"] = bool(a.get("sbin", DEFAULT_SBIN)) | |
a["lls"] = bool(a.get("lls", DEFAULT_LLS)) | |
try: | |
objects, groups = await self.select_objects() | |
except trio_mysql.Error as err: | |
log.error(f"Ошибка обновления списка объектов: {err}") | |
sys.exit(1) | |
self.objects = objects | |
s = self.config["Sync"] | |
s["wait"] = int(s.get("wait", DEFAULT_WAIT)) | |
s["attempts"] = int(s.get("attempts", DEFAULT_ATTEMPTS)) | |
s["writes"] = int(s.get("writes", DEFAULT_WRITES)) | |
if a["carslist"]: | |
try: | |
await self.save_carslist(a["dir"], groups) | |
except OSError as err: | |
log.error(f"Не удалось записать CarsList: {err}") | |
sys.exit(1) | |
log.info(f"Загружена информация о {len(self.objects)} объектах") | |
async with trio.open_nursery() as nursery: | |
nursery.start_soon(self.schedule, self.save_config, 60, self.config_path) | |
nursery.start_soon( | |
self.schedule, self.update_cars_lists, 60, a["carslist"], a["dir"] | |
) | |
send_channel, receive_channel = trio.open_memory_channel( | |
CHANNEL_BUFFER_SIZE | |
) | |
nursery.start_soon(self.producer, send_channel) | |
nursery.start_soon(self.consumer, receive_channel) | |
async def producer(self, send_channel: trio.MemorySendChannel) -> None: | |
a = self.config["AutoGRAPH"] | |
self._producer_last_id, step = a["last_id"], a["step"] | |
async with send_channel: | |
while not self.is_stop(): | |
await trio.sleep(1) | |
last_id = self._producer_last_id | |
log.debug(f"Enter {self._producer_last_id}") | |
try: | |
max_id = await self.select_last_id() | |
except trio_mysql.Error as err: | |
log.error(f"Ошибка получения записей от источника: {err}") | |
continue | |
try: | |
records, last_id, received = await self.select_records( | |
last_id, step, self.objects | |
) | |
except trio_mysql.Error as err: | |
log.error(f"Ошибка получения записей: {err}") | |
continue | |
log.debug(f"Exit {self._producer_last_id}") | |
if records: | |
if not last_id > self._producer_last_id: | |
log.warn( | |
f"Wrong last_id: {last_id}. Original: {self._producer_last_id}" | |
) | |
continue | |
processed = last_id - self._producer_last_id | |
self._producer_last_id = last_id | |
log.info( | |
f"Пройдено: {processed}. Получено {received}. Прогресс: {last_id} из {max_id}" | |
) | |
await trio.sleep(0.1) | |
await send_channel.send((records, last_id, received)) | |
else: | |
processed = 0 | |
if self._producer_last_id + step < max_id: | |
processed = step | |
last_id = self._producer_last_id + step | |
self._producer_last_id = last_id | |
log.info( | |
f"Пройдено: {processed}. Получено {received}. Прогресс: {last_id} из {max_id}. " | |
) | |
continue | |
if processed < step: | |
if last_id + step > max_id: | |
await trio.sleep(ONLINE_READ_TIMEOUT) | |
async def consumer(self, receive_channel: trio.MemoryReceiveChannel) -> None: | |
lls = self.config["AutoGRAPH"]["lls"] | |
async with receive_channel: | |
async for result in receive_channel: | |
if self.is_stop(): | |
return | |
records, last_id, received = result | |
log.debug(f"Enter {last_id}") | |
while True: | |
try: | |
saved = await self.save_records( | |
records, self.objects, lls=lls, **self.config["Sync"] | |
) | |
self.config["AutoGRAPH"]["last_id"] = last_id | |
log.info( | |
f"Получено: {received}. Сохранено: {saved}. Последний: {last_id}" | |
) | |
break | |
except (OSError, TimeoutError) as err: | |
log.exception(f"Не удалось сохранить записи: {err}") | |
await trio.sleep(1) | |
log.debug(f"Exit {last_id}") | |
async def save_config(self, path) -> None: | |
log.debug(f"Enter") | |
try: | |
with open(path, mode="w", encoding="utf8") as f: | |
data = toml.dumps(self.config) | |
if not data: | |
raise ValueError(f"Cannot dump to TOML: {self.config}") | |
f.write(data) | |
except FileNotFoundError as err: | |
log.critical(f'Невозможно открыть "{path}": {err}') | |
except Exception as err: | |
log.exception(err) | |
log.debug(f"Exit") | |
# debug stats | |
log.debug( | |
f"open_files {len(self.open_files)}, writing files {len(self.writing_files)}" | |
) | |
if self.config["Addon"]["debug"]: | |
stats = trio.to_thread.current_default_thread_limiter().statistics() | |
low_stats = trio.lowlevel.current_statistics() | |
log.debug(f"Thread stats: {stats}\nLow stats: {low_stats}") | |
with open(this_exe_dir().joinpath("thread_dump.log"), mode="w") as f: | |
faulthandler.dump_traceback(f) | |
trio.lowlevel.current_trio_token().run_sync_soon(lambda: None) | |
async def schedule(self, func: Callable, interval: float = 1.0, *args): | |
while not self.is_stop(): | |
await trio.sleep(interval) | |
try: | |
log.debug(f"Enter") | |
await func(*args) | |
log.debug(f"Exit") | |
except Exception as err: | |
log.exception(f"Error in scheduled function {func.__name__}: {err}") | |
async def update_cars_lists(self, carslist, working_dir) -> bool: | |
log.debug(f"Enter") | |
try: | |
objects, groups = await self.select_objects() | |
except trio_mysql.Error as err: | |
log.error(f"Ошибка обновления списка объектов: {err}") | |
return False | |
self.objects = objects | |
if carslist: | |
try: | |
await self.save_carslist(working_dir, groups) | |
except (OSError, TimeoutError) as err: | |
log.error(f"Не удалось записать CarsList: {err}") | |
return False | |
log.debug(f"Exit") | |
return True | |
async def select_last_id(self) -> int: | |
log.debug(f"Enter") | |
last_id = self._producer_last_id + 2 * self.config["AutoGRAPH"]["step"] | |
log.debug(f"Exit") | |
return last_id | |
async def select_objects(self) -> (Objects, Groups): | |
log.debug(f"Enter") | |
objects: dict = dict() | |
groups: dict = dict() | |
for obj in range(10000): | |
objects[obj] = obj | |
buffer: List[int] = groups.get(str(obj % 100), list()) | |
buffer.append(obj) | |
groups[str(obj % 100)] = buffer | |
log.debug(f"Exit") | |
return objects, groups | |
@staticmethod | |
async def save_carslist(working_dir: str, groups: Groups) -> None: | |
log.debug(f"Enter") | |
working_dir = trio.Path(working_dir) | |
await working_dir.mkdir(exist_ok=True) | |
carslist = working_dir.joinpath("CarsList") | |
await carslist.mkdir(exist_ok=True) | |
dbf = working_dir.joinpath("DBF") | |
await dbf.mkdir(exist_ok=True) | |
async def write_carslist(name: str, group: List[int]): | |
name = re.sub(r"[\\/:*?\"\'<>|]", "", name) | |
ini_file = carslist.joinpath(f"{name}.ini") | |
atg_file = dbf.joinpath(f"{name}.atg") | |
ini = f"[SETUP]\nName={name}\n" | |
atg = f"AutoGRAPH database 1.3\n{len(group)} records\n" | |
for obj in group: | |
try: | |
autograph_id = get_autograph_id(obj) | |
except ValueError: | |
log.info(f"Missing autograph_id in {obj}") | |
continue | |
ini += ( | |
f"[{autograph_id}]\n" | |
f"Model={obj}\n" | |
f"Number={obj}\n" | |
"Online=1,1\n" | |
) | |
atg += f"{autograph_id}:\tPASSWORD={autograph_id}0\n" | |
try: | |
async with await trio.open_file(ini_file, mode="w") as f: | |
await f.write(ini) | |
async with await trio.open_file(atg_file, mode="w") as f: | |
await f.write(atg) | |
except UnicodeEncodeError as err: | |
log.error(f"{err}: {ini}") | |
raise | |
with trio.move_on_after(30) as cancel_scope: | |
async with trio.open_nursery() as nursery: | |
for key, value in groups.items(): | |
nursery.start_soon(write_carslist, key, value) | |
if cancel_scope.cancelled_caught: | |
raise TimeoutError("Cannot save CarsList") | |
log.debug(f"Exit") | |
async def select_records( | |
self, last_id: int, step: int, objects: Objects | |
) -> (Record, int): | |
log.debug(f"Enter") | |
records = dict() | |
new_last_id: int = last_id | |
received = 0 | |
for i in range(step): | |
record = Record( | |
last_id + i, | |
random.randrange(10000), | |
datetime(2020, 1, 1, 0, 0, 0), | |
50.0, | |
50.0, | |
2, | |
1, | |
1, | |
1, | |
1, | |
1, | |
None, | |
None, | |
) | |
if record.rec_id > new_last_id: | |
new_last_id = record.rec_id | |
if not record.lat or not record.lon or record.obj_id not in objects: | |
continue | |
obj_dict: dict = records.get(record.obj_id, dict()) | |
week = record.rec_date.isocalendar()[:2] | |
buffer = obj_dict.get(week, list()) | |
buffer.append(record) | |
received += 1 | |
obj_dict[week] = buffer | |
records[record.obj_id] = obj_dict | |
log.debug(f"Exit") | |
return records, new_last_id, received | |
async def save_records( | |
self, | |
records: Records, | |
objects: Objects, | |
lls: bool = False, | |
wait: int = 30, | |
attempts: int = 5, | |
writes: int = 10, | |
) -> int: | |
log.debug(f"Enter") | |
queue = list() | |
for obj_id, weeks in records.items(): | |
if obj_id not in objects: | |
continue | |
for week, batch in weeks.items(): | |
queue.append((obj_id, batch)) | |
log.debug(f"Await {len(queue)} writes") | |
step = writes | |
saved = list() | |
for x in range(0, len(queue), step): | |
with trio.move_on_after(wait) as cancel_scope: | |
async with trio.open_nursery() as nursery: | |
for obj_id, batch in queue[x : x + step]: | |
autograph_id = get_autograph_id(objects[obj_id]) | |
nursery.start_soon( | |
self.write_sbin, autograph_id, batch, lls, attempts, saved | |
) | |
if cancel_scope.cancelled_caught: | |
raise TimeoutError("Can not save records in time") | |
log.debug(f"Exit") | |
return sum(saved) | |
async def write_sbin( | |
self, | |
autograph_id: int, | |
batch: List[Record], | |
lls: bool, | |
attempts: int, | |
saved: list, | |
): | |
log.debug(f"Write {autograph_id}") | |
buffer: bytes = bytes() | |
try: | |
filtered = batch | |
saved.append(len(filtered)) | |
for record in filtered: | |
buffer += record_to_bin(record, lls) | |
except AttributeError as err: | |
raise TypeError(f"Batch should contain entries of type Record: {err}") | |
obj_dir = this_exe_dir().joinpath("SBIN").joinpath(autograph_id) | |
await obj_dir.mkdir(exist_ok=True) | |
today = batch[0].rec_date.date() | |
monday = today - timedelta(days=today.weekday()) | |
file_name = f"{autograph_id}-{monday:%y%m%d}.sbin" | |
sbin_file = obj_dir.joinpath(file_name) | |
for t in range(attempts): | |
try: | |
self.open_files.add(file_name) | |
async with await trio.open_file(sbin_file, mode="ab", buffering=0) as f: | |
self.writing_files.add(file_name) | |
await f.write(buffer) | |
self.writing_files.remove(file_name) | |
# with open(sbin_file, mode="ab", buffering=0) as f: | |
# log.debug(f"Await write {obj_id}") | |
# f.write(buffer) | |
break | |
except OSError as err: | |
log.error(f"Не удалось сохранить записи в файл, попытка {t}: {err}") | |
await trio.sleep(1) | |
else: | |
raise OSError("Не удалось сохранить записи в файл") | |
self.open_files.remove(file_name) | |
def date_to_ag(dt: datetime) -> str: | |
sec = (dt.year - 2009) * 32140800 | |
sec += (dt.month - 1) * 2678400 | |
sec += (dt.day - 1) * 86400 | |
sec += dt.hour * 3600 | |
sec += dt.minute * 60 | |
sec += dt.second | |
return f"{sec:08x}" | |
def nmea_to_ag(val: float): | |
val /= 100.0 | |
dd = int(val) | |
h = (((val - dd) * 100 / 60) + dd) * 1.2 * 1000000 | |
return f"{int(h):07x}" | |
def get_autograph_id(obj: int) -> str: | |
if obj is None: | |
raise ValueError(f"Autograph_id must not be None") | |
return f"{obj:07}"[-7:] | |
# fmt: off | |
CRC8_Table = [ | |
0, 94, 188, 226, 97, 63, 221, 131, 194, 156, 126, 32, 163, 253, 31, 65, | |
157, 195, 33, 127, 252, 162, 64, 30, 95, 1, 227, 189, 62, 96, 130, 220, | |
35, 125, 159, 193, 66, 28, 254, 160, 225, 191, 93, 3, 128, 222, 60, 98, | |
190, 224, 2, 92, 223, 129, 99, 61, 124, 34, 192, 158, 29, 67, 161, 255, | |
70, 24, 250, 164, 39, 121, 155, 197, 132, 218, 56, 102, 229, 187, 89, 7, | |
219, 133, 103, 57, 186, 228, 6, 88, 25, 71, 165, 251, 120, 38, 196, 154, | |
101, 59, 217, 135, 4, 90, 184, 230, 167, 249, 27, 69, 198, 152, 122, 36, | |
248, 166, 68, 26, 153, 199, 37, 123, 58, 100, 134, 216, 91, 5, 231, 185, | |
140, 210, 48, 110, 237, 179, 81, 15, 78, 16, 242, 172, 47, 113, 147, 205, | |
17, 79, 173, 243, 112, 46, 204, 146, 211, 141, 111, 49, 178, 236, 14, 80, | |
175, 241, 19, 77, 206, 144, 114, 44, 109, 51, 209, 143, 12, 82, 176, 238, | |
50, 108, 142, 208, 83, 13, 239, 177, 240, 174, 76, 18, 145, 207, 45, 115, | |
202, 148, 118, 40, 171, 245, 23, 73, 8, 86, 180, 234, 105, 55, 213, 139, | |
87, 9, 235, 181, 54, 104, 138, 212, 149, 203, 41, 119, 244, 170, 72, 22, | |
233, 183, 85, 11, 136, 214, 52, 106, 43, 117, 151, 201, 74, 20, 246, 168, | |
116, 42, 200, 150, 21, 75, 169, 247, 182, 232, 10, 84, 215, 137, 107, 53 | |
] | |
# fmt: on | |
def CRC8(buff): | |
crc = 0 | |
for c in buff: | |
crc = CRC8_Table[crc ^ c] | |
return crc | |
def record_to_bin(record: Record, lls: bool = False) -> bytes: | |
if record.lat < 0 or record.lon < 0: | |
return bytes() | |
rec_date = date_to_ag(record.rec_date) | |
lat = f"B{nmea_to_ag(record.lat)}" | |
lon = f"2{nmea_to_ag(record.lon)}" | |
try: | |
data = list(bytes.fromhex("00E6F70F" + lon + lat + rec_date)) | |
except ValueError as err: | |
log.error(f"{err}: record {record} lon {lon} lat {lat} rec_date {rec_date}") | |
return bytes() | |
data.reverse() | |
d = 0x0F | |
if record.d1 == 2: | |
d -= 1 << 0 | |
if record.d1 == 2: | |
d -= 1 << 1 | |
if record.d1 == 2: | |
d -= 1 << 2 | |
if record.d1 == 2: | |
d -= 1 << 3 | |
data[-4] = d | |
data[-1] = CRC8(data[:-1]) | |
if lls and record.an1 and record.an2 and record.an1 < 4096 and record.an2 < 4096: | |
data += data[:4] | |
data[-1] |= 0b1 << 7 | |
lls1 = f"{int(record.an1):03x}" | |
lls2 = f"{int(record.an2):03x}" | |
data += reversed(list(bytes.fromhex(f"{lls2}{lls1}08"))) | |
data += [3, 0, 0, 0] | |
data += [0x0F, 0xF7, 0xE6, 0] # 0x0E -> 0x0F | |
data[-1] = CRC8(data[-16:-1]) | |
return bytes(data) | |
def main(worker: Loader): | |
level = DEBUG if worker.config["Addon"].get("debug", False) else INFO | |
with NullHandler().applicationbound(): | |
with FileHandler( | |
this_exe_dir().joinpath("worker.log"), | |
format_string=LOG_FORMAT, | |
level=level, | |
).applicationbound(): | |
log.info(f"Программа запущена в {this_exe_dir()}") | |
with StderrHandler( | |
format_string=LOG_FORMAT, bubble=True, level=level | |
).applicationbound(): | |
while True: | |
try: | |
trio.run(worker.run) | |
break | |
except OSError as err: | |
log.error(f"Сервер Автограф не отвечает: {err}") | |
time.sleep(30) | |
except Exception as err: | |
log.exception(err) | |
time.sleep(3) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[tool.poetry] | |
name = "ag_loader_python" | |
version = "0.3.0" | |
description = "" | |
authors = ["Sergei Iakhnitskii <[email protected]>"] | |
[tool.poetry.dependencies] | |
python = "^3.8" | |
logbook = "^1.5.3" | |
trio = "^0.16.0" | |
trio-mysql = "^0.9.3" | |
toml = "^0.10.1" | |
[tool.poetry.dev-dependencies] | |
pyinstaller = "^3.6" | |
pywin32 = "^228" | |
pywin32-ctypes = "^0.2.0" | |
pefile = "^2019.4.18" | |
[build-system] | |
requires = ["poetry>=0.12"] | |
build-backend = "poetry.masonry.api" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread | |
from typing import Callable | |
import servicemanager | |
import win32event | |
import win32service | |
import win32serviceutil | |
from logbook import Logger | |
log = Logger("service") | |
class Service(win32serviceutil.ServiceFramework): | |
_svc_name_: str = "actualServiceName" | |
_svc_display_name_: str = "actualDisplayName" | |
_svc_description_: str = "actualDescription" | |
_target: (Callable, object) = (lambda x: x, None) | |
_svc_is_auto_start_: bool = True | |
def __init__(self, *args): | |
win32serviceutil.ServiceFramework.__init__(self, *args) | |
self.stop_event = win32event.CreateEvent(None, 0, 0, None) | |
log.info("Init") | |
# logs into the system event log | |
def log(self, msg): | |
servicemanager.LogInfoMsg(str(msg)) | |
def SvcDoRun(self): | |
log.info("start") | |
self.ReportServiceStatus(win32service.SERVICE_START_PENDING) | |
try: | |
self.ReportServiceStatus(win32service.SERVICE_RUNNING) | |
stop_thread: bool = False | |
target, worker = self._target | |
worker.is_stop = lambda: stop_thread | |
try: | |
t = Thread(target=target, args=(worker,)) | |
t.start() | |
except Exception as err: | |
log.exception(err) | |
raise | |
log.info("wait") | |
win32event.WaitForSingleObject(self.stop_event, win32event.INFINITE) | |
log.info("stopping") | |
stop_thread = True | |
t.join() | |
log.info("stop") | |
except Exception as x: | |
log.exception() | |
self.log("Exception : %s" % x) | |
self.SvcStop() | |
def SvcStop(self): | |
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING) | |
win32event.SetEvent(self.stop_event) | |
self.ReportServiceStatus(win32service.SERVICE_STOPPED) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment