Skip to content

Instantly share code, notes, and snippets.

@alexdlaird
Last active November 14, 2018 06:11
Show Gist options
  • Save alexdlaird/8e4a5572601ade86dc83ea4acf91f3c4 to your computer and use it in GitHub Desktop.
Save alexdlaird/8e4a5572601ade86dc83ea4acf91f3c4 to your computer and use it in GitHub Desktop.
AWS Lambdas that receive emails from SES, process to SQS (optional deduplication), and send as SMS via Twilio

Trackimo SES to SMS

Create a new Role from a Policy with the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "sqs:SendMessage",
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "*"
        }
    ]
}

Create a Lambda from file #2 (using the new Role) and a second Lambda (also using the new Role) from either file #3 (SQS consumer, less requests) or file #4 (interval polling, deduplication).

Create a SES "Email Receiving" rule set that sends all inbounds for a particular email address to the Lambda from file #2.

If you chose file #3:

  • Create a SQS queue with a visibility timeout of 9 seconds of type "Standard"
  • Configure your Lambda to read from this queue

If you chose file #4:

  • Create a SQS queue with a visibility timeout of 9 seconds of either type "Standard" or "FIFO" (if you choose "FIFO", you have additional options available to you in regards to deduplication)
  • Configure a CloudWatch Event with a "Schedule expression" of rate(5 minute), which will trigger the Lambda

Whether you chose file #3 or file #4, add the following environment variables to that Lambda:

  • SQS_QUEUE_NAME
  • TWILIO_ACCOUNT_SID
  • TWILIO_AUTH_TOKEN
  • TWILIO_SMS_FROM (Twilio phone number formatted +15555555555)
  • TWILIO_SMS_GEOFENCE_TO (JSON list of phone numbers formatted ["+15555555555","+15555555556"])
  • TWILIO_SMS_BATTERY_TO (JSON list of phone numbers formatted ["+15555555555"])

Add the following environment variables to for file #2:

  • SQS_QUEUE_NAME

Lastly, have your Trackimo alerts sent to the email address you setup with SES.

import logging
import boto3
import json
logger = logging.getLogger()
logger.setLevel(logging.INFO)
SQS_QUEUE_NAME = os.environ.get("SQS_QUEUE_NAME")
def lambda_handler(event, context):
logger.info(event)
for record in event["Records"]:
_process_record(record)
return {
"statusCode": 200
}
def _process_record(record):
subject = record["ses"]["mail"]["commonHeaders"]["subject"]
if "Geo-Fence Crossed" in subject:
msg_type = "geofence"
elif "Battery" in subject:
msg_type = "battery"
else:
logger.info("Nothing to do with subject '{}'".format(subject))
return
message = json.dumps({"type": msg_type, "msg": subject})
sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName=SQS_QUEUE_NAME)
response = queue.send_message(MessageBody=message)
import os
import logging
import json
import boto3
import base64
import urllib
from urllib import request, parse
logger = logging.getLogger()
logger.setLevel(logging.INFO)
SQS_QUEUE_NAME = os.environ.get("SQS_QUEUE_NAME")
TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.environ.get("TWILIO_AUTH_TOKEN")
TWILIO_SMS_URL = "https://api.twilio.com/2010-04-01/Accounts/{}/Messages.json"
TWILIO_SMS_FROM = os.environ.get("TWILIO_SMS_FROM")
TWILIO_SMS_GEOFENCE_TO = os.environ.get("TWILIO_SMS_GEOFENCE_TO")
TWILIO_SMS_BATTERY_TO = os.environ.get("TWILIO_SMS_BATTERY_TO")
def lambda_handler(event, context):
logger.info(event)
if not TWILIO_ACCOUNT_SID:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_ACCOUNT_SID'"
}
elif not TWILIO_AUTH_TOKEN:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_AUTH_TOKEN'"
}
elif not TWILIO_SMS_FROM:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_SMS_FROM' number in the format +15555555555"
}
elif not TWILIO_SMS_GEOFENCE_TO:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_SMS_GEOFENCE_TO' number in the format +15555555555"
}
elif not TWILIO_SMS_BATTERY_TO:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_SMS_BATTERY_TO' number in the format +15555555555"
}
sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName=SQS_QUEUE_NAME)
messages = {}
sqs_batch = queue.receive_messages()
while sqs_batch:
for message in sqs_batch:
_process_message(messages, message)
sqs_batch = queue.receive_messages()
for message in messages.values():
_send_message(message)
return {
"statusCode": 200
}
def _process_message(messages, message):
logger.info(message)
body = json.loads(message.body)
if body["type"] + body["msg"] not in messages:
logger.info(body)
messages[body["type"] + body["msg"]] = body
message.delete()
def _send_message(message):
msg_type = message["type"]
msg = message["msg"]
if msg_type == "battery":
TWILIO_SMS_TO = json.loads(TWILIO_SMS_BATTERY_TO)
elif msg_type == "geofence":
TWILIO_SMS_TO = json.loads(TWILIO_SMS_GEOFENCE_TO)
else:
logger.info("The given type of '{}' is unknown".format(msg_type))
return
for to_number in TWILIO_SMS_TO:
populated_url = TWILIO_SMS_URL.format(TWILIO_ACCOUNT_SID)
post_params = {"To": to_number, "From": TWILIO_SMS_FROM, "Body": msg}
data = parse.urlencode(post_params).encode()
req = request.Request(populated_url)
authentication = "{}:{}".format(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
base64string = base64.b64encode(authentication.encode("utf-8"))
req.add_header("Authorization", "Basic %s" % base64string.decode("ascii"))
with request.urlopen(req, data) as f:
logger.info("Twilio returned {}".format(str(f.read().decode("utf-8"))))
import os
import logging
import json
import base64
import urllib
from urllib import request, parse
logger = logging.getLogger()
logger.setLevel(logging.INFO)
TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.environ.get("TWILIO_AUTH_TOKEN")
TWILIO_SMS_URL = "https://api.twilio.com/2010-04-01/Accounts/{}/Messages.json"
TWILIO_SMS_FROM = os.environ.get("TWILIO_SMS_FROM")
TWILIO_SMS_GEOFENCE_TO = os.environ.get("TWILIO_SMS_GEOFENCE_TO")
TWILIO_SMS_BATTERY_TO = os.environ.get("TWILIO_SMS_BATTERY_TO")
def lambda_handler(event, context):
logger.info(event)
if not TWILIO_ACCOUNT_SID:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_ACCOUNT_SID'"
}
elif not TWILIO_AUTH_TOKEN:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_AUTH_TOKEN'"
}
elif not TWILIO_SMS_FROM:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_SMS_FROM' number in the format +15555555555"
}
elif not TWILIO_SMS_GEOFENCE_TO:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_SMS_GEOFENCE_TO' number in the format +15555555555"
}
elif not TWILIO_SMS_BATTERY_TO:
return {
"statusCode": 400,
"msg": "The function needs a 'TWILIO_SMS_BATTERY_TO' number in the format +15555555555"
}
for record in event["Records"]:
_process_record(record)
return {
"statusCode": 200
}
def _process_record(record):
body = json.loads(record["body"])
msg_type = body["type"]
msg = body["msg"]
if msg_type == "battery":
TWILIO_SMS_TO = json.loads(TWILIO_SMS_BATTERY_TO)
elif msg_type == "geofence":
TWILIO_SMS_TO = json.loads(TWILIO_SMS_GEOFENCE_TO)
else:
logger.info("The given type of '{}' is unknown".format(msg_type))
return
for to_number in TWILIO_SMS_TO:
populated_url = TWILIO_SMS_URL.format(TWILIO_ACCOUNT_SID)
post_params = {"To": to_number, "From": TWILIO_SMS_FROM, "Body": msg}
data = parse.urlencode(post_params).encode()
req = request.Request(populated_url)
authentication = "{}:{}".format(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
base64string = base64.b64encode(authentication.encode('utf-8'))
req.add_header("Authorization", "Basic %s" % base64string.decode('ascii'))
with request.urlopen(req, data) as f:
logger.info("Twilio returned {}".format(str(f.read().decode('utf-8'))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment