Last active
March 1, 2022 15:37
-
-
Save wiedehopf/a627fed4e2071b31a56165dc8b48b5c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import traceback | |
import time | |
import zlib | |
import ujson | |
import sys | |
def eprint(*args, **kwargs):
    """Print to standard error.

    All positional and keyword arguments are forwarded to the built-in
    ``print``; only the destination stream is fixed to ``sys.stderr``.
    """
    stream = sys.stderr
    print(*args, **kwargs, file=stream)
class Con:
    """Reconnecting asyncio TCP connection for newline-delimited text.

    Attributes:
        host, port: endpoint passed to ``asyncio.open_connection``.
        gzip: when true, incoming bytes are fed through
            ``zlib.decompressobj()`` (default settings) before line splitting.
        reader, writer: the current asyncio stream pair, or ``None`` while
            disconnected.
        nextReconnect: earliest ``time.time()`` value at which a new
            connection attempt is allowed.
        reconnectInterval: seconds between attempts; also used as the
            connect timeout.
        decompressor, compressor: zlib objects recreated for every new
            connection.
        tail: bytes of the trailing, not yet newline-terminated line.
    """

    def __init__(self, host, port, gzip):
        self.host = host
        self.port = port
        self.gzip = gzip
        self.reader = None
        self.writer = None
        self.nextReconnect = time.time()
        self.reconnectInterval = 5
        self.decompressor = None
        self.compressor = None
        # Kept as bytes so that a multi-byte character split across two
        # reads is only decoded once its line is complete.
        self.tail = b""

    async def connect(self):
        """Drop the current connection (if any) and try to open a new one.

        A new attempt is only made when ``nextReconnect`` has passed; otherwise
        the instance is simply left disconnected. Failures are logged and leave
        ``reader``/``writer`` set to ``None``; they are not raised.
        """
        try:
            if self.writer:
                eprint(f'disconnecting {self.host}:{self.port}')
                self.writer.close()
                await self.writer.wait_closed()
            self.writer = None
            self.reader = None
            now = time.time()
            if now > self.nextReconnect:
                self.nextReconnect = now + self.reconnectInterval
                eprint(f'trying {self.host}:{self.port}')
                self.reader, self.writer = await asyncio.wait_for(
                    asyncio.open_connection(self.host, self.port),
                    self.reconnectInterval,
                )
                eprint(f'connected {self.host}:{self.port}')
                # Fresh stream: compression state and any partial line from
                # the previous connection no longer apply.
                self.decompressor = zlib.decompressobj()
                self.compressor = zlib.compressobj()
                self.tail = b""
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation (used by the callers' ``wait_for`` timeouts)
            # is not swallowed here.
            eprint(f'connection to {self.host}:{self.port} failed: {exc!r}')
            self.writer = None
            self.reader = None

    async def readlines(self):
        """Read one chunk and return the complete lines it finished.

        Returns:
            The string ``"closed"`` when no connection is available (or the
            read failed and a reconnect was triggered); otherwise a list of
            decoded lines without their newline. The list is empty when the
            chunk completed no line, including when the peer sent EOF.
        """
        if not self.writer or self.writer.is_closing():
            await self.connect()
        if not self.writer or self.writer.is_closing():
            return "closed"
        try:
            chunk = await self.reader.read(64 * 1024)
            if self.gzip:
                chunk = self.decompressor.decompress(
                    self.decompressor.unconsumed_tail + chunk
                )
        except (OSError, zlib.error) as exc:
            # A broken socket or corrupt stream is handled like a lost
            # connection instead of propagating to the caller.
            eprint(f'read from {self.host}:{self.port} failed: {exc!r}')
            await self.connect()
            return "closed"
        *complete, self.tail = (self.tail + chunk).split(b'\n')
        #eprint(f'read {len(complete)} lines')
        # Undecodable bytes are replaced so one bad line cannot abort the read.
        return [line.decode(errors='replace') for line in complete]

    async def write(self, data):
        """Send ``data`` (bytes) if a connection is available.

        The previous write must drain within 20 ms before the new data is
        queued; if it does not, or the connection fails, the data is dropped
        and a reconnect is started.
        """
        if not self.writer or self.writer.is_closing():
            await self.connect()
        if not self.writer or self.writer.is_closing():
            return
        try:
            await asyncio.wait_for(self.writer.drain(), 0.02)
            self.writer.write(data)
            #eprint(f'Received: {data.decode()!r}')
        except ConnectionResetError:
            await self.connect()
        except Exception:
            # Not a bare ``except``: cancellation must reach the caller.
            traceback.print_exc()
            await self.connect()
def _sbs_field(value):
    """Render one decoded JSON value as a text field; ``None`` becomes empty."""
    return '' if value is None else str(value)


def _sbs_line(ev):
    """Build one ``MSG,3,1,1`` comma-separated line (no newline) from an event dict.

    The trailing comments give the 1-based field position of each value.
    """
    ground = ev.get('onground')
    if ground == True:  # noqa: E712 - equality kept so 1/0 count as well
        ground_field = '1'
    elif ground == False:  # noqa: E712
        ground_field = '0'
    else:
        ground_field = ''
    fields = [
        'MSG', '3', '1', '1',
        _sbs_field(ev.get('ms', '')),   #5
        '', '', '', '', '',             #6-10 left empty
        _sbs_field(ev.get('cs', '')),   #11
        _sbs_field(ev.get('alt', '')),  #12
        _sbs_field(ev.get('gs', '')),   #13
        _sbs_field(ev.get('hd', '')),   #14
        _sbs_field(ev.get('la', '')),   #15
        _sbs_field(ev.get('lo', '')),   #16
        _sbs_field(ev.get('vs', '')),   #17
        _sbs_field(ev.get('sq', '')),   #18
        '', '', '',                     #19-21 left empty
        ground_field,                   #22
    ]
    return ','.join(fields)


async def main():
    """Forward JSON lines from the input connection as ``MSG,3`` lines.

    Reads newline-delimited JSON from 127.0.0.1:35152 (zlib-compressed),
    converts every JSON object into one comma-separated line and writes the
    batch to 127.0.0.1:32006. Runs until cancelled; a line count is logged
    to stderr every ``statusInterval`` seconds.
    """
    incon = Con('127.0.0.1', 35152, True)
    outcon = Con('127.0.0.1', 32006, False)
    statusInterval = 600
    nextStatus = time.time() + 5
    lineCount = 0
    lastLineCount = 1
    while True:
        try:
            lines = await asyncio.wait_for(incon.readlines(), 10)
        except asyncio.TimeoutError:
            eprint('readlines timeout')
            await incon.connect()
            continue
        if lines == "closed":
            # Non-blocking pause; time.sleep() would stall the event loop.
            await asyncio.sleep(0.1)
            continue

        lc = len(lines)
        # Two empty reads in a row are treated as a dead input connection.
        if lc == 0 and lastLineCount == 0:
            eprint('got 0 lines twice in succession, reconnecting!')
            await incon.connect()
        lastLineCount = lc
        lineCount += lc

        now = time.time()
        if now > nextStatus:
            nextStatus += statusInterval
            eprint(f'processed {lineCount} lines.')
            lineCount = 0

        if not lines:
            continue

        out = []
        for line in lines:
            try:
                ev = ujson.loads(line)
            except ValueError:
                continue
            if not isinstance(ev, dict):
                # Valid JSON that is not an object carries no fields to map.
                continue
            out.append(_sbs_line(ev) + '\n')

        if out:
            try:
                await asyncio.wait_for(outcon.write(''.join(out).encode()), 2)
            except asyncio.TimeoutError:
                # Drop this batch instead of terminating the whole loop.
                eprint('write timeout')
if __name__ == "__main__":
    # Start the forwarding loop only when executed as a script, so that
    # importing this module does not block in the endless main() coroutine.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment