Last active
September 20, 2018 14:22
-
-
Save omrihar/a832964f6af19c3f51a8e1fd83af40a7 to your computer and use it in GitHub Desktop.
An object that can send events to AWS Kinesis Firehose
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import calendar
import datetime
import json
import logging
import os

import boto3


class FirehoseStream(object):
    """Allows sending of events to Kinesis Firehose.

    Each event is serialised as a single newline-terminated JSON document and
    pushed to the configured delivery stream with the ``put_record`` API.
    """

    def __init__(self, stream=None, region=None,
                 project=None, logger=None):
        """Create the stream client and wait for events.

        Any argument that is omitted (or falsy) falls back to an environment
        variable.

        Parameters
        ----------
        stream : str, optional
            The Firehose stream to use. Defaults to ``$AWS_KINESIS_STREAM``.
        region : str, optional
            AWS Region to use. Defaults to ``$AWS_REGION_NAME``.
        project : str, optional
            Name of the project sending the events. Defaults to
            ``$PROJECT_ID``.
        logger : logging.Logger, optional
            Logger to report to. Defaults to this module's logger.
        """
        self._stream = stream or os.getenv('AWS_KINESIS_STREAM')
        self._region = region or os.getenv('AWS_REGION_NAME')
        self._project = project or os.getenv('PROJECT_ID')
        self.logger = logger or logging.getLogger(__name__)
        self.logger.info("Created a FirehoseStream")

        self.client = boto3.client('firehose', region_name=self._region)
        self.logger.info(f"Connected to Firehose stream {self._stream} in region "
                         f"{self._region}")

    def _get_timestamp(self, the_date=None):
        """Converts datetimes into time since the unix epoch.

        Parameters
        ----------
        the_date : datetime, optional
            The moment to convert. Naive datetimes are interpreted as UTC;
            timezone-aware datetimes are converted to UTC first. When
            omitted, the current time is used.

        Returns
        -------
        time_since_epoch : int
            Number of whole seconds since the unix epoch
        """
        if the_date is None:
            the_date = datetime.datetime.now(datetime.timezone.utc)
        # utctimetuple() honours tzinfo on aware datetimes and is identical to
        # timetuple() for naive ones, so naive inputs keep their old meaning.
        return calendar.timegm(the_date.utctimetuple())

    def send_event(self, event_name, data, timestamp=None):
        """Sends an event to record to the Kinesis Firehose client using the
        put_record API.

        Parameters
        ----------
        event_name : str
            One of the event types defined for the project
        data : dict
            A JSON-serializable object with data to describe the event.
        timestamp : datetime, optional
            The timestamp associated with the event. If not provided, will be
            generated by send_event.

        Returns
        -------
        RecordId : str or None
            The record id of the sent record, or None when the response did
            not contain one (a warning is logged in that case).
        """
        payload = {
            'event_name': event_name,
            'event_timestamp': self._get_timestamp(timestamp),
            'project': self._project,
            'data': data,
        }
        # Serialise once and reuse the result for both the log line and the
        # record body.
        serialized = json.dumps(payload)
        self.logger.debug('Sending to Firehose: %s', serialized)

        result = self.client.put_record(
            DeliveryStreamName=self._stream,
            Record={
                # Trailing newline keeps records separated once they are
                # concatenated downstream.
                'Data': serialized + "\n"
            }
        )
        self.logger.debug('Sent to Firehose. Response: %s', result)

        if result and 'RecordId' in result:
            return result.get('RecordId')

        self.logger.warning('Did not receive a record_id!')
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment