Created
February 25, 2017 01:09
-
-
Save mookerji/6cd77b292d181298ae06e89ed0b17b27 to your computer and use it in GitHub Desktop.
ZMQ piksi client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (C) 2015 Swift Navigation Inc. | |
# Contact: Engineering <[email protected]> | |
# | |
# This source is subject to the license found in the file 'LICENSE' which must | |
# be be distributed together with this source. All other rights reserved. | |
# | |
# THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, | |
# EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED | |
# WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE. | |
from requests.adapters import DEFAULT_POOLBLOCK, DEFAULT_POOLSIZE, HTTPAdapter | |
from requests.packages.urllib3.util import Retry | |
from requests_futures.sessions import FuturesSession | |
from sbp.client import Handler, Framer | |
from sbp.client.drivers.pyserial_driver import PySerialDriver | |
from sbp.client.loggers.json_logger import JSONLogger | |
from sbp.logging import * | |
from sbp.navigation import * | |
from sbp.msg import SBP | |
from sbp.piksi import MsgReset | |
from sbp.table import dispatch | |
import os | |
import pty | |
import requests | |
import serial | |
import SocketServer | |
import sys | |
import threading | |
import time | |
import uuid | |
import zmq | |
def tprint(msg): | |
"""like print, but won't get newlines confused with multiple threads""" | |
sys.stdout.write(msg + '\n') | |
sys.stdout.flush() | |
def get_logfilename(): | |
return time.strftime("hackathon-client-%Y%m%d-%H%M%S.log.sbp") | |
SERIAL_PORT = "/dev/tty.usbserial-FTFMFLNV" | |
SERIAL_BAUD = 115200 | |
DEFAULT_CHANNEL_UUID = '118db405-b5de-4a05-87b5-605cc85af924' | |
DEFAULT_SERIAL_ID = 2222 | |
DEFAULT_SERIAL_SIZE = 128 | |
ENDPOINT_URL = '' | |
DEFAULT_POLLING_INTERVAL = 20 | |
def get_uuid(piksi_uuid=DEFAULT_SERIAL_ID, | |
channel=DEFAULT_CHANNEL_UUID,): | |
namespace_uuid = uuid.UUID(channel) | |
return uuid.uuid5(namespace_uuid, str(piksi_uuid)) | |
def get_header(uuid): | |
return { | |
'Device-Uid': str(uuid), | |
'Content-Type': 'application/vnd.swiftnav.broker.v1+sbp2', | |
'Accept': 'application/vnd.swiftnav.broker.v1+sbp2' | |
} | |
def get_bogus_serial_port(): | |
master, slave = pty.openpty() | |
s_name = os.ttyname(slave) | |
return s_name | |
class TCPHandler(SocketServer.BaseRequestHandler): | |
def __exit__(self): | |
self.socket.close() | |
self.context.term() | |
super(TCPHandler, self).__exit__() | |
def handle(self): | |
print "Connection received from", self.client_address | |
self.polling_interval = DEFAULT_POLLING_INTERVAL | |
self.context = zmq.Context() | |
self.socket = self.context.socket(zmq.DEALER) | |
self.identity = 'TCPServer' | |
self.socket.identity = self.identity | |
self.poll = zmq.Poller() | |
self.poll.register(self.socket, zmq.POLLIN) | |
while True: | |
sockets = dict(self.poll.poll(self.polling_interval)) | |
if self.socket in sockets: | |
try: | |
msg = self.socket.recv() | |
except zmq.ZMQError as e: | |
if e.errno != zmq.EAGAIN: | |
raise | |
self.request.sendall(msg) | |
class ClientTask(threading.Thread): | |
def __init__(self, identity, handle, polling_interval=DEFAULT_POLLING_INTERVAL,): | |
super(ClientTask, self).__init__() | |
self.identity = identity | |
self.context = zmq.Context() | |
self.socket = self.context.socket(zmq.DEALER) | |
self.socket.identity = self.identity | |
self.handle = handle | |
self.polling_interval = polling_interval | |
self.daemon = True | |
def connect(self, addr='tcp://localhost:5570'): | |
self.socket.connect(addr) | |
print 'Client %s started' % self.identity | |
self.poll = zmq.Poller() | |
self.poll.register(self.socket, zmq.POLLIN) | |
return self | |
def close(self): | |
self.socket.close() | |
self.context.term() | |
def run(self): | |
while True: | |
sockets = dict(self.poll.poll(self.polling_interval)) | |
if self.socket in sockets: | |
try: | |
msg = self.socket.recv() | |
except zmq.ZMQError as e: | |
if e.errno != zmq.EAGAIN: | |
raise | |
self.handle.write(msg) | |
def get_bogus_serial_port_task(): | |
port_name = get_bogus_serial_port() | |
handle = serial.serial_for_url(port_name) | |
print "Fake serial port %s" % handle | |
task = ClientTask(identity='Bogus Serial Port', handle=handle) | |
return (handle, task) | |
def get_parser(): | |
pass | |
DEFAULT_CONNECT_TIMEOUT = 10 | |
DEFAULT_READ_TIMEOUT = 10 | |
DEFAULT_TIMEOUT = (DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT) | |
MAX_CONNECT_RETRIES = 10 | |
MAX_READ_RETRIES = 10 | |
DEFAULT_RETRIES = (MAX_CONNECT_RETRIES, MAX_READ_RETRIES) | |
MAX_REDIRECTS = 0 | |
DEFAULT_BACKOFF_FACTOR = 0.2 | |
BROKER_SBP_TYPE = 'application/vnd.swiftnav.broker.v1+sbp2' | |
class NetworkingReadTask(ClientTask): | |
def __init__(self, identity): | |
super(NetworkingReadTask, self).__init__(identity, handle=None) | |
self.endpoint = ENDPOINT_URL | |
self.device_uid = get_uuid() | |
self._session = FuturesSession() | |
self._timeout = DEFAULT_TIMEOUT | |
self._retry = Retry(connect=DEFAULT_RETRIES[0], | |
read=DEFAULT_RETRIES[1], | |
redirect=MAX_REDIRECTS, | |
status_forcelist=[500], | |
backoff_factor=DEFAULT_BACKOFF_FACTOR) | |
self._session.mount("http://", | |
HTTPAdapter(pool_connections=DEFAULT_POOLSIZE, | |
pool_maxsize=DEFAULT_POOLSIZE, | |
pool_block=DEFAULT_POOLBLOCK, | |
max_retries=self._retry)) | |
self._session.mount("https://", | |
HTTPAdapter(pool_connections=DEFAULT_POOLSIZE, | |
pool_maxsize=DEFAULT_POOLSIZE, | |
pool_block=DEFAULT_POOLBLOCK, | |
max_retries=self._retry)) | |
self.connected = False | |
def _start(self): | |
self.response = self._session.get( | |
self.endpoint, | |
stream=True, | |
headers=get_header(self.device_uid), | |
) | |
if self.response: | |
while not self.response.result(): | |
self.response = self._session.get( | |
self.endpoint, | |
stream=True, | |
headers=get_header(self.device_uid), | |
) | |
time.sleep(1) | |
print 'Got Skylark reply', self.response.result() | |
return self.response.result().status_code | |
def run(self): | |
while True: | |
if not self.connected: | |
print "Connecting to Skylark" | |
counts = 0 | |
while self._start() != 202 and counts < 5: | |
print "Connecting to Skylark: Trying again" | |
counts += 1 | |
time.sleep(1) | |
if self._start() != 202: | |
print "Failed to connect to Skylark" | |
return False | |
self.connected = True | |
print "Connected to Skylark" | |
try: | |
msg = self.response.result().raw.read(amt=20) | |
if msg: | |
try: | |
self.socket.send(msg) | |
except zmq.ZMQError as e: | |
if e.errno != zmq.EAGAIN: | |
raise | |
except Exception as exc: | |
self.connected = False | |
print "--------------------------", exc | |
time.sleep(0.1) | |
import logging | |
logging.basicConfig(level=logging.DEBUG) | |
class NetworkingWriteTask(ClientTask): | |
def __init__(self, identity): | |
super(NetworkingWriteTask, self).__init__(identity, handle=None) | |
self.endpoint = ENDPOINT_URL | |
self.device_uid = get_uuid() | |
def run(self): | |
def put(): | |
while True: | |
sockets = dict(self.poll.poll(self.polling_interval)) | |
if self.socket in sockets: | |
try: | |
msg = self.socket.recv() | |
except zmq.ZMQError as e: | |
if e.errno != zmq.EAGAIN: | |
raise | |
if msg: | |
yield msg | |
else: | |
print "Nothing!" | |
continue | |
print 'Write with header', get_header(self.device_uid) | |
requests.put( | |
self.endpoint, | |
headers=get_header(self.device_uid), | |
stream=True, | |
data=put()) | |
class JSONLoggerTask(ClientTask): | |
def __init__(self, identity, handle): | |
super(JSONLoggerTask, self).__init__(identity, handle) | |
def run(self): | |
while True: | |
sockets = dict(self.poll.poll(self.polling_interval)) | |
if self.socket in sockets: | |
try: | |
msg = SBP.unpack(self.socket.recv()) | |
except zmq.ZMQError as e: | |
if e.errno != zmq.EAGAIN: | |
raise | |
except Exception as exc: | |
print exc | |
continue | |
self.handle(msg) | |
class SerialWorker(threading.Thread): | |
def __init__(self, context): | |
threading.Thread.__init__(self) | |
self.context = context | |
self.device_handle = PySerialDriver( | |
port='/dev/tty.usbserial-FTFMFLNV', | |
baud=115200,) | |
self.device_handle.handle.timeout = None | |
self.daemon = True | |
self.reset = False | |
def close(self): | |
self.worker.close() | |
def connect(self): | |
self.worker = self.context.socket(zmq.DEALER) | |
self.worker.connect('inproc://backend') | |
return self | |
def run(self): | |
tprint('SerialWorker started') | |
with Handler( | |
Framer( | |
self.device_handle.read, | |
self.device_handle.write, | |
False, )) as link: | |
def _write(sbp_msg, **metadata): | |
# FIX(mookerji): Drops session-uid and time | |
msg = bytearray(sbp_msg.to_binary()) | |
# FIX(mookerji): Replace this with a broadcast | |
try: | |
self.worker.send_multipart([bytearray('File Logger'), msg]) | |
self.worker.send_multipart([bytearray('Network Write Task'), msg]) | |
self.worker.send_multipart([bytearray('JSON File Logger'), msg]) | |
self.worker.send_multipart([bytearray('Networking Write Task'), msg]) | |
self.worker.send_multipart([bytearray('TCPServer'), msg]) | |
except zmq.ZMQError as e: | |
if e.errno != zmq.EAGAIN: | |
raise | |
def _log_printer(sbp_msg, **metadata): | |
levels = {0: 'EMERG', | |
1: 'ALERT', | |
2: 'CRIT', | |
3: 'ERROR', | |
4: 'WARN', | |
5: 'NOTICE', | |
6: 'INFO', | |
7: 'DEBUG'} | |
m = MsgLog(sbp_msg) | |
print levels[m.level], m.text | |
def _log_position(sbp_msg, **metadata): | |
m = MsgPosECEF(sbp_msg) | |
if m.flags >= 1 and m.flags < 5: | |
print 'Wee!', sbp_msg | |
if self.reset: | |
link(MsgReset(flags=0)) | |
while True: | |
if not link.is_alive(): | |
sys.stderr.write("Serial Worker is dead") | |
sys.exit() | |
link.add_callback(_log_printer, SBP_MSG_LOG) | |
link.add_callback(_log_position, SBP_MSG_POS_ECEF) | |
link.add_callback(_write) | |
try: | |
ident, msg = self.worker.recv_multipart() | |
except zmq.ZMQError as e: | |
if e.errno != zmq.EAGAIN: | |
raise | |
if msg: | |
self.device_handle.write(msg) | |
time.sleep(0) | |
class ServerTask(threading.Thread): | |
def __init__(self): | |
threading.Thread.__init__(self) | |
self.daemon = True | |
def run(self): | |
context = zmq.Context() | |
frontend = context.socket(zmq.ROUTER) | |
frontend.bind('tcp://*:5570') | |
backend = context.socket(zmq.DEALER) | |
backend.bind('inproc://backend') | |
worker = SerialWorker(context) | |
worker.connect().start() | |
zmq.proxy(frontend, backend) | |
frontend.close() | |
backend.close() | |
context.term() | |
def get_json_logger_task(filename): | |
file_handle = open(filename + '.json', 'w+') | |
print "Logging to %s" % filename + '.json' | |
logger = JSONLogger(file_handle, dispatcher=dispatch) | |
task = JSONLoggerTask(identity='JSON File Logger', handle=logger) | |
return (logger, file_handle, task) | |
def get_binary_logger_task(): | |
directory = 'logs' | |
filename = os.path.join(directory, get_logfilename()) | |
print "Logging to %s" % filename | |
file_handle = open(filename, 'w+') | |
return ( | |
filename, | |
file_handle, | |
ClientTask(identity='File Logger', handle=file_handle), ) | |
def main(): | |
server = ServerTask() | |
server.start() | |
log_filename, log_handle, logger = get_binary_logger_task() | |
logger.connect().start() | |
json_log_handle, json_logger, json_logger_task = get_json_logger_task( | |
log_filename) | |
json_logger_task.connect().start() | |
net_write = NetworkingWriteTask(identity='Networking Write Task') | |
net_write.connect().start() | |
net_read = NetworkingReadTask(identity='Networking Read Task') | |
#net_read.connect().start() | |
# TCP server | |
HOST, PORT = "localhost", 0 | |
tcp = SocketServer.TCPServer((HOST, PORT), TCPHandler) | |
print "open on", tcp.server_address | |
server_thread = threading.Thread(target=tcp.serve_forever) | |
server_thread.daemon = True | |
server_thread.start() | |
try: | |
server.join() | |
except KeyboardInterrupt: | |
print "Exiting" | |
sys.exit() | |
finally: | |
log_handle.close() | |
json_log_handle.close() | |
json_logger.close() | |
#fake_serial_handle.close() | |
if __name__ == "__main__": | |
main() | |
#test_connection() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment