Created
July 17, 2021 06:01
-
-
Save mtsukamoto/1b172967a7971d26a7ea2a7ded181645 to your computer and use it in GitHub Desktop.
AWSBasics Amazon-Location-Service lambda.py modified.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import date, datetime, timezone
import json
import os

import boto3
# Update this to match the name of your Tracker resource
TRACKER_NAME = "handson-20210717"

"""
This Lambda function receives a payload from AWS IoT Core and publishes device updates to Amazon Location Service via the BatchUpdateDevicePosition API.
Parameter 'event' is the payload delivered from AWS IoT Core.
In this sample, we assume that the payload has a single top-level key 'payload' and a nested key
'location' with keys 'lat' and 'long'. We also assume that the name of the device is nested in
the payload as 'deviceid'. Finally, the timestamp of the payload is present as 'timestamp'. For
example:
>>> event
{ 'payload': { 'deviceid': 'thing123', 'timestamp': 1604940328,
'location': { 'lat': 49.2819, 'long': -123.1187 } } }
If your data does not match this schema, you can either use the AWS IoT Core rules engine to
format the data before delivering it to this Lambda function, or you can modify the code below to
match it.
"""
def json_serial(obj):
    """Fallback serializer for ``json.dumps(..., default=json_serial)``.

    Args:
        obj: A value the standard JSON encoder could not serialize.

    Returns:
        The ISO 8601 string of ``obj`` when it is a ``date`` or ``datetime``.

    Raises:
        TypeError: If ``obj`` is of any other type.
    """
    # see also: https://www.yoheim.net/blog.php?q=20170703
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))
def lambda_handler(event, context):
    """Forward one device position from AWS IoT Core to Amazon Location Service.

    Args:
        event: Payload delivered by AWS IoT Core. It must contain a top-level
            ``payload`` dict with ``deviceid``, ``timestamp`` (POSIX seconds)
            and a nested ``location`` dict holding ``lat`` and ``long``.
        context: Lambda context object; not used.

    Returns:
        A dict with ``statusCode`` 200 and ``body`` set to the JSON-encoded
        response of ``batch_update_device_position``.

    Raises:
        KeyError: If ``LAMBDA_TASK_ROOT`` is not set in the environment or an
            expected key is missing from ``event``.
    """
    # load the side-loaded Amazon Location Service model; needed during Public Preview
    os.environ["AWS_DATA_PATH"] = os.environ["LAMBDA_TASK_ROOT"]

    payload = event["payload"]
    device_id = payload["deviceid"]
    # Convert explicitly in UTC: a naive local-time string carries no offset,
    # so it would only be right when the process timezone happens to be UTC.
    sample_time = datetime.fromtimestamp(payload["timestamp"], tz=timezone.utc)
    location = payload["location"]

    updates = [
        {
            "DeviceId": device_id,
            "SampleTime": sample_time.isoformat(),
            # Position is sent as [longitude, latitude], in that order.
            "Position": [location["long"], location["lat"]],
        }
    ]

    client = boto3.client("location")
    response = client.batch_update_device_position(
        TrackerName=TRACKER_NAME, Updates=updates
    )
    return {
        "statusCode": 200,
        "body": json.dumps(response, default=json_serial),
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment