Skip to content

Instantly share code, notes, and snippets.

@dave-malone
Last active November 9, 2018 17:37
Show Gist options
  • Save dave-malone/891ae707773a8443bd242283541dedf5 to your computer and use it in GitHub Desktop.
Save dave-malone/891ae707773a8443bd242283541dedf5 to your computer and use it in GitHub Desktop.
A simple Python program to demonstrate submitting an Avro encoded file as a binary payload into AWS IoT Core
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import avro.schema
import logging
import time
import os
avro_users_schema_file = "user.avsc"
avro_users_file = "users.avro"
mqtt_host = os.getenv("MQTT_HOST")
mqtt_rootCAPath = os.getenv("MQTT_ROOT_CA_PATH")
mqtt_certificatePath = os.getenv("MQTT_CERT_PATH")
mqtt_privateKeyPath = os.getenv("MQTT_PRIVATE_KEY_PATH")
mqtt_clientId = os.getenv("MQTT_CLIENT_ID")
mqtt_port = 8883
mqtt_topic = os.getenv("MQTT_TOPIC")
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# Create Avro users file if it doesn't already exist
try:
open(avro_users_file, "rb")
print('users file exists; skipping creation')
except IOError:
print('users file did not exist; creating now')
schema = avro.schema.parse(open(avro_users_schema_file, "rb").read())
writer = DataFileWriter(open(avro_users_file, "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()
mqtt_client = AWSIoTMQTTClient(mqtt_clientId)
mqtt_client.configureEndpoint(mqtt_host, mqtt_port)
mqtt_client.configureCredentials(mqtt_rootCAPath, mqtt_privateKeyPath, mqtt_certificatePath)
# mqtt_client connection configuration
mqtt_client.configureAutoReconnectBackoffTime(1, 32, 20)
mqtt_client.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
mqtt_client.configureDrainingFrequency(2) # Draining: 2 Hz
mqtt_client.configureConnectDisconnectTimeout(10) # 10 sec
mqtt_client.configureMQTTOperationTimeout(5) # 5 sec
# Connect to AWS IoT
mqtt_client.connect()
# Publish to the same topic in a loop forever
loopCount = 0
while True:
avroFile = open(avro_users_file, "rb")
message = bytearray(avroFile.read())
mqtt_client.publish(mqtt_topic, message, 1)
print('Published to topic %s: %s\n' % (mqtt_topic, message))
loopCount += 1
time.sleep(1)
@dave-malone
Copy link
Author

dave-malone commented Nov 9, 2018

This program was written using Python 2.7, and has the following dependencies:

  • pip install avro
  • pip install AWSIoTPythonSDK

This program also assumes that you have an AWS account, and a Thing provisioned in AWS IoT. Follow these instructions for getting started with AWS IoT: https://docs.aws.amazon.com/iot/latest/developerguide/iot-gs.html

Once you have provisioned your Thing and downloaded your device certificates, you can run this example using the following bash script. Please note that there are placeholders in this script that must be updated with your own configuration.

# stop script on error
set -e

# Check to see if root CA file exists, download if not
if [ ! -f ./root-CA.crt ]; then
  printf "\nDownloading AWS IoT Root CA certificate from AWS...\n"
  curl https://www.amazontrust.com/repository/AmazonRootCA1.pem > root-CA.crt
fi

pip install avro
pip install AWSIoTPythonSDK

export MQTT_HOST="yourmqttendpoint.iot.us-east-1.amazonaws.com"
export MQTT_ROOT_CA_PATH="root-CA.crt"
export MQTT_PRIVATE_KEY_PATH="your_key.private.key"
export MQTT_CERT_PATH="your_cert.cert.pem"
export MQTT_CLIENT_ID="your_thing_name"
# Basic Ingest topic, but could be any topic of your choosing
# See the following for more details on Basic ingest: https://docs.aws.amazon.com/iot/latest/developerguide/iot-basic-ingest.html
export MQTT_TOPIC="$aws/rules/rule_name/topic/subtopic/etc"

python avro_to_aws_iot.py

user.avsc schema file used in this example:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment