Skip to content

Instantly share code, notes, and snippets.

@misodengaku
Last active July 27, 2018 05:57
Show Gist options
  • Save misodengaku/94c83e495f8774c878e4c6914e0eb4da to your computer and use it in GitHub Desktop.
Save misodengaku/94c83e495f8774c878e4c6914e0eb4da to your computer and use it in GitHub Desktop.
import serial
import time
from datetime import datetime
import requests
URL = "http://xxxxxxxxxxxxx"
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1.0)
def check_data(data):
if len(data) < 20:
return False
if data[0] != 0x24:
return False
if (data[3] & 0x60) != 0x60:
return False
checksum = data[16]
if checksum != calc_checksum(data):
return False
return True
def calc_checksum(data):
calc_checksum = 0
for i in data[:16]:
calc_checksum = calc_checksum + i
calc_checksum = calc_checksum & 0xFF
return calc_checksum
def parse(data):
wind_direction = data[2] | (data[3] & 0x80) << 1
temperature = ((data[4] | ((data[3] & 0x7) << 8)) - 400) / 10.0
hum = data[5]
wind_speed = (data[6] | ((data[3] & 0x10) << 4)) / 8.0 * 1.12
gust_speed = data[7] * 1.12
rainfall = (data[8] << 8) | data[9]
uv = ((data[10] << 8) | data[11]) / 10.0
light = ((data[12] << 16) | (data[13] << 8) | data[14]) / 10.0
pressure = ((data[17] << 16) | (data[18] << 8) | data[19]) / 100.0
json = {
"direction": wind_direction,
"temperature": temperature,
"humidity": hum,
"wind speed": wind_speed,
"gust speed": gust_speed,
"rainfall": rainfall,
"uv": uv,
"light": light,
"pressure": pressure,
}
requests.post(URL, data=json)
print("direction: %d°" % wind_direction)
print("temperature: %.1f℃" % temperature)
print("humidity: %d%%" % hum)
print("wind speed: %.1fm/s" % wind_speed)
print("gust speed: %.1fm/s" % gust_speed)
print("rainfall: %dmm" % rainfall)
print("uv: %.1fuW/cm2" % uv)
print("light: %.1flux" % light)
print("pressure: %.2fhPa" % pressure)
def get_record(byte_array):
for i, d in enumerate(byte_array):
if d == 0x24:
cropped_data = byte_array[i:i+20]
if check_data(cropped_data):
return (i, cropped_data)
return None
index = 1
buffer = []
while True:
c = ser.read(1)
if c != None:
buffer.extend(c)
if len(buffer) >= 20:
result = get_record(buffer)
if result != None:
print("index %d" % index)
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
if check_data(result[1]):
parse(result[1])
buffer = buffer[result[0] + 1:]
print('----------------------------')
index = index + 1
else:
print('.')
time.sleep(1)
'''
index = 0
while index < len(stream_data):
result = get_record(stream_data[index:])
if result != None:
if check_data(result[1]):
parse(result[1])
index = index + result[0] + 1
print("index: %d" % index)
else:
break
for i, record in enumerate(data):
print("record %d:" % i)
if check_data(record):
parse(record)
else:
print("invalid data")
print("---------------\n\n")
for i, dirty_datum in enumerate(dirty_data):
print("data cleanup %d:" % i)
clean_record = get_record(dirty_datum)
if clean_record != None:
if check_data(record):
parse(record)
print(clean_record)
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment