Last active
May 3, 2018 22:14
-
-
Save jg75/6ac5685e5371e1aad91d3ba2d6055bff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Kinesis data streams example how-to.""" | |
import datetime | |
import logging | |
import time | |
import botocore | |
import boto3 | |
logging.basicConfig(level=logging.INFO) | |
class DataStream:
    """The example data stream with wait, create and delete methods."""

    def __init__(
        self,
        stream_name='MyPythonDataStreamExample',
        client=None,
        logger=None
    ):
        """Initialize.

        :param stream_name: The name of the stream
        :param client: An AWS Kinesis boto3 client; when omitted one is
            created here rather than in the signature, so importing this
            module no longer triggers boto3 client construction (the
            original default ``boto3.client('kinesis')`` ran at
            class-definition time and could hit AWS config/credentials
            on import)
        :param logger: A logging instance
        """
        self.stream_name = stream_name
        self.client = client if client is not None else boto3.client('kinesis')
        self.logger = logger if logger is not None \
            else logging.getLogger('DataStream')

    def wait(self, **kwargs):
        """Wait for the stream to become active/exist.

        :param kwargs: Extra arguments forwarded to the boto3 waiter
        """
        waiter = self.client.get_waiter('stream_exists')
        waiter.wait(StreamName=self.stream_name, **kwargs)

    def create(self):
        """Create the stream and block until it exists.

        A stream that already exists is treated as success.
        """
        try:
            self.logger.info('CreateStream %s', self.stream_name)
            self.client.create_stream(
                StreamName=self.stream_name,
                ShardCount=1
            )
            self.wait()
        except botocore.exceptions.ClientError as e:
            # An already-existing stream is fine for this example; anything
            # else is re-raised.  Bare ``raise`` preserves the traceback
            # (the original ``raise e`` reset it).
            if e.response['Error']['Code'] != 'ResourceInUseException':
                raise

    def delete(self):
        """Delete the stream.

        A stream that does not exist is treated as success.
        """
        try:
            self.logger.info('DeleteStream %s', self.stream_name)
            self.client.delete_stream(StreamName=self.stream_name)
        except botocore.exceptions.ClientError as e:
            # A missing stream is fine for this example; anything else
            # is re-raised with its original traceback.
            if e.response['Error']['Code'] != 'ResourceNotFoundException':
                raise
class Producer:
    """The producer produces data records and puts them in the stream."""

    def __init__(
        self,
        data_stream,
        partition_key='MyPythonProducerExample',
        logger=logging.getLogger('Producer')
    ):
        """Initialize.

        :param data_stream: A DataStream instance
        :param partition_key: The key for this producer's partition
        :param logger: A logging instance
        """
        self.data_stream = data_stream
        self.partition_key = partition_key
        self.logger = logger

    def get_data_records(self, timestamp, count=10):
        """Get a list of data records.

        Each record has the shape::

            {'Data': b'data', 'PartitionKey': 'partition'}

        :param timestamp: A datetime instance (stamped into each payload)
        :param count: The number of records
        :returns: a list of data records
        """
        return [
            {
                'Data': 'DataRecord {}: {}'.format(i, timestamp).encode(),
                'PartitionKey': self.partition_key
            }
            for i in range(count)
        ]

    def produce(self, data_records):
        """Put the records into the stream.

        Per-record failures are logged and skipped; successful records
        contribute their shard id (deduplicated, order preserved).

        :param data_records: A list of data records
        :returns: A list of distinct shard ids that received records
        """
        response = self.data_stream.client.put_records(
            StreamName=self.data_stream.stream_name,
            Records=data_records
        )
        shards = []
        for record in response['Records']:
            if record.get('ErrorCode'):
                # BUG FIX: the original called the undefined global
                # ``logger`` here, raising NameError on any failed record.
                self.logger.error(record)
            else:
                shard = record.get('ShardId')
                if shard not in shards:
                    shards.append(shard)
        return shards
class Consumer:
    """AWS Kinesis Data Stream Application, a data stream consumer."""

    # When you read repeatedly from a stream, use a GetShardIterator request
    # to get the first shard iterator for use in your first GetRecords
    # request and for subsequent reads use the shard iterator returned by
    # the GetRecords request in NextShardIterator. A new shard iterator
    # is returned by every GetRecords request in NextShardIterator,
    # which you use in the ShardIterator parameter of the next GetRecords
    # request.
    # If a GetShardIterator request is made too often, you receive
    # a ProvisionedThroughputExceededException
    def __init__(
        self,
        data_stream,
        logger=logging.getLogger('Consumer')
    ):
        """Initialize.

        :param data_stream: A DataStream instance
        :param logger: A logging instance
        """
        self.data_stream = data_stream
        self.logger = logger

    def _get_shard_iterator(self, shard, timestamp):
        """Get the shard iterator.

        :param shard: The shard id of the shard to consume
        :param timestamp: A datetime instance to start consuming
        :returns: The shard iterator
        """
        # BUG FIX: the original referenced the module-global ``data_stream``
        # instead of ``self.data_stream``, so the class only worked when run
        # as this script.  Also, the Kinesis API only honors ``Timestamp``
        # with ShardIteratorType AT_TIMESTAMP; the original passed it with
        # TRIM_HORIZON, where it is ignored.
        response = self.data_stream.client.get_shard_iterator(
            StreamName=self.data_stream.stream_name,
            ShardId=shard,
            ShardIteratorType='AT_TIMESTAMP',
            Timestamp=timestamp
        )
        return response['ShardIterator']

    def consume(self, shard, timestamp):
        """Consume the shard, logging each record's payload.

        Stops when the shard is closed (no NextShardIterator); backs off
        briefly after an empty read to avoid hammering GetRecords.

        :param shard: The shard id of the shard to consume
        :param timestamp: A datetime instance to start consuming
        """
        # TODO
        # consuming by sequence number would probably be more precise
        # consuming by timestamp is fuzzy and data could get
        # processed twice if the consumer restarts
        shard_iterator = self._get_shard_iterator(shard, timestamp)
        while shard_iterator:
            response = self.data_stream.client.get_records(
                ShardIterator=shard_iterator,
                Limit=10
            )
            # A closed shard returns no NextShardIterator; the original
            # assumed the key and looped forever on None.
            shard_iterator = response.get('NextShardIterator')
            records = response['Records']
            for record in records:
                self.logger.info(record['Data'].decode('utf-8'))
            if not records:
                # Rate-limit empty polls (resolves the original TODO and
                # uses the file's otherwise-unused ``time`` import).
                time.sleep(1)
if __name__ == '__main__':
    # Demo: build the stream and its producer/consumer, generate one batch
    # of timestamped records, publish the batch, then read back every shard
    # that received data starting from the batch's timestamp.
    data_stream = DataStream()
    producer = Producer(data_stream)
    consumer = Consumer(data_stream)
    start_time = datetime.datetime.now()
    batch = producer.get_data_records(timestamp=start_time)
    data_stream.create()
    for shard_id in producer.produce(batch):
        consumer.consume(shard_id, start_time)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment