Last active
April 28, 2020 08:01
-
-
Save sharathsamala/c2d281598187b0e92410eeb9f2fc81d8 to your computer and use it in GitHub Desktop.
Lambda function to load DynamoDB Streams to Elasticsearch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import urllib3 | |
from datetime import datetime, date | |
from time import struct_time, mktime | |
import decimal | |
from boto3.dynamodb.types import TypeDeserializer | |
URL = "<ES-ENDPOINT>/posts/_doc/{0}" | |
headers = {'Content-Type': 'application/json'} | |
class CustomJSONEncoder(json.JSONEncoder): | |
def default(self, o): | |
if isinstance(o, datetime): | |
return str(o) | |
if isinstance(o, date): | |
return str(o) | |
if isinstance(o, decimal.Decimal): | |
return float(o) | |
if isinstance(o, struct_time): | |
return datetime.fromtimestamp(mktime(o)) | |
# Any other serializer if needed | |
return super(CustomJSONEncoder, self).default(o) | |
def from_dynamodb_to_json(item): | |
d = TypeDeserializer() | |
return {k: d.deserialize(value=v) for k, v in item.items()} | |
def lambda_handler(event, context): | |
print(event) | |
output = {} | |
for x in event["Records"]: | |
event_type = x['eventName'] | |
input_id = x["dynamodb"]["Keys"]["<UniqueIdKey>"]["S"] | |
if event_type == "REMOVE": | |
print("Removing id: " + str(input_id)) | |
http = urllib3.PoolManager() | |
r = http.request('DELETE', URL.format(input_id), headers=headers) | |
output = json.loads(r.data.decode('utf-8')) | |
print(output) | |
elif event_type == "INSERT": | |
parsed_event_data = from_dynamodb_to_json(x["dynamodb"]["NewImage"]) | |
print("inserting into index, id: " + str(input_id)) | |
http = urllib3.PoolManager() | |
encoded_data = json.dumps(parsed_event_data, cls=CustomJSONEncoder).encode('utf-8') | |
r = http.request('POST', URL.format(input_id), headers=headers, body=encoded_data) | |
output = json.loads(r.data.decode('utf-8')) | |
print(output) | |
else: | |
print("Invalid event type: "+str(event_type)) | |
return { | |
'statusCode': 200, | |
'body': json.dumps(output) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment