Skip to content

Instantly share code, notes, and snippets.

@wiedehopf
Last active March 1, 2022 15:37
Show Gist options
  • Save wiedehopf/a627fed4e2071b31a56165dc8b48b5c6 to your computer and use it in GitHub Desktop.
Save wiedehopf/a627fed4e2071b31a56165dc8b48b5c6 to your computer and use it in GitHub Desktop.
import asyncio
import codecs
import sys
import time
import traceback
import zlib

import ujson
def eprint(*args, **kwargs):
    """Print to standard error.

    Takes the same positional and keyword arguments as the built-in
    ``print``; ``file`` is always supplied here, so it must not be passed.
    """
    print(*args, **kwargs, file=sys.stderr)
class Con:
    """Auto-reconnecting asyncio TCP client for newline-delimited text.

    ``reader`` and ``writer`` are ``None`` whenever no connection is
    established; ``readlines`` and ``write`` try to (re)connect on demand.
    """

    def __init__(self, host, port, gzip):
        """Remember the endpoint; no connection is opened until first use.

        Args:
            host: Host passed to ``asyncio.open_connection``.
            port: Port passed to ``asyncio.open_connection``.
            gzip: When truthy, received bytes are passed through a
                ``zlib.decompressobj()`` before being split into lines.
                NOTE(review): a default ``decompressobj`` expects a
                zlib-wrapped stream, not gzip framing - confirm what the
                peer actually sends.
        """
        self.host = host
        self.port = port
        self.gzip = gzip
        self.reader = None
        self.writer = None
        # Earliest time (time.time() clock) at which the next connection
        # attempt is allowed.
        self.nextReconnect = time.time()
        # Seconds between connection attempts; also used as connect timeout.
        self.reconnectInterval = 5
        self.decompressor = None
        self.compressor = None
        # Text received after the last newline, i.e. an unfinished line.
        self.tail = ""
        # Incremental decoder so a multi-byte UTF-8 character that is split
        # across two reads is decoded correctly instead of raising.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    async def connect(self):
        """Drop the current connection (if any), then try to open a new one.

        A new attempt is made only when ``nextReconnect`` has passed; calls in
        between just perform the disconnect. Errors are reported on stderr and
        leave ``reader``/``writer`` as ``None``; they are not raised.
        Cancellation is allowed to propagate.
        """
        old_writer = self.writer
        self.writer = None
        self.reader = None
        # A partial line or partial character from the old stream must not be
        # glued onto the first data of the next stream.
        self.tail = ""
        self._decoder.reset()

        try:
            if old_writer:
                eprint(f'disconnecting {self.host}:{self.port}')
                old_writer.close()
                await old_writer.wait_closed()

            now = time.time()
            if now > self.nextReconnect:
                self.nextReconnect = now + self.reconnectInterval
                eprint(f'trying {self.host}:{self.port}')
                self.reader, self.writer = await asyncio.wait_for(
                    asyncio.open_connection(self.host, self.port),
                    self.reconnectInterval,
                )
                eprint(f'connected {self.host}:{self.port}')
                # Fresh (de)compression state for every new stream.
                self.decompressor = zlib.decompressobj()
                self.compressor = zlib.compressobj()
        except Exception as exc:
            # "except Exception" (not a bare except) so that CancelledError
            # and KeyboardInterrupt are not swallowed.
            eprint(f'connection to {self.host}:{self.port} failed: {exc!r}')
            self.writer = None
            self.reader = None

    async def readlines(self):
        """Read one chunk (up to 64 KiB) and return the complete lines in it.

        Returns:
            The string ``"closed"`` when no connection is available or the
            read failed (a reconnect is attempted in that case); otherwise a
            list of lines without their trailing newline. The list is empty
            when the chunk did not complete any line. Text after the last
            newline is kept in ``self.tail`` for the next call.
        """
        if not self.writer or self.writer.is_closing():
            await self.connect()
        if not self.writer or self.writer.is_closing():
            return "closed"

        try:
            read = await self.reader.read(64 * 1024)
            if self.gzip:
                read = self.decompressor.decompress(
                    self.decompressor.unconsumed_tail + read
                )
        except (OSError, zlib.error) as exc:
            # Mirror write(): a broken stream triggers a reconnect instead of
            # propagating and terminating the caller's loop.
            eprint(f'read from {self.host}:{self.port} failed: {exc!r}')
            await self.connect()
            return "closed"

        text = self.tail + self._decoder.decode(read)
        # split() always yields at least one element; the last one is the
        # (possibly empty) unfinished line.
        *complete, self.tail = text.split('\n')
        return complete

    async def write(self, data):
        """Queue ``data`` (bytes) on the connection, reconnecting on failure.

        Waits at most 20 ms for previously queued data to drain before
        writing. If that wait times out or the connection is broken, a
        reconnect is attempted and ``data`` is dropped. ``data`` is also
        dropped when no connection can be established.
        """
        if not self.writer or self.writer.is_closing():
            await self.connect()
        if not self.writer or self.writer.is_closing():
            return

        try:
            await asyncio.wait_for(self.writer.drain(), 0.02)
            self.writer.write(data)
        except ConnectionResetError:
            await self.connect()
        except Exception:
            # Not a bare except: cancellation must reach the caller's
            # wait_for() instead of being turned into a reconnect.
            traceback.print_exc()
            await self.connect()
def _sbs_field(event, key):
    """Return ``event[key]`` as text, or '' when the key is missing or null."""
    value = event.get(key)
    return '' if value is None else str(value)


def _event_to_sbs(event):
    """Format one decoded JSON object as an SBS-style 'MSG,3' line.

    Field positions (1-based) follow the original layout: 5 = 'ms',
    11 = 'cs', 12 = 'alt', 13 = 'gs', 14 = 'hd', 15 = 'la', 16 = 'lo',
    17 = 'vs', 18 = 'sq', 22 = on-ground flag. All other fields are empty.
    The returned string ends with a newline.
    """
    ground = event.get('onground')
    # Equality (not identity) on purpose: 1/0 are treated like true/false.
    if ground in (True, False):
        ground_flag = '1' if ground else '0'
    else:
        ground_flag = ''

    fields = ['MSG', '3', '1', '1', _sbs_field(event, 'ms')]
    fields += [''] * 5                                   # fields 6-10
    fields += [
        _sbs_field(event, key)
        for key in ('cs', 'alt', 'gs', 'hd', 'la', 'lo', 'vs', 'sq')
    ]                                                    # fields 11-18
    fields += [''] * 3                                   # fields 19-21
    fields.append(ground_flag)                           # field 22
    return ','.join(fields) + '\n'


async def main():
    """Forward JSON lines from the input connection as SBS-style lines.

    Reads newline-delimited JSON from 127.0.0.1:35152 (decompressed),
    converts every JSON object to one 'MSG,3' line and writes the batch to
    127.0.0.1:32006. Runs forever; both connections reconnect on demand.
    A line count is printed to stderr roughly every ``statusInterval``
    seconds.
    """
    incon = Con('127.0.0.1', 35152, True)
    outcon = Con('127.0.0.1', 32006, False)

    statusInterval = 600
    nextStatus = time.time() + 5
    lineCount = 0
    # Starts at 1 so that a single empty read does not trigger a reconnect.
    lastLineCount = 1

    while True:
        try:
            lines = await asyncio.wait_for(incon.readlines(), 10)
        except asyncio.TimeoutError:
            eprint('readlines timeout')
            await incon.connect()
            continue

        if lines == "closed":
            # Non-blocking pause: a blocking time.sleep() would stall the
            # event loop, including flushing of data queued on outcon.
            await asyncio.sleep(0.1)
            continue

        lc = len(lines)
        if lc == 0 and lastLineCount == 0:
            eprint('got 0 lines twice in succession, reconnecting!')
            await incon.connect()
        lastLineCount = lc
        lineCount += lc

        now = time.time()
        if now > nextStatus:
            nextStatus += statusInterval
            eprint(f'processed {lineCount} lines.')
            lineCount = 0

        if not lines:
            continue

        out = []
        for line in lines:
            try:
                ev = ujson.loads(line)
            except ValueError:
                continue
            # Valid JSON that is not an object has no fields to forward.
            if not isinstance(ev, dict):
                continue
            out.append(_event_to_sbs(ev))

        if out:
            try:
                await asyncio.wait_for(outcon.write(''.join(out).encode()), 2)
            except asyncio.TimeoutError:
                # Drop this batch rather than terminating the forwarder.
                eprint('write timeout')
# Start the forwarder only when executed as a script, so that importing this
# module does not enter the endless forwarding loop.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment