Skip to content

Instantly share code, notes, and snippets.

@jg75
Last active May 3, 2018 22:14
Show Gist options
  • Save jg75/6ac5685e5371e1aad91d3ba2d6055bff to your computer and use it in GitHub Desktop.
"""Kinesis data streams example how-to."""
import datetime
import logging
import time
import botocore
import boto3
logging.basicConfig(level=logging.INFO)
class DataStream:
    """The example data stream with wait, create and delete methods."""

    def __init__(self, stream_name='MyPythonDataStreamExample', client=None,
                 logger=None):
        """Initialize.

        :param stream_name: The name of the stream
        :param client: An AWS Kinesis boto3 client; created lazily when omitted
        :param logger: A logging instance; defaults to the 'DataStream' logger
        """
        self.stream_name = stream_name
        # Resolve defaults here instead of in the signature: default argument
        # expressions run once at import time, so `client=boto3.client(...)`
        # would hit AWS configuration on import and share one client across
        # every instance.
        self.client = client if client is not None else boto3.client('kinesis')
        self.logger = (logger if logger is not None
                       else logging.getLogger('DataStream'))

    def wait(self, **kwargs):
        """Wait for the stream to exist.

        :param kwargs: Extra arguments forwarded to the boto3 waiter
        """
        waiter = self.client.get_waiter('stream_exists')
        waiter.wait(StreamName=self.stream_name, **kwargs)

    def create(self):
        """Create the stream and block until it exists.

        An already-existing stream (ResourceInUseException) is treated as
        success; any other client error propagates to the caller.
        """
        try:
            self.logger.info('CreateStream %s', self.stream_name)
            self.client.create_stream(
                StreamName=self.stream_name,
                ShardCount=1
            )
            self.wait()
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] != 'ResourceInUseException':
                # Bare raise preserves the original traceback.
                raise

    def delete(self):
        """Delete the stream.

        A missing stream (ResourceNotFoundException) is treated as success;
        any other client error propagates to the caller.
        """
        try:
            self.logger.info('DeleteStream %s', self.stream_name)
            self.client.delete_stream(StreamName=self.stream_name)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] != 'ResourceNotFoundException':
                raise
class Producer:
    """The producer produces data records and puts them in the stream."""

    def __init__(self, data_stream, partition_key='MyPythonProducerExample',
                 logger=None):
        """Initialize.

        :param data_stream: A DataStream instance
        :param partition_key: The key for this producer's partition
        :param logger: A logging instance; defaults to the 'Producer' logger
        """
        self.data_stream = data_stream
        self.partition_key = partition_key
        # Resolved here so the default logger is not baked into the signature
        # at import time.
        self.logger = (logger if logger is not None
                       else logging.getLogger('Producer'))

    def get_data_records(self, timestamp, count=10):
        """Get a list of data records shaped for Kinesis put_records.

        [{
            'Data': b'data',
            'PartitionKey': 'partition'
        }]

        :param timestamp: A datetime instance (anything str-formattable works)
        :param count: The number of records
        :returns: a list of data records
        """
        return [
            {
                'Data': 'DataRecord {}: {}'.format(i, timestamp).encode(),
                'PartitionKey': self.partition_key,
            }
            for i in range(count)
        ]

    def produce(self, data_records):
        """Put the records into the stream.

        :param data_records: A list of data records
        :returns: A list of unique shard ids that accepted at least one record
        """
        response = self.data_stream.client.put_records(
            StreamName=self.data_stream.stream_name,
            Records=data_records
        )
        shards = []
        for record in response['Records']:
            if record.get('ErrorCode'):
                # BUG FIX: the original referenced an undefined global
                # `logger` here, raising NameError on any failed record.
                self.logger.error(record)
            else:
                shard = record.get('ShardId')
                if shard not in shards:
                    shards.append(shard)
        return shards
class Consumer:
    """AWS Kinesis Data Stream Application, a data stream consumer."""

    # When you read repeatedly from a stream, use a GetShardIterator request
    # to get the first shard iterator for use in your first GetRecords
    # request and for subsequent reads use the shard iterator returned by
    # the GetRecords request in NextShardIterator. A new shard iterator
    # is returned by every GetRecords request in NextShardIterator,
    # which you use in the ShardIterator parameter of the next GetRecords
    # request.
    # If a GetShardIterator request is made too often, you receive
    # a ProvisionedThroughputExceededException

    def __init__(self, data_stream, logger=None):
        """Initialize.

        :param data_stream: A DataStream instance
        :param logger: A logging instance; defaults to the 'Consumer' logger
        """
        self.data_stream = data_stream
        self.logger = (logger if logger is not None
                       else logging.getLogger('Consumer'))

    def _get_shard_iterator(self, shard, timestamp):
        """Get the shard iterator.

        :param shard: The shard id of the shard to consume
        :param timestamp: A datetime instance to start consuming
        :returns: The shard iterator
        """
        # BUG FIX: the original read the module-global `data_stream` instead
        # of self.data_stream, so any instance built around a different
        # stream silently consumed the wrong one (or raised NameError).
        # BUG FIX: Timestamp is only honored with AT_TIMESTAMP; the original
        # TRIM_HORIZON made Kinesis ignore it and restart from the oldest
        # record.
        response = self.data_stream.client.get_shard_iterator(
            StreamName=self.data_stream.stream_name,
            ShardId=shard,
            ShardIteratorType='AT_TIMESTAMP',
            Timestamp=timestamp
        )
        return response['ShardIterator']

    def consume(self, shard, timestamp):
        """Consume records from a shard, starting at a timestamp.

        Loops until the shard is closed (no NextShardIterator returned).

        :param shard: The shard id of the shard to consume
        :param timestamp: A datetime instance to start consuming
        """
        # TODO
        # consuming by sequence number would probably be more precise
        # consuming by timestamp is fuzzy and data could get
        # processed twice if the consumer restarts
        shard_iterator = self._get_shard_iterator(shard, timestamp)
        while shard_iterator:
            response = self.data_stream.client.get_records(
                ShardIterator=shard_iterator,
                Limit=10
            )
            # .get(): NextShardIterator is absent once the shard is closed;
            # the original KeyError'd (or passed None back) in that case.
            shard_iterator = response.get('NextShardIterator')
            for record in response['Records']:
                self.logger.info(record['Data'].decode('utf-8'))
            if not response['Records']:
                # Back off between empty reads so repeated polling does not
                # trigger ProvisionedThroughputExceededException.
                time.sleep(1)
if __name__ == '__main__':
    # Demo: build one batch of records, ensure the stream exists, write the
    # batch, then read each written-to shard back from the same timestamp.
    data_stream = DataStream()
    writer = Producer(data_stream)
    reader = Consumer(data_stream)
    started_at = datetime.datetime.now()
    batch = writer.get_data_records(timestamp=started_at)
    data_stream.create()
    for shard_id in writer.produce(batch):
        reader.consume(shard_id, started_at)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment