Created
April 9, 2018 06:52
-
-
Save gregjhogan/e8c6f88d398c94cee19b28facec12d98 to your computer and use it in GitHub Desktop.
Run a GNSS NMEA TCP/IP server for mocking your location on an Android device (with RTK support)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # plug in grey panda | |
| # open Lefebure NTRIP Client android app | |
| # configure it: | |
| # - Receiver Settings -> External via TCP/IP (enter the IP of the machine running this script and port 3001) | |
| # - Receiver Settings -> GPS Mock Locations (check checkbox) | |
| # - NTRIP Settings -> Caster IP/Port/Username Password for your local RTK network | |
| # - NTRIP Settings -> Data Stream needs to be an RTCM3 source | |
| # go into android developer settings and select the Lefebure NTRIP Client app to use for mock locations | |
| # go into android location settings and switch the mode to Device only | |
| # fire up this script (should say waiting for connection) | |
| # connect in the Lefebure NTRIP Client app (should start with GPS fix type, then switch to DGPS then RTK) | |
| import os | |
| import time | |
| import sys | |
| import struct | |
| import traceback | |
| import socket | |
| from panda import Panda, PandaSerial | |
| TCP_ADDR = "0.0.0.0" | |
| TCP_PORT = 3001 | |
| CLASS_CFG = 0x06 | |
| MSG_CFG_DGNSS = 0x70 | |
| def ubx_checksum(data): | |
| #cs = 0 | |
| ck_a = 0 | |
| ck_b = 0 | |
| for i in data: | |
| ck_a = (ck_a + ord(i)) & 0xFF | |
| ck_b = (ck_b + ck_a) & 0xFF | |
| return (ck_a, ck_b) | |
| def add_ubx_checksum(msg): | |
| (ck_a, ck_b) = ubx_checksum(msg[2:]) | |
| return msg + struct.pack('<BB', ck_a, ck_b) | |
| def add_nmea_checksum(msg): | |
| d = msg[1:] | |
| cs = 0 | |
| for i in d: | |
| cs ^= ord(i) | |
| return msg + "*%02X" % cs | |
| if __name__ == "__main__": | |
| panda = Panda() | |
| ser = PandaSerial(panda, 1, 9600) | |
| # power cycle by toggling reset | |
| print "resetting" | |
| panda.set_esp_power(0) | |
| time.sleep(0.5) | |
| panda.set_esp_power(1) | |
| time.sleep(0.5) | |
| print "done" | |
| ser.read(1024) | |
| # upping baud rate | |
| baudrate = 460800 | |
| print "upping baud rate" | |
| msg = add_nmea_checksum("$PUBX,41,1,0039,0003,%d,0" % baudrate)+"\r\n" | |
| print msg | |
| ser.write(msg) | |
| time.sleep(0.1) # needs a wait for it to actually send | |
| # new panda serial | |
| ser = PandaSerial(panda, 1, baudrate) | |
| print "configure RTK" | |
| payload = struct.pack('<BBBB', 3, 0, 0, 0) | |
| header = struct.pack('<BBBBH', 0xb5, 0x62, CLASS_CFG, MSG_CFG_DGNSS, len(payload)) | |
| msg = add_ubx_checksum(header + payload) | |
| ser.write(msg) | |
| while 1: | |
| try: | |
| # start tcp server and wait for connection | |
| print 'waiting for connection...' | |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| s.bind((TCP_ADDR, TCP_PORT)) | |
| s.listen(1) | |
| conn, addr = s.accept() | |
| print 'connection address:', addr | |
| while True: | |
| nmea = ser.read(1024) | |
| if len(nmea) > 0: | |
| conn.send(nmea) | |
| sys.stdout.write(nmea) | |
| sys.stdout.flush() | |
| #print str(ret).encode("hex") | |
| try: | |
| conn.settimeout(0.1) | |
| rtcm = conn.recv(16) | |
| while len(rtcm) > 0: | |
| ser.write(rtcm) | |
| #print str(rtcm).encode('hex') | |
| rtcm = conn.recv(16) | |
| conn.settimeout(None) | |
| except socket.timeout: | |
| pass | |
| except KeyboardInterrupt: | |
| print 'shutting down ...' | |
| panda.close() | |
| conn.shutdown(socket.SHUT_RDWR) | |
| conn.close() | |
| sys.exit() | |
| except: | |
| print traceback.format_exc() | |
| time.sleep(2) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment