Skip to content

Instantly share code, notes, and snippets.

@gregjhogan
Created April 9, 2018 06:52
Show Gist options
  • Select an option

  • Save gregjhogan/e8c6f88d398c94cee19b28facec12d98 to your computer and use it in GitHub Desktop.

Select an option

Save gregjhogan/e8c6f88d398c94cee19b28facec12d98 to your computer and use it in GitHub Desktop.
Run a GNSS NMEA TCP/IP server for mocking your location on an Android device (with RTK support)
#!/usr/bin/env python
# plug in grey panda
# open Lefebure NTRIP Client android app
# configure it:
# - Receiver Settings -> External via TCP/IP (enter the IP of the machine running this script and port 3001)
# - Receiver Settings -> GPS Mock Locations (check checkbox)
# - NTRIP Settings -> Caster IP/Port/Username/Password for your local RTK network
# - NTRIP Settings -> Data Stream needs to be an RTCM3 source
# go into android developer settings and select the Lefebure NTRIP Client app to use for mock locations
# go into android location settings and switch the mode to Device only
# fire up this script (should say waiting for connection)
# connect in the Lefebure NTRIP Client app (should start with GPS fix type, then switch to DGPS then RTK)
import os
import time
import sys
import struct
import traceback
import socket
from panda import Panda, PandaSerial
# Address and port the NMEA TCP server binds to. 0.0.0.0 listens on every
# interface; 3001 is the port the setup notes at the top of this file tell
# you to enter in the NTRIP client app.
TCP_ADDR = "0.0.0.0"
TCP_PORT = 3001
# UBX class and message-id bytes placed in the header of the configuration
# message this script sends to the receiver in its "configure RTK" step.
CLASS_CFG = 0x06
MSG_CFG_DGNSS = 0x70
def ubx_checksum(data):
    """Compute the two-byte running checksum over *data*.

    ``ck_a`` is the 8-bit sum of all byte values and ``ck_b`` is the 8-bit
    sum of every intermediate ``ck_a`` value.

    *data* may be a byte string whose iteration yields single characters
    (Python 2 ``str``) or integers (``bytearray``, Python 3 ``bytes``, or
    any iterable of ints in the range 0..255).

    Returns the tuple ``(ck_a, ck_b)``; ``(0, 0)`` for empty input.
    """
    ck_a = 0
    ck_b = 0
    for item in data:
        # Iteration yields ints for bytearray / Python 3 bytes and
        # 1-character strings otherwise; normalise both to an int.
        value = item if isinstance(item, int) else ord(item)
        ck_a = (ck_a + value) & 0xFF
        ck_b = (ck_b + ck_a) & 0xFF
    return (ck_a, ck_b)
def add_ubx_checksum(msg):
    """Return *msg* with its two checksum bytes appended.

    The checksum is computed by ``ubx_checksum`` over everything after the
    first two bytes of *msg* and appended as two unsigned bytes
    (``ck_a`` first, then ``ck_b``).
    """
    # The first two bytes of the message are excluded from the checksum.
    checksum_pair = ubx_checksum(msg[2:])
    trailer = struct.pack('<BB', *checksum_pair)
    return msg + trailer
def add_nmea_checksum(msg):
    """Return *msg* with a ``*HH`` checksum suffix appended.

    ``HH`` is the two-digit uppercase hexadecimal XOR of the character
    codes of every character in *msg* except the first one.
    """
    codes = [ord(char) for char in msg[1:]]
    checksum = 0
    for code in codes:
        checksum = checksum ^ code
    suffix = "*%02X" % checksum
    return msg + suffix
if __name__ == "__main__":
    # Script entry point:
    #   1. power cycle the receiver and raise its serial baud rate,
    #   2. send the UBX configuration message ("configure RTK"),
    #   3. run a single-client TCP server that forwards serial data to the
    #      client and writes whatever the client sends back to the serial
    #      port.
    panda = Panda()
    ser = PandaSerial(panda, 1, 9600)

    # power cycle by toggling reset
    print("resetting")
    panda.set_esp_power(0)
    time.sleep(0.5)
    panda.set_esp_power(1)
    time.sleep(0.5)
    print("done")
    ser.read(1024)  # discard whatever is pending on the serial port

    # upping baud rate
    baudrate = 460800
    print("upping baud rate")
    msg = add_nmea_checksum("$PUBX,41,1,0039,0003,%d,0" % baudrate) + "\r\n"
    print(msg)
    ser.write(msg)
    time.sleep(0.1)  # needs a wait for it to actually send

    # new panda serial at the new baud rate
    ser = PandaSerial(panda, 1, baudrate)

    print("configure RTK")
    payload = struct.pack('<BBBB', 3, 0, 0, 0)
    header = struct.pack('<BBBBH', 0xb5, 0x62, CLASS_CFG, MSG_CFG_DGNSS, len(payload))
    msg = add_ubx_checksum(header + payload)
    ser.write(msg)

    server = None
    conn = None
    try:
        # Create the listening socket once and reuse it for every client
        # instead of re-creating (and never closing) it on each retry.
        # A failure to bind is fatal rather than retried forever.
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((TCP_ADDR, TCP_PORT))
        server.listen(1)

        while True:
            try:
                print('waiting for connection...')
                conn, addr = server.accept()
                print('connection address: %s' % (addr,))
                # Short timeout so polling the client for data never stalls
                # the serial -> TCP stream for long.
                conn.settimeout(0.1)

                client_connected = True
                while client_connected:
                    nmea = ser.read(1024)
                    if nmea:
                        # sendall: plain send() may transmit only part of
                        # the buffer and silently drop the remainder.
                        conn.sendall(nmea)
                        sys.stdout.write(nmea)
                        sys.stdout.flush()
                    try:
                        rtcm = conn.recv(16)
                        while rtcm:
                            ser.write(rtcm)
                            rtcm = conn.recv(16)
                        # recv() returned no data: the peer closed the
                        # connection, so go back to waiting for a client.
                        print('client disconnected')
                        client_connected = False
                    except socket.timeout:
                        # Nothing (more) pending from the client right now.
                        pass
            except Exception:
                # Best effort: report the problem, pause, accept again.
                print(traceback.format_exc())
                time.sleep(2)
            finally:
                # Always release the client socket, whatever ended the session.
                if conn is not None:
                    try:
                        conn.shutdown(socket.SHUT_RDWR)
                    except socket.error:
                        pass  # peer already gone / socket not connected
                    conn.close()
                    conn = None
    except KeyboardInterrupt:
        # Covers Ctrl-C at any point: while waiting in accept(), while
        # streaming, or during the retry sleep.
        print('shutting down ...')
        sys.exit()
    finally:
        if server is not None:
            server.close()
        panda.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment