Last active
August 29, 2015 14:23
-
-
Save JimHaughwout/95b89b99a39aee21e096 to your computer and use it in GitHub Desktop.
Cobra Tag Exercise
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import getopt | |
import sys | |
from datetime import datetime | |
from time import sleep | |
import pytz | |
#### CRC-16 MODBUS Stuff. Included so we do all in one script #### | |
INITIAL_MODBUS = 0xFFFF | |
table = ( | |
0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241, | |
0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440, | |
0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40, | |
0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841, | |
0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40, | |
0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41, | |
0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641, | |
0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040, | |
0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240, | |
0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441, | |
0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41, | |
0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840, | |
0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41, | |
0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40, | |
0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640, | |
0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041, | |
0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240, | |
0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441, | |
0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41, | |
0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840, | |
0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41, | |
0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40, | |
0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640, | |
0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041, | |
0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241, | |
0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440, | |
0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40, | |
0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841, | |
0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40, | |
0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41, | |
0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641, | |
0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040 | |
) | |
def calcByte( ch, crc): | |
"""Given a new Byte and previous CRC, Calc a new CRC-16""" | |
if type(ch) == type("c"): | |
by = ord(ch) | |
else: | |
by = ch | |
crc = (crc >> 8) ^ table[(crc ^ by) & 0xFF] | |
return (crc & 0xFFFF) | |
def calcString(st, crc=INITIAL_MODBUS): | |
"""Given a binary string and starting CRC, Calc a final CRC-16""" | |
for ch in st: | |
crc = (crc >> 8) ^ table[(crc ^ ord(ch)) & 0xFF] | |
return crc | |
#### Cobra Generation #### | |
def generate_cobra_data(device_id): | |
"""Generates Cobra read""" | |
prologue = "$69" | |
common_data = "0068CB%s%s+000101AAAA21209500010010F1" | |
date_time = datetime.utcnow().today().strftime('%Y%m%d%H%M%S') | |
data = common_data % (device_id, date_time) | |
crc = calcString(data) | |
return prologue + data + hex(crc).split('x')[1] | |
def print_cobra_header(raw_read): | |
"""Prints out Cobra header from raw read""" | |
if len(raw_read) < 42: | |
print_usage_and_exit('No header') | |
type_mapping = {'CB' : 'Savi Cobra'} | |
try: | |
print "\nHeader:\n-------" | |
print "%-25s%s" % ("Prologue:", raw_read[0:3]) | |
print "%-25s%d" % ("Length:", int(raw_read[3:7])) | |
print "%-25s%s ('%s')" % ("Type:", raw_read[7:9], type_mapping[raw_read[7:9]]) | |
print "%-25s%s" % ("Device ID:", raw_read[9:23]) | |
read_date_time = "%4s-%2s-%2s %2s:%2s:%2s%3s" % ( | |
raw_read[23:27], raw_read[27:29], raw_read[29:31], | |
raw_read[31:33], raw_read[33:35], raw_read[35:37], raw_read[37:40]) | |
print "%-25s%s" % ("Date-Time:", read_date_time) | |
except: | |
print_usage_and_exit('Invalid header') | |
def print_cobra_status(raw_read, idx): | |
"""Prints out Cobra Status payload from raw read and position index""" | |
update_period_map = {'2': '15 minutes'} | |
sample_period_map = {'1': '5 minutes'} | |
gps_status_map = {'1': 'Disabled', '2': 'Enabled'} | |
try: | |
m_type = raw_read[idx:idx+2] | |
if int(m_type) != 1: | |
msg = 'Payload starting at %d is not a Status Message' % idx | |
print_usage_and_exit(msg) | |
firmware_ver = raw_read[idx+2:idx+6] | |
tag_upd_period = update_period_map[raw_read[idx+6:idx+7]] | |
sample_upd_period = sample_period_map[raw_read[idx+7:idx+8]] | |
gps_status = gps_status_map[raw_read[idx+8:idx+9]] | |
battery_level = raw_read[idx+9:idx+12] | |
calls_attempted = raw_read[idx+12:idx+16] | |
calls_completed = raw_read[idx+16:idx+20] | |
call_status = raw_read[idx+20:idx+22] | |
print "\nStatus:\n-------" | |
print "%-25s%s" % ("Firmware Version:", firmware_ver) | |
print "%-25s%s" % ("Tag Update Period:", tag_upd_period) | |
print "%-25s%s" % ("Sample Update Period:", sample_upd_period) | |
print "%-25s%s" % ("AGPS Status:", gps_status) | |
print "%-25s%.2f" % ("Battery Level:", float(battery_level)/100.0) | |
print "%-25s%d" % ("Calls Attempted:", int(calls_attempted)) | |
print "%-25s%d" % ("Calls Completed:", int(calls_completed)) | |
print "%-25s%s" % ("Call Status:", call_status) | |
except: | |
msg = 'Payload starting at %d is not a Status Message' % idx | |
print_usage_and_exit(msg) | |
def print_cobra_agps(raw_read, start_position): | |
"""TODO""" | |
print "postion" | |
def print_cobra_data(raw_read, target_url): | |
"""Prints out parsed Cobra tag read, telnet instructions, and raw data""" | |
port_map = {'CB' : 9999} | |
payload_len_map = {'01': 22} | |
print "\n===========\nPARSED READ\n===========" | |
print_cobra_header(raw_read) | |
num_payloads = int(raw_read[40:42]) | |
print "%-15s%d" % ("Payload Count:", num_payloads) | |
if num_payloads > 0: | |
postion = 42 # Header Length | |
for payload in range (1, num_payloads + 1): | |
msg_type = raw_read[postion:postion+2] | |
if msg_type == '01': | |
print_cobra_status(raw_read, postion) | |
postion += payload_len_map[msg_type] | |
elif msg_type == '02': | |
print "TODO" | |
# print_cobra_agps(raw_read, postion) | |
print "\nCRC:\n----\n%s\n" % raw_read[postion:postion+4] | |
print "\n========\nRAW READ\n========" | |
print "telnet %s %d" % (target_url, port_map[raw_read[7:9]]) | |
print "%s\n" % raw_read | |
############################## Main script ############################## | |
# Print usage help and applicable error text | |
def print_usage_and_exit(msg=None): | |
print "\nUsage: %s [-h][-d][-u]" % sys.argv[0] | |
print "\t-h\tPrint usage help" | |
print "\t-d\tDeviceId to use (otherwise default to TestDeviceXXXXX)" | |
print "\t-u\tURL to send message (otherwise default to PROD)" | |
if msg: print msg + "\n" | |
sys.exit(2) | |
def main(argv): | |
"""Main method""" | |
COBRA_ID_LEN = 14 | |
DEFAULT_DEVICE_ID = 'TestDeviceXXXX' # Must be 14 characters | |
DEFAULT_URL = 'iota-gprs.savitechnology.com' | |
device_id = None | |
endpoint_url = None | |
try: | |
opts, args = getopt.getopt(argv, "hd:u:") | |
except getopt.GetoptError: | |
print_usage_and_exit('Could not parse options') | |
for opt, arg in opts: | |
if opt == '-h': | |
print_usage_and_exit() | |
elif opt == '-d': | |
input_device_id = arg | |
input_len = len(input_device_id) | |
if input_len < COBRA_ID_LEN: | |
padding = "X" * (COBRA_ID_LEN - input_len) | |
device_id = input_device_id + padding | |
else: | |
device_id = input_device_id[0:14] | |
elif opt == '-u': | |
endpoint_url = arg | |
if not(device_id): | |
device_id = DEFAULT_DEVICE_ID | |
if not(endpoint_url): | |
endpoint_url = DEFAULT_URL | |
generated_read = generate_cobra_data(device_id) | |
print_cobra_data(generated_read, endpoint_url) | |
if __name__ == '__main__': | |
main(sys.argv[1:]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment