Last active
February 16, 2024 02:26
-
-
Save danialothman/9556d84e60dcff23f0e149f193628026 to your computer and use it in GitHub Desktop.
Send telemetry to ThingsBoard via HTTP Curl (testing)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Change 'YOUR_TOKEN' and IP address 'XX' | |
""" | |
import subprocess | |
import time | |
import random | |
seconds = 20 | |
while True: | |
# Choose a random number between 0 and 8 for PeopleIn | |
people_in = random.randint(0,8) | |
# Create the CURL command string | |
curl_command = f'curl -v -X POST http://192.168.XX.XX:8081/api/v1/YOUR_TOKEN/telemetry --header Content-Type:application/json --data "{{\\"PeopleIn\\":{people_in}}}"' | |
# Execute the CURL command using subprocess | |
subprocess.run(curl_command, shell=True) | |
# Wait for X seconds before the next execution | |
for _ in range(seconds): | |
print(seconds - _) | |
time.sleep(1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""_summary_ | |
example sending payload to TB CE, supply the device token from TB | |
check this tutorial | |
https://thingsboard.io/docs/user-guide/rule-engine-2-0/tutorials/transform-telemetry-using-previous-record/ | |
notice: for rule chain, make sure another 'post telemetry' directly connects to 'Save Timeseries' or else telemetry for other devices will not be received. Have another 'post telemetry' connect to 'originator attributes', then follow as per tutorial | |
""" | |
import requests | |
import json | |
import time | |
device_token = "DEVICE TOKEN" | |
# Function to send POST request with updated accumulated_flow_litres value | |
def send_data(accumulated_flow): | |
endpoint = "http://192.168.XX.XX:8081/api/v1/" + device_token + "/telemetry" | |
headers = {"Content-Type": "application/json"} | |
payload = {"accumulated_flow_litres": accumulated_flow} | |
try: | |
response = requests.post(endpoint, data=json.dumps(payload), headers=headers) | |
if response.status_code == 200: | |
print("Data sent successfully:", payload) | |
else: | |
print("Failed to send data. Status code:", response.status_code) | |
except requests.RequestException as e: | |
print("Request Exception:", e) | |
# Initial value for accumulated_flow_litres | |
accumulated_flow = 9990 | |
# Main loop to increment and send data every 10 seconds | |
while True: | |
accumulated_flow += 10 # Increment by 10 | |
send_data(accumulated_flow) | |
time.sleep(10) # Wait for 10 seconds |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment