Skip to content

Instantly share code, notes, and snippets.

@charlie2951
Created June 21, 2022 15:16
Show Gist options
  • Save charlie2951/2c3420241ec07d8ecff3c901a8b94832 to your computer and use it in GitHub Desktop.
Save charlie2951/2c3420241ec07d8ecff3c901a8b94832 to your computer and use it in GitHub Desktop.
#Python script to read serial port data and send them to UBIDOTS cloud
import time
import requests
import math
import random
import serial
TOKEN = "BBFF-ofWonfD405MScpxOFPtRVbQkhVSscz" # Put your TOKEN here
DEVICE_LABEL = "My_PC" # Put your device label here
VARIABLE_LABEL_1 = "temperature" # Put your first variable label here
VARIABLE_LABEL_2 = "humidity" # Put your second variable label here
#configure serial port
ser = serial.Serial('COM3', 9600, timeout=1)
print("Reading data from serial port.....");
time.sleep(2)
ser.reset_input_buffer() # Delete any stale data.
def build_payload(variable_1, variable_2, data):
# Creates two random values for sending data
value_1 = int(data[0])
value_2 = int(data[1])
payload = {variable_1: value_1,variable_2: value_2}
return payload
def post_request(payload):
# Creates the headers for the HTTP requests
url = "http://industrial.api.ubidots.com"
url = "{}/api/v1.6/devices/{}".format(url, DEVICE_LABEL)
headers = {"X-Auth-Token": TOKEN, "Content-Type": "application/json"}
# Makes the HTTP requests
status = 400
attempts = 0
while status >= 400 and attempts <= 5:
req = requests.post(url=url, headers=headers, json=payload)
status = req.status_code
attempts += 1
time.sleep(1)
# Processes results
#print(req.status_code, req.json())
if status >= 400:
print("[ERROR] Could not send data after 5 attempts, please check \
your token credentials and internet connection")
return False
print("Device ID updated...wait for data transfer")
return True
def main():
# Reading all bytes available bytes till EOL
line = ser.readline()
if line:
# Converting Byte Strings into unicode strings
rxdata = line.decode()
# Converting Unicode String into integer
#num = int(rxdata)
#print(num)
tmp = rxdata.split(',')
#send collected data to cloud as a payload
payload = build_payload(VARIABLE_LABEL_1, VARIABLE_LABEL_2,tmp)
print(tmp[0],tmp[1])
del tmp #clear received data
print("Data transfer started")
post_request(payload)
print("Data Successfully sent to cloud")
if __name__ == '__main__':
while (True):
main()
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment