Skip to content

Instantly share code, notes, and snippets.

@abhishekzgithub
Created March 22, 2021 07:26
Show Gist options
  • Save abhishekzgithub/37dc846098c79f1259146cf57985f1b5 to your computer and use it in GitHub Desktop.
Save abhishekzgithub/37dc846098c79f1259146cf57985f1b5 to your computer and use it in GitHub Desktop.
Code to send data from a device to Azure IoT Hub using paho-mqtt in Python
import json
import ssl
from time import time
from time import sleep
from base64 import b64encode, b64decode
from hashlib import sha256
from urllib import parse
from hmac import HMAC
from paho.mqtt import client as mqtt
from constants import iot_data
def generate_sas_token(uri, key, policy_name, expiry=3600):
    """
    Generate a shared access signature (SAS) security token.

    The text ``<url-quoted uri>`` + newline + ``<expiry epoch seconds>`` is
    signed with HMAC-SHA256 using the base64-decoded ``key``. The
    base64-encoded digest is returned together with the resource URI and the
    expiry as URL-encoded fields.

    Args:
        uri: Resource URI the token is issued for (emitted as ``sr``).
        key: Base64-encoded signing key.
        policy_name: Optional policy name emitted as ``skn``; the field is
            left out when this is ``None``.
        expiry: Token lifetime in seconds, counted from the current time.

    Returns:
        The string ``"SharedAccessSignature "`` followed by the URL-encoded
        ``sr``, ``sig``, ``se`` (and optionally ``skn``) fields.
    """
    # Compute the expiry once so the signed value and the 'se' field always agree.
    expires_at = int(time() + expiry)
    string_to_sign = "%s\n%d" % (parse.quote_plus(uri), expires_at)
    digest = HMAC(b64decode(key), string_to_sign.encode('utf-8'), sha256).digest()
    rawtoken = {
        'sr': uri,
        # b64encode returns bytes; decode so every token field is plain text.
        'sig': b64encode(digest).decode('ascii'),
        'se': str(expires_at),
    }
    if policy_name is not None:
        rawtoken['skn'] = policy_name
    return 'SharedAccessSignature ' + parse.urlencode(rawtoken)
# Device identity comes from the project-local constants module.
device_id = iot_data["device_id"]
import json
# Location of the JSON document whose parsed content is published to the hub.
path_file = "/Users/abhishek_kumar/Downloads/ra_simulator/out_data.json"
# Decode explicitly as UTF-8 so parsing does not depend on the locale's
# default text encoding.
with open(path_file, "r", encoding="utf-8") as fbuff:
    out_data = json.load(fbuff)
def on_subscribe(client, userdata, mid, granted_qos):
    """Subscribe callback: print the message id ``mid`` it was called with."""
    print('Subscribed for m' + str(mid))
def on_connect(client, userdata, flags, rc):
    """Connect callback: print the result code ``rc`` it was called with."""
    print("Connected with result code->: %s" % rc)
    # Optional: subscribe to device-bound messages as soon as the callback fires.
    #client.subscribe(f"devices/{device_id}/messages/devicebound/")
def on_disconnect(client, userdata, rc):
    """Disconnect callback: print the result code ``rc`` it was called with."""
    print("Disconnected with result code: %s" % rc)
def on_message(client, userdata, msg):
    """Message callback: print the topic and payload of ``msg``.

    The payload is rendered with ``str()``, so a bytes payload is shown in
    its ``b'...'`` form.
    """
    print(" - ".join((msg.topic, str(msg.payload))))
    # Do this only if you want to send a reply message every time you receive one
    #client.publish(f"devices/{device_id}/messages/events", "REPLY", qos=1)
def on_publish(client, userdata, mid):
    """Publish callback: print the message id ``mid`` and the module-level ``device_id``."""
    print("Sent message")
    print("Message {0} sent from {1}".format(str(mid), device_id))
def publish(topic):
    """
    Publish the module-level ``out_data`` to ``topic`` with QoS 1.

    Uses the module-level ``client``. Prints the value of
    ``is_published()`` as observed immediately after the publish call.

    Args:
        topic: MQTT topic to publish to.
    """
    print("Sending data")
    # json.load may have produced a dict or list, which paho-mqtt rejects as a
    # payload (it accepts str, bytes, bytearray, int, float or None), so
    # serialise those back to JSON text before sending.
    payload = json.dumps(out_data) if isinstance(out_data, (dict, list)) else out_data
    result = client.publish(topic, payload, qos=1)
    print(result.is_published())
def on_log(client, userdata, level, buf):
    """Log callback: print the log text ``buf``; ``level`` is not used."""
    print("log: ", buf)
# Create the MQTT client and register the callbacks defined in this file.
client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311, clean_session=False)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.on_publish = on_publish
# on_subscribe is defined in this file; register it so it is actually invoked.
client.on_subscribe = on_subscribe
client.on_log = on_log
client.tls_set_context(context=None)
# NOTE(review): per the paho-mqtt docs, tls_insecure_set(True) turns off
# hostname verification of the server certificate. Confirm this is intended
# outside local testing.
client.tls_insecure_set(True)

# Credentials: a SAS token scoped to this device is used as the MQTT password.
policy_name = None
sas_token = generate_sas_token(
    iot_data["url"] + "/devices/" + device_id,
    iot_data["access_key"],
    policy_name,
)
# NOTE(review): this writes a credential to stdout.
print(f"sas token is--> {sas_token}")
user = f"{iot_data['hostname']}.azure-devices.net/{device_id}" + "/?api-version=2018-06-30"
print(f'username is {user}')
client.username_pw_set(username=user, password=sas_token)
topic = "devices/{device_id}/messages/events/".format(device_id=device_id)

# actions
client.connect(f"{iot_data['hostname']}.azure-devices.net", port=8883)
# out_data comes from json.load and may be a dict or list, which paho-mqtt
# rejects as a payload; serialise those to JSON text before publishing.
payload = json.dumps(out_data) if isinstance(out_data, (dict, list)) else out_data
sent = client.publish(topic, payload=payload, qos=0, retain=False)
print("Is published? ", sent.is_published())
client.subscribe("devices/{device_id}/messages/devicebound/#".format(device_id=device_id))
# Blocks here, running the network loop.
client.loop_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment