Skip to content

Instantly share code, notes, and snippets.

@mookerji
Created February 25, 2017 01:09
Show Gist options
  • Save mookerji/6cd77b292d181298ae06e89ed0b17b27 to your computer and use it in GitHub Desktop.
Save mookerji/6cd77b292d181298ae06e89ed0b17b27 to your computer and use it in GitHub Desktop.
ZMQ piksi client
# Copyright (C) 2015 Swift Navigation Inc.
# Contact: Engineering <[email protected]>
#
# This source is subject to the license found in the file 'LICENSE' which must
# be distributed together with this source. All other rights reserved.
#
# THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
from requests.adapters import DEFAULT_POOLBLOCK, DEFAULT_POOLSIZE, HTTPAdapter
from requests.packages.urllib3.util import Retry
from requests_futures.sessions import FuturesSession
from sbp.client import Handler, Framer
from sbp.client.drivers.pyserial_driver import PySerialDriver
from sbp.client.loggers.json_logger import JSONLogger
from sbp.logging import *
from sbp.navigation import *
from sbp.msg import SBP
from sbp.piksi import MsgReset
from sbp.table import dispatch
import os
import pty
import requests
import serial
import SocketServer
import sys
import threading
import time
import uuid
import zmq
def tprint(msg):
    """Write ``msg`` plus a trailing newline to stdout, then flush.

    The text and its newline are handed to ``sys.stdout.write`` as a single
    string, which is intended to keep lines from different threads from
    getting their newlines interleaved.

    Args:
        msg: String to emit. It is concatenated with a newline, so a
            non-string argument raises ``TypeError``.
    """
    out = sys.stdout
    line = msg + '\n'
    out.write(line)
    out.flush()
def get_logfilename():
    """Return a log file name stamped with the current local time.

    Returns:
        A string of the form ``hackathon-client-YYYYMMDD-HHMMSS.log.sbp``.
    """
    pattern = "hackathon-client-%Y%m%d-%H%M%S.log.sbp"
    return time.strftime(pattern)
# Serial device path and baud rate; these are the same values SerialWorker
# opens its PySerialDriver with.
SERIAL_PORT = "/dev/tty.usbserial-FTFMFLNV"
SERIAL_BAUD = 115200
# Namespace UUID (as a string) that get_uuid() feeds to uuid.uuid5().
DEFAULT_CHANNEL_UUID = '118db405-b5de-4a05-87b5-605cc85af924'
# Default device identifier; get_uuid() stringifies it and hashes it into the
# namespace above.
DEFAULT_SERIAL_ID = 2222
# NOTE(review): not referenced anywhere in this file -- confirm whether it is
# still needed.
DEFAULT_SERIAL_SIZE = 128
# Endpoint used by NetworkingReadTask / NetworkingWriteTask. Empty here, so it
# must be filled in before either task can issue a request.
ENDPOINT_URL = ''
# Timeout handed to zmq.Poller.poll() by the client tasks (pyzmq interprets
# this value as milliseconds).
DEFAULT_POLLING_INTERVAL = 20
def get_uuid(piksi_uuid=DEFAULT_SERIAL_ID,
             channel=DEFAULT_CHANNEL_UUID,):
    """Derive a name-based (version 5) UUID for a device.

    Args:
        piksi_uuid: Device identifier; its ``str()`` form is the UUID name.
        channel: UUID string used as the namespace.

    Returns:
        The ``uuid.UUID`` produced by ``uuid.uuid5(namespace, name)``.

    Raises:
        ValueError: If ``channel`` is not a well-formed UUID string.
    """
    namespace = uuid.UUID(channel)
    device_name = str(piksi_uuid)
    return uuid.uuid5(namespace, device_name)
def get_header(uuid):
    """Build the HTTP headers sent with broker requests.

    Args:
        uuid: Device identifier; its ``str()`` form becomes ``Device-Uid``.

    Returns:
        A dict with ``Device-Uid``, ``Content-Type`` and ``Accept`` keys. The
        latter two both carry the broker's SBP media type.
    """
    media_type = 'application/vnd.swiftnav.broker.v1+sbp2'
    headers = {'Device-Uid': str(uuid)}
    for field in ('Content-Type', 'Accept'):
        headers[field] = media_type
    return headers
def get_bogus_serial_port():
    """Allocate a pseudo-terminal and return the slave side's device name.

    Both descriptors returned by ``pty.openpty()`` are left open; neither is
    closed or returned to the caller.

    Returns:
        The tty path reported by ``os.ttyname`` for the slave descriptor.
    """
    _master_fd, slave_fd = pty.openpty()
    return os.ttyname(slave_fd)
class TCPHandler(SocketServer.BaseRequestHandler):
    """Relay ZMQ messages addressed to ``'TCPServer'`` to a TCP client.

    For each accepted TCP connection, ``handle`` opens a ZMQ DEALER socket
    with identity ``'TCPServer'``, connects it to the ROUTER frontend and
    copies every message it receives to the TCP peer.

    ``handle`` only ends by raising (for example when ``sendall`` fails
    because the peer went away); ``finish`` then releases the ZMQ resources.
    """

    # Address of the ROUTER frontend (the port ServerTask binds and
    # ClientTask.connect() uses by default).
    ROUTER_ADDR = 'tcp://localhost:5570'

    def __exit__(self, *exc_info):
        """Release ZMQ resources.

        Kept for backward compatibility; SocketServer itself calls
        ``finish`` rather than ``__exit__``. The base class defines no
        ``__exit__``, so nothing is delegated upward.
        """
        self._close_zmq()

    def finish(self):
        """SocketServer clean-up hook, run after ``handle`` returns or raises."""
        self._close_zmq()

    def _close_zmq(self):
        """Close the DEALER socket and terminate its context, if created."""
        sock = getattr(self, 'socket', None)
        if sock is not None:
            # linger=0 so that context.term() below cannot block on unsent
            # messages.
            sock.close(linger=0)
            self.socket = None
        ctx = getattr(self, 'context', None)
        if ctx is not None:
            ctx.term()
            self.context = None

    def handle(self):
        """Forward ZMQ messages to the connected TCP client until it fails."""
        print("Connection received from %s" % (self.client_address,))
        self.polling_interval = DEFAULT_POLLING_INTERVAL
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.identity = 'TCPServer'
        self.socket.identity = self.identity
        # The socket must be connected to the router, otherwise the poll
        # below can never report incoming data.
        self.socket.connect(self.ROUTER_ADDR)
        self.poll = zmq.Poller()
        self.poll.register(self.socket, zmq.POLLIN)
        while True:
            ready = dict(self.poll.poll(self.polling_interval))
            if self.socket not in ready:
                continue
            try:
                msg = self.socket.recv()
            except zmq.ZMQError as e:
                if e.errno != zmq.EAGAIN:
                    raise
                # Nothing was actually received: do not forward an unbound
                # or stale message.
                continue
            self.request.sendall(msg)
class ClientTask(threading.Thread):
    """Daemon thread that copies messages from a ZMQ DEALER socket to a handle.

    Args:
        identity: ZMQ identity assigned to the DEALER socket.
        handle: Object with a ``write(msg)`` method that receives each
            message. Subclasses that override ``run`` may pass ``None``.
        polling_interval: Timeout passed to ``zmq.Poller.poll``.

    ``connect`` must be called before the thread is started, because it
    creates the poller that ``run`` uses.
    """

    def __init__(self, identity, handle, polling_interval=DEFAULT_POLLING_INTERVAL,):
        super(ClientTask, self).__init__()
        self.identity = identity
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.identity = self.identity
        self.handle = handle
        self.polling_interval = polling_interval
        self.daemon = True

    def connect(self, addr='tcp://localhost:5570'):
        """Connect to ``addr``, register the socket for polling, return self."""
        self.socket.connect(addr)
        print('Client %s started' % self.identity)
        self.poll = zmq.Poller()
        self.poll.register(self.socket, zmq.POLLIN)
        return self

    def close(self):
        """Close the socket and terminate this task's ZMQ context."""
        self.socket.close()
        self.context.term()

    def run(self):
        """Poll forever, writing every received message to ``self.handle``."""
        while True:
            ready = dict(self.poll.poll(self.polling_interval))
            if self.socket not in ready:
                continue
            try:
                msg = self.socket.recv()
            except zmq.ZMQError as e:
                if e.errno != zmq.EAGAIN:
                    raise
                # Nothing was received; writing here would use an unbound
                # name on the first pass or duplicate the previous message.
                continue
            self.handle.write(msg)
def get_bogus_serial_port_task():
    """Create a fake serial port and a ClientTask that writes into it.

    Returns:
        A ``(handle, task)`` tuple: the pyserial handle opened on the
        pseudo-terminal, and an unconnected, unstarted ``ClientTask`` with
        identity ``'Bogus Serial Port'`` that writes to that handle.
    """
    tty_path = get_bogus_serial_port()
    handle = serial.serial_for_url(tty_path)
    print("Fake serial port %s" % handle)
    return (handle, ClientTask(identity='Bogus Serial Port', handle=handle))
def get_parser():
    """Placeholder with no implementation yet; always returns ``None``."""
    return None
# (connect, read) timeout pair. NetworkingReadTask stores DEFAULT_TIMEOUT on
# itself but does not pass it to any request in this file.
# NOTE(review): presumably seconds, as in requests' `timeout=` -- confirm.
DEFAULT_CONNECT_TIMEOUT = 10
DEFAULT_READ_TIMEOUT = 10
DEFAULT_TIMEOUT = (DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT)
# Retry budgets handed to urllib3's Retry(connect=..., read=...).
MAX_CONNECT_RETRIES = 10
MAX_READ_RETRIES = 10
DEFAULT_RETRIES = (MAX_CONNECT_RETRIES, MAX_READ_RETRIES)
# Passed to Retry(redirect=...).
MAX_REDIRECTS = 0
# Passed to Retry(backoff_factor=...).
DEFAULT_BACKOFF_FACTOR = 0.2
# Same media type string that get_header() sends as Content-Type and Accept.
BROKER_SBP_TYPE = 'application/vnd.swiftnav.broker.v1+sbp2'
class NetworkingReadTask(ClientTask):
    """Stream bytes from the HTTP endpoint into the ZMQ DEALER socket.

    Args:
        identity: ZMQ identity for this task's DEALER socket.
    """

    def __init__(self, identity):
        super(NetworkingReadTask, self).__init__(identity, handle=None)
        self.endpoint = ENDPOINT_URL
        self.device_uid = get_uuid()
        self._session = FuturesSession()
        self._timeout = DEFAULT_TIMEOUT
        self._retry = Retry(connect=DEFAULT_RETRIES[0],
                            read=DEFAULT_RETRIES[1],
                            redirect=MAX_REDIRECTS,
                            status_forcelist=[500],
                            backoff_factor=DEFAULT_BACKOFF_FACTOR)
        # One adapter per scheme, both sharing the same retry policy.
        for scheme in ("http://", "https://"):
            self._session.mount(scheme,
                                HTTPAdapter(pool_connections=DEFAULT_POOLSIZE,
                                            pool_maxsize=DEFAULT_POOLSIZE,
                                            pool_block=DEFAULT_POOLBLOCK,
                                            max_retries=self._retry))
        self.connected = False

    def _request(self):
        """Issue the streaming GET and return its future."""
        return self._session.get(
            self.endpoint,
            stream=True,
            headers=get_header(self.device_uid),
        )

    def _start(self):
        """Open the streaming GET, re-issuing it until the response is truthy.

        A requests ``Response`` is truthy when its status code is below 400.
        The final future is kept in ``self.response``.

        Returns:
            The HTTP status code of the accepted response.
        """
        self.response = self._request()
        while not self.response.result():
            self.response = self._request()
            time.sleep(1)
        reply = self.response.result()
        print('Got Skylark reply %s' % (reply,))
        return reply.status_code

    def run(self):
        """Connect (expecting HTTP 202) and forward 20-byte reads to ZMQ.

        Returns ``False`` (ending the thread) if no 202 is obtained after
        the initial attempt plus five retries. Any exception while reading
        or sending marks the task disconnected so the next pass reconnects.
        """
        while True:
            if not self.connected:
                print("Connecting to Skylark")
                # Keep the status of each attempt: calling _start() again
                # after a 202 would open a second stream and drop the one
                # that just succeeded.
                status = self._start()
                attempts = 0
                while status != 202 and attempts < 5:
                    print("Connecting to Skylark: Trying again")
                    attempts += 1
                    time.sleep(1)
                    status = self._start()
                if status != 202:
                    print("Failed to connect to Skylark")
                    return False
                self.connected = True
                print("Connected to Skylark")
            try:
                msg = self.response.result().raw.read(amt=20)
                if msg:
                    try:
                        self.socket.send(msg)
                    except zmq.ZMQError as e:
                        if e.errno != zmq.EAGAIN:
                            raise
            except Exception as exc:
                self.connected = False
                print("-------------------------- %s" % (exc,))
                # Back off briefly before reconnecting.
                time.sleep(0.1)
# Imported here, mid-file, rather than with the imports at the top.
import logging
# Configure the root logger to emit DEBUG and above. This runs when the
# module is loaded, not when main() is called.
logging.basicConfig(level=logging.DEBUG)
class NetworkingWriteTask(ClientTask):
    """Upload messages received on the ZMQ socket as one streaming HTTP PUT.

    Args:
        identity: ZMQ identity for this task's DEALER socket.
    """

    def __init__(self, identity):
        super(NetworkingWriteTask, self).__init__(identity, handle=None)
        self.endpoint = ENDPOINT_URL
        self.device_uid = get_uuid()

    def run(self):
        """Issue a single PUT whose body is an endless generator of messages.

        The thread ends when ``requests.put`` returns or raises.
        """
        def put():
            # Generator used as the request body: yields each non-empty
            # message received from the socket.
            while True:
                ready = dict(self.poll.poll(self.polling_interval))
                if self.socket not in ready:
                    continue
                try:
                    msg = self.socket.recv()
                except zmq.ZMQError as e:
                    if e.errno != zmq.EAGAIN:
                        raise
                    # Nothing was received; do not yield an unbound or
                    # stale message.
                    continue
                if msg:
                    yield msg
                else:
                    print("Nothing!")

        headers = get_header(self.device_uid)
        print('Write with header %s' % (headers,))
        requests.put(
            self.endpoint,
            headers=headers,
            stream=True,
            data=put())
class JSONLoggerTask(ClientTask):
    """Unpack SBP messages from the ZMQ socket and pass them to a logger.

    Args:
        identity: ZMQ identity for this task's DEALER socket.
        handle: Callable invoked with each unpacked ``SBP`` message.
    """

    def __init__(self, identity, handle):
        super(JSONLoggerTask, self).__init__(identity, handle)

    def run(self):
        """Poll forever; unpack each message and call ``self.handle`` on it."""
        while True:
            ready = dict(self.poll.poll(self.polling_interval))
            if self.socket not in ready:
                continue
            try:
                msg = SBP.unpack(self.socket.recv())
            except zmq.ZMQError as e:
                if e.errno != zmq.EAGAIN:
                    raise
                # Nothing was received; logging here would reuse the
                # previous message or hit an unbound name.
                continue
            except Exception as exc:
                # Unpacking failed: report the error and drop the frame.
                print(exc)
                continue
            self.handle(msg)
class SerialWorker(threading.Thread):
    """Bridge between the serial device and the in-process ZMQ backend.

    SBP messages read from the device are sent to a fixed list of client
    identities; messages arriving from the backend are written to the
    device.

    Args:
        context: ZMQ context shared with the proxy, required because the
            worker connects over ``inproc://backend``.
    """

    # Client identities that receive every SBP message read from the device.
    # NOTE(review): no client in this file registers as 'Network Write Task'
    # (the write task uses 'Networking Write Task'); kept in case an external
    # client relies on it.
    RECIPIENTS = (
        'File Logger',
        'Network Write Task',
        'JSON File Logger',
        'Networking Write Task',
        'TCPServer',
    )

    # Level names used when printing MsgLog messages.
    LOG_LEVELS = {0: 'EMERG',
                  1: 'ALERT',
                  2: 'CRIT',
                  3: 'ERROR',
                  4: 'WARN',
                  5: 'NOTICE',
                  6: 'INFO',
                  7: 'DEBUG'}

    def __init__(self, context):
        threading.Thread.__init__(self)
        self.context = context
        # Use the module-level serial settings instead of repeating them.
        self.device_handle = PySerialDriver(
            port=SERIAL_PORT,
            baud=SERIAL_BAUD,)
        # No read timeout on the underlying serial handle.
        self.device_handle.handle.timeout = None
        self.daemon = True
        self.reset = False

    def close(self):
        """Close the backend socket created by ``connect``."""
        self.worker.close()

    def connect(self):
        """Create the DEALER socket, connect it to the backend, return self."""
        self.worker = self.context.socket(zmq.DEALER)
        self.worker.connect('inproc://backend')
        return self

    def run(self):
        """Pump messages between the device and the backend until the link dies."""
        tprint('SerialWorker started')
        framer = Framer(self.device_handle.read, self.device_handle.write, False)
        with Handler(framer) as link:
            def _write(sbp_msg, **metadata):
                # FIX(mookerji): Drops session-uid and time
                msg = bytearray(sbp_msg.to_binary())
                # FIX(mookerji): Replace this with a broadcast
                try:
                    for recipient in self.RECIPIENTS:
                        self.worker.send_multipart([bytearray(recipient), msg])
                except zmq.ZMQError as e:
                    if e.errno != zmq.EAGAIN:
                        raise

            def _log_printer(sbp_msg, **metadata):
                m = MsgLog(sbp_msg)
                print('%s %s' % (self.LOG_LEVELS[m.level], m.text))

            def _log_position(sbp_msg, **metadata):
                m = MsgPosECEF(sbp_msg)
                if 1 <= m.flags < 5:
                    print('Wee! %s' % (sbp_msg,))

            if self.reset:
                link(MsgReset(flags=0))
            # Register the callbacks once, rather than on every loop pass.
            link.add_callback(_log_printer, SBP_MSG_LOG)
            link.add_callback(_log_position, SBP_MSG_POS_ECEF)
            link.add_callback(_write)
            while True:
                if not link.is_alive():
                    sys.stderr.write("Serial Worker is dead")
                    sys.exit()
                try:
                    ident, msg = self.worker.recv_multipart()
                except zmq.ZMQError as e:
                    if e.errno != zmq.EAGAIN:
                        raise
                    # Nothing was received; do not write an unbound or
                    # stale message to the device.
                    continue
                if msg:
                    self.device_handle.write(msg)
                time.sleep(0)
class ServerTask(threading.Thread):
    """Daemon thread running the ZMQ proxy between clients and the serial worker.

    A ROUTER socket bound on TCP port 5570 faces the client tasks; a DEALER
    socket bound on ``inproc://backend`` faces the ``SerialWorker``.
    """

    def __init__(self):
        super(ServerTask, self).__init__()
        self.daemon = True

    def run(self):
        """Bind both sockets, start the serial worker and run the proxy."""
        ctx = zmq.Context()
        router = ctx.socket(zmq.ROUTER)
        router.bind('tcp://*:5570')
        dealer = ctx.socket(zmq.DEALER)
        dealer.bind('inproc://backend')
        # The worker shares this context so it can reach the inproc endpoint.
        serial_worker = SerialWorker(ctx).connect()
        serial_worker.start()
        zmq.proxy(router, dealer)
        # Reached only once the proxy call returns.
        for sock in (router, dealer):
            sock.close()
        ctx.term()
def get_json_logger_task(filename):
    """Open ``<filename>.json`` and build a JSON logging task for it.

    Args:
        filename: Base path; ``'.json'`` is appended to it.

    Returns:
        A ``(logger, file_handle, task)`` tuple: the ``JSONLogger``, the open
        file it writes to, and an unconnected, unstarted ``JSONLoggerTask``
        with identity ``'JSON File Logger'``.
    """
    json_path = filename + '.json'
    file_handle = open(json_path, 'w+')
    print("Logging to %s" % json_path)
    logger = JSONLogger(file_handle, dispatcher=dispatch)
    return (
        logger,
        file_handle,
        JSONLoggerTask(identity='JSON File Logger', handle=logger),
    )
def get_binary_logger_task():
    """Open a timestamped binary log under ``logs/`` and build its task.

    The ``logs`` directory is created if it does not exist, and the file is
    opened in binary mode because the task writes raw message bytes to it.

    Returns:
        A ``(filename, file_handle, task)`` tuple: the log path, the open
        file, and an unconnected, unstarted ``ClientTask`` with identity
        ``'File Logger'`` that writes to that file.
    """
    directory = 'logs'
    if not os.path.isdir(directory):
        os.makedirs(directory)
    filename = os.path.join(directory, get_logfilename())
    print("Logging to %s" % filename)
    file_handle = open(filename, 'wb+')
    return (
        filename,
        file_handle,
        ClientTask(identity='File Logger', handle=file_handle), )
def main():
    """Start the proxy, loggers, network writer and TCP server, then wait.

    Blocks until the proxy thread ends or the user presses Ctrl-C; the log
    files are closed on the way out.
    """
    server = ServerTask()
    server.start()
    log_filename, log_handle, logger = get_binary_logger_task()
    logger.connect().start()
    # get_json_logger_task returns (logger, file_handle, task), in that order.
    json_logger, json_log_handle, json_logger_task = get_json_logger_task(
        log_filename)
    json_logger_task.connect().start()
    net_write = NetworkingWriteTask(identity='Networking Write Task')
    net_write.connect().start()
    # The read task is constructed but deliberately not started.
    net_read = NetworkingReadTask(identity='Networking Read Task')
    #net_read.connect().start()
    # TCP server
    HOST, PORT = "localhost", 0
    tcp = SocketServer.TCPServer((HOST, PORT), TCPHandler)
    print("open on %s" % (tcp.server_address,))
    server_thread = threading.Thread(target=tcp.serve_forever)
    server_thread.daemon = True
    server_thread.start()
    try:
        # Join with a timeout in a loop: a bare join() cannot be interrupted
        # by KeyboardInterrupt on Python 2.
        while server.is_alive():
            server.join(1)
    except KeyboardInterrupt:
        print("Exiting")
        sys.exit()
    finally:
        log_handle.close()
        json_logger.close()
        json_log_handle.close()
        #fake_serial_handle.close()
# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    main()
    #test_connection()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment