Skip to content

Instantly share code, notes, and snippets.

@danialothman
Last active February 16, 2024 02:26
Show Gist options
  • Save danialothman/9556d84e60dcff23f0e149f193628026 to your computer and use it in GitHub Desktop.
Save danialothman/9556d84e60dcff23f0e149f193628026 to your computer and use it in GitHub Desktop.
Send telemetry to ThingsBoard via HTTP Curl (testing)
"""
Change 'YOUR_TOKEN' and IP address 'XX'
"""
import subprocess
import time
import random
seconds = 20
while True:
# Choose a random number between 0 and 8 for PeopleIn
people_in = random.randint(0,8)
# Create the CURL command string
curl_command = f'curl -v -X POST http://192.168.XX.XX:8081/api/v1/YOUR_TOKEN/telemetry --header Content-Type:application/json --data "{{\\"PeopleIn\\":{people_in}}}"'
# Execute the CURL command using subprocess
subprocess.run(curl_command, shell=True)
# Wait for X seconds before the next execution
for _ in range(seconds):
print(seconds - _)
time.sleep(1)
"""_summary_
example sending payload to TB CE, supply the device token from TB
check this tutorial
https://thingsboard.io/docs/user-guide/rule-engine-2-0/tutorials/transform-telemetry-using-previous-record/
notice: for rule chain, make sure another 'post telemetry' directly connects to 'Save Timeseries' or else telemetry for other devices will not be received. Have another 'post telemetry' connect to 'originator attributes', then follow as per tutorial
"""
import requests
import json
import time
device_token = "DEVICE TOKEN"
# Function to send POST request with updated accumulated_flow_litres value
def send_data(accumulated_flow):
endpoint = "http://192.168.XX.XX:8081/api/v1/" + device_token + "/telemetry"
headers = {"Content-Type": "application/json"}
payload = {"accumulated_flow_litres": accumulated_flow}
try:
response = requests.post(endpoint, data=json.dumps(payload), headers=headers)
if response.status_code == 200:
print("Data sent successfully:", payload)
else:
print("Failed to send data. Status code:", response.status_code)
except requests.RequestException as e:
print("Request Exception:", e)
# Initial value for accumulated_flow_litres
accumulated_flow = 9990
# Main loop to increment and send data every 10 seconds
while True:
accumulated_flow += 10 # Increment by 10
send_data(accumulated_flow)
time.sleep(10) # Wait for 10 seconds
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment