A better kinesis consumer example in python?
Created October 26, 2017 17:59 by bcavagnolo
# Most of the kinesis examples out there do not seem to elucidate the
# opportunities to parallelize processing of kinesis streams, nor the
# interactions of the service limits. You may want to start your
# journey by familiarizing yourself with the concepts (e.g., what is a
# shard?) and the service limits:
# http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
# The idea is to spawn a process per shard in order to maximize
# parallelization while respecting the service limits. If you run
# multiple instances of this script (or equivalent) you will exhaust
# the service limits. In that scenario you may have to futz with the
# constants below. Why would you do this? Maybe because you have
# diverse and unrelated processing steps that you want to run on the
# data. Or maybe you want to improve availability by processing the
# stream in multiple AZs.
# If you need to increase your read bandwidth, you must split your
# stream into additional shards. As written, this script must be
# restarted in that scenario.
# Notably, the "KCL" does offer a high-availability story for
# python. But it involves dynamodb and some sort of
# java-wrapped-in-python thing that smelled like a terrible amount of
# service sprawl.
# Some credit for this code goes to
# https://www.parse.ly/help/rawdata/code/#python-code-for-kinesis-with-boto3
# ...but that code fails to actually run.
import json
import time
from multiprocessing import Process

import boto3
from botocore.exceptions import ClientError

STREAM = "my-test-stream"
REGION = 'us-gov-west-1'

# Each shard can support up to 5 transactions per second for reads, up
# to a maximum total data read rate of 2 MB per second.
MAX_REQUESTS_PER_SECOND = 5
MAX_BYTES_PER_SECOND = 2 * 1024 ** 2  # 2MB

# This is a property of your data.
MAX_RECORD_SIZE = 200

# There does not appear to be any sort of blocking read support for
# kinesis streams, and no automatic way to respect the read
# bandwidth. So we must explicitly sleep to achieve these
# things. Seems like a bit of an antipattern in the context of
# streams. Shrug.
SLEEP_PERIOD = 1.0 / MAX_REQUESTS_PER_SECOND
# Integer division so the result is usable as the get_records Limit.
MAX_RECORDS_PER_SECOND = MAX_BYTES_PER_SECOND // MAX_RECORD_SIZE
MAX_RECORDS_PER_REQUEST = MAX_RECORDS_PER_SECOND // MAX_REQUESTS_PER_SECOND

print('Planning to read {} records every {} seconds'
      .format(MAX_RECORDS_PER_REQUEST, SLEEP_PERIOD))

kinesis = boto3.client('kinesis', region_name=REGION)
def get_kinesis_shards(stream):
    """Return a list of shard iterators, one for each shard of stream."""
    descriptor = kinesis.describe_stream(StreamName=stream)
    shards = descriptor['StreamDescription']['Shards']
    shard_ids = [shard['ShardId'] for shard in shards]
    shard_iters = [kinesis.get_shard_iterator(
                       StreamName=stream,
                       ShardId=shard_id,
                       ShardIteratorType='LATEST')
                   for shard_id in shard_ids]
    return shard_iters
def process_shard(shard_number, shard):
    shard_iterator = shard['ShardIterator']
    while True:
        try:
            response = kinesis.get_records(ShardIterator=shard_iterator,
                                           Limit=MAX_RECORDS_PER_REQUEST)
        except ClientError as e:
            code = e.response['Error']['Code']
            if code != 'ProvisionedThroughputExceededException':
                raise
            print('Throughput exceeded!')
            time.sleep(SLEEP_PERIOD)
            continue
        start = time.time()
        for record in response['Records']:
            datum = json.loads(record['Data'])
            # here's where you do your processing
            print(shard_number, json.dumps(datum))
        shard_iterator = response['NextShardIterator']
        # Sleep off the remainder of the period; if processing took
        # longer than the period, don't sleep at all.
        delta = time.time() - start
        time.sleep(max(0.0, SLEEP_PERIOD - delta))
if __name__ == '__main__':
    shard_iters = get_kinesis_shards(STREAM)
    while True:
        processes = []
        for i, shard in enumerate(shard_iters):
            p = Process(target=process_shard, args=(i, shard))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
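The pacing constants in the script can be sanity-checked in isolation. This standalone sketch reproduces the arithmetic with the same limits (note the integer division, so the result is valid as a `get_records` `Limit`):

```python
# Reproduce the throttling arithmetic from the script above, standalone.
MAX_REQUESTS_PER_SECOND = 5           # reads per shard per second
MAX_BYTES_PER_SECOND = 2 * 1024 ** 2  # 2 MB per shard per second
MAX_RECORD_SIZE = 200                 # bytes; a property of your data

SLEEP_PERIOD = 1.0 / MAX_REQUESTS_PER_SECOND
MAX_RECORDS_PER_SECOND = MAX_BYTES_PER_SECOND // MAX_RECORD_SIZE
MAX_RECORDS_PER_REQUEST = MAX_RECORDS_PER_SECOND // MAX_REQUESTS_PER_SECOND

print(SLEEP_PERIOD)             # 0.2 seconds between reads
print(MAX_RECORDS_PER_SECOND)   # 10485 records per second
print(MAX_RECORDS_PER_REQUEST)  # 2097 records per get_records call
```

With 200-byte records this stays comfortably under the hard `get_records` cap of 10,000 records per call; larger records shrink the batch size accordingly.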
Just wanted to let you know that this just saved me and my team literal hours of work. We had been struggling to find an "easy" way to read from a kinesis stream so we could test a new integration, and the process of repeatedly getting the next shard iterator and running get-records was difficult and tedious. This program made it not just possible, but easy. All the changes required were to STREAM and REGION, as well as a new line to select a profile (right above kinesis = boto3.client()). THANK YOU!
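A minimal sketch of the profile-selection change the commenter describes, using boto3's named-profile support. The profile name is an assumption (whatever you have configured in ~/.aws/credentials); boto3 is imported inside the helper so the sketch can be read and imported without it installed:

```python
def make_kinesis_client(profile_name, region_name):
    """Build a Kinesis client from a named AWS CLI profile.

    Hypothetical helper: `profile_name` must exist in
    ~/.aws/credentials. boto3 is imported lazily, only when the
    helper is actually called.
    """
    import boto3
    session = boto3.Session(profile_name=profile_name)
    return session.client('kinesis', region_name=region_name)

# Usage (replaces the module-level kinesis = boto3.client(...) line):
# kinesis = make_kinesis_client('my-profile', REGION)
```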