Skip to content

Instantly share code, notes, and snippets.

@skirdey
Last active September 19, 2024 11:43
Show Gist options
  • Save skirdey/9cdead881799a47742ff3cd296d06cc1 to your computer and use it in GitHub Desktop.
Save skirdey/9cdead881799a47742ff3cd296d06cc1 to your computer and use it in GitHub Desktop.
Connecting to AWS IoT MQTT topic using Python and Paho MQTT client
import paho.mqtt.client as paho
import os
import socket
import ssl
from time import sleep
from random import uniform
import json
import logging
# Configure the root logger at INFO level. The logger.debug(...) calls made by
# PubSub are filtered out at this level; lower it to logging.DEBUG to see them.
logging.basicConfig(level=logging.INFO)
# Refactored original source - https://github.com/mariocannistra/python-paho-mqtt-for-aws-iot
class PubSub(object):
    """Minimal AWS IoT MQTT publisher/subscriber built on paho-mqtt.

    Once started, the instance publishes a fixed JSON greeting to ``topic``
    every two seconds. When ``listener`` is true it also subscribes to the
    same topic and logs every message it receives.

    Attributes:
        connect: True only while the broker has accepted the session
            (CONNACK with return code 0) and no disconnect has been seen.
        listener: Whether to subscribe to ``topic`` after connecting.
        topic: MQTT topic used for both publishing and subscribing.
        logger: Per-instance logger named after ``repr(self)``.
    """

    def __init__(self, listener=False, topic="default"):
        """Store the configuration; no network activity happens here."""
        self.connect = False
        self.listener = listener
        self.topic = topic
        self.logger = logging.getLogger(repr(self))

    def __on_connect(self, client, userdata, flags, rc):
        """Handle the broker's CONNACK.

        Only ``rc == 0`` means the connection was accepted; any other code
        is a refusal, so the instance must not be flagged as connected and
        must not try to subscribe.
        """
        self.connect = (rc == 0)
        self.logger.debug("{0}".format(rc))
        if not self.connect:
            self.logger.error("Connection refused by broker, rc=%s", rc)
            return
        if self.listener:
            self.mqttc.subscribe(self.topic)

    def __on_disconnect(self, client, userdata, rc):
        """Clear the connected flag so ``start`` stops publishing."""
        self.connect = False
        self.logger.debug("Disconnected, rc=%s", rc)

    def __on_message(self, client, userdata, msg):
        """Log every message received on a subscribed topic."""
        self.logger.info("{0}, {1} - {2}".format(userdata, msg.topic, msg.payload))

    def __on_log(self, client, userdata, level, buf):
        """Forward paho's internal log lines to this instance's logger."""
        self.logger.debug("{0}, {1}, {2}, {3}".format(client, userdata, level, buf))

    def bootstrap_mqtt(self,
                       host="yourthingendpointname.iot.us-east-1.amazonaws.com",
                       port=8883,
                       ca_path="./authority.pem",
                       cert_path="./2bafa20887-certificate.pem.crt",
                       key_path="./2bafa20887-private.pem.key",
                       keepalive=120):
        """Create the MQTT client, configure mutual TLS and open the connection.

        All parameters default to the values that were previously hard-coded,
        so ``bootstrap_mqtt()`` with no arguments behaves as before.

        Args:
            host: AWS IoT endpoint host name.
            port: Broker port (8883 is MQTT over TLS).
            ca_path: Path to the root certificate authority file from AWS.
            cert_path: Path to the device certificate.
            key_path: Path to the device private key.
            keepalive: MQTT keepalive interval in seconds.

        Returns:
            This instance, so calls can be chained.
        """
        # paho-mqtt 2.x requires an explicit callback API version. VERSION1
        # keeps the (client, userdata, flags, rc) signatures used here.
        # paho-mqtt 1.x has no such enum and takes no argument.
        api_version = getattr(paho, "CallbackAPIVersion", None)
        if api_version is None:
            self.mqttc = paho.Client()
        else:
            self.mqttc = paho.Client(api_version.VERSION1)

        self.mqttc.on_connect = self.__on_connect
        self.mqttc.on_disconnect = self.__on_disconnect
        self.mqttc.on_message = self.__on_message
        self.mqttc.on_log = self.__on_log

        self.mqttc.tls_set(ca_path,
                           certfile=cert_path,
                           keyfile=key_path,
                           cert_reqs=ssl.CERT_REQUIRED,
                           tls_version=ssl.PROTOCOL_TLSv1_2,
                           ciphers=None)

        # A zero return here only means the CONNECT request was issued; the
        # broker's answer arrives later through __on_connect, which is the
        # only place allowed to mark the session as connected.
        result_of_connection = self.mqttc.connect(host, port, keepalive=keepalive)
        if result_of_connection != 0:
            self.logger.warning("connect() returned %s", result_of_connection)
        return self

    def start(self):
        """Run the network loop and publish a greeting every two seconds.

        This method never returns. The paho network loop runs in a
        background thread started by ``loop_start``.
        """
        self.mqttc.loop_start()
        # The payload never changes, so serialise it once.
        payload = json.dumps({"message": "Hello COMP680"})
        while True:
            sleep(2)
            if self.connect:
                self.mqttc.publish(self.topic, payload, qos=1)
            else:
                self.logger.debug("Attempting to connect.")
if __name__ == "__main__":
    # Script entry point: build a listening client on the demo topic,
    # connect it to the broker, then publish forever.
    pubsub = PubSub(topic="chat-evets", listener=True)
    pubsub = pubsub.bootstrap_mqtt()
    pubsub.start()
@hexpwn
Copy link

hexpwn commented Jul 4, 2023

I got this working once, but now, looking at Wireshark, it seems I am getting an Encrypted Alert and the TLS session is being closed before I get a CONNACK. Any idea how I can fix this?

@giant995
Copy link

Late response, but the introduction of this AWS blog post shows how to set up the required certificates.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment