Skip to content

Instantly share code, notes, and snippets.

@w3irdrobot
Last active February 4, 2022 15:02
Show Gist options
  • Save w3irdrobot/688a2dd1a7e288102178 to your computer and use it in GitHub Desktop.
Save w3irdrobot/688a2dd1a7e288102178 to your computer and use it in GitHub Desktop.
Reading Data From Kinesis
from datetime import datetime, timedelta
import json
import boto
def get_kinesis_data_iterator(stream_name, minutes_running):
# Connect to Kinesis
kinesis = boto.connect_kinesis()
# Get data about Kinesis stream for Tag Monitor
kinesis_stream = kinesis.describe_stream(stream_name)
# Get the shards in that stream
shards = kinesis_stream['StreamDescription']['Shards']
# Collect together the shard IDs
shard_ids = [shard['ShardId'] for shard in shards]
# Get shard iterator
iter_response = kinesis.get_shard_iterator(stream_name, shard_ids[0], "TRIM_HORIZON")
shard_iterator = iter_response['ShardIterator']
# Calculate end time
end_time = datetime.now() + timedelta(minutes=minutes_running)
while True:
try:
# Get data
record_response = kinesis.get_records(shard_iterator)
# Only run for a certain amount of time.
# Stop looping if no data returned. This means it's done
now = datetime.now()
print 'Time: {0}'.format(now.strftime('%Y/%m/%d %H:%M:%S'))
if end_time < now or not record_response:
break
# yield data to outside calling iterator
for record in record_response['Records']:
last_sequence = record['SequenceNumber']
yield json.loads(record['Data'])
# Get next iterator for shard from previous request
shard_iterator = record_response['NextShardIterator']
# Catch exception meaning hitting API too much
except boto.kinesis.exceptions.ProvisionedThroughputExceededException:
print 'ProvisionedThroughputExceededException found. Sleeping for 0.5 seconds...'
time.sleep(0.5)
# Catch exception meaning iterator has expired
except boto.kinesis.exceptions.ExpiredIteratorException:
iter_response = kinesis.get_shard_iterator(stream_name, shard_ids[0], "AFTER_SEQUENCE_NUMBER", last_sequence)
shard_iterator = iter_response['ShardIterator']
kinesis.close()
import kinesis
STREAM_NAME = 'awesome_data_stream'
MINUTES_RUNNING = 60
# Get Kinesis generator
kinesis_data = kinesis.get_kinesis_data_iterator(STREAM_NAME, MINUTES_RUNNING)
# Iterate over records
for data in kinesis_data:
# Do something crazy with your data
pass
@PRPSDeloitte
Copy link

it will be great if you can share the code for boto3 as well with kinesis

@jensenity
Copy link

jensenity commented Dec 9, 2019

it will be great if you can share the code for boto3 as well with kinesis
@PRPSDeloitte

from datetime import datetime, timedelta
import json

import boto3
from botocore.exceptions import ClientError

def get_kinesis_data_iterator(stream_name, minutes_running):
# Connect to Kinesis
kinesis = boto3.client('kinesis')
# Get data about Kinesis stream for Tag Monitor
kinesis_stream = kinesis.describe_stream(StreamName=stream_name)
# Get the shards in that stream
shards = kinesis_stream['StreamDescription']['Shards']
# Collect together the shard IDs
shard_ids = [shard['ShardId'] for shard in shards]
# Get shard iterator
iter_response = kinesis.get_shard_iterator(StreamName=stream_name, ShardId=shard_ids[0], ShardIteratorType="TRIM_HORIZON")
shard_iterator = iter_response['ShardIterator']

# Calculate end time
end_time = datetime.now() + timedelta(minutes=minutes_running)
while True:
    try:
        # Get data
        record_response = kinesis.get_records(ShardIterator=shard_iterator)
        # Only run for a certain amount of time.
        # Stop looping if no data returned. This means it's done
        now = datetime.now()
        print('Time: {0}'.format(now.strftime('%Y/%m/%d %H:%M:%S')))
        if end_time < now or not record_response:
            break
        # yield data to outside calling iterator
        for record in record_response['Records']:
            last_sequence = record['SequenceNumber']
            yield json.loads(record['Data'])
        # Get next iterator for shard from previous request
        shard_iterator = record_response['NextShardIterator']
    except ClientError as e:
        logging.error(e)

STREAM_NAME = 'awesome_data_stream'
MINUTES_RUNNING = 1

Get Kinesis generator

kinesis_data = get_kinesis_data_iterator(STREAM_NAME, MINUTES_RUNNING)

Iterate over records

for data in kinesis_data:
# Do something crazy with your data
print(data)
pass

@ishangupta3
Copy link

After getting the shards, how would it be possible to extract frame level information?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment