-
-
Save w3irdrobot/688a2dd1a7e288102178 to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta | |
import json | |
import boto | |
def get_kinesis_data_iterator(stream_name, minutes_running, poll_interval=1.0):
    """Yield JSON-decoded records from the first shard of a Kinesis stream (boto2).

    Reading starts at TRIM_HORIZON (the oldest record the stream still
    retains). It stops when ``minutes_running`` minutes have elapsed or the
    shard is closed (``get_records`` returns no ``NextShardIterator``).

    NOTE(review): only ``shard_ids[0]`` is read; records written to any other
    shard of the stream are never yielded.

    :param stream_name: name of the Kinesis stream to read.
    :param minutes_running: how long, in minutes, to keep polling.
    :param poll_interval: seconds to wait after a ``get_records`` call that
        returned no records, so an idle shard is not polled in a tight loop.
    :returns: a generator of the values produced by ``json.loads`` on each
        record's ``Data`` payload.
    """
    import time  # needed for the back-off sleeps; absent from the module imports

    kinesis = boto.connect_kinesis()
    # try/finally so the connection is closed even if the caller stops
    # iterating early (GeneratorExit at ``yield``) or an exception escapes.
    try:
        kinesis_stream = kinesis.describe_stream(stream_name)
        shards = kinesis_stream['StreamDescription']['Shards']
        shard_ids = [shard['ShardId'] for shard in shards]
        shard_id = shard_ids[0]

        iter_response = kinesis.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
        shard_iterator = iter_response['ShardIterator']
        # Sequence number of the last record handed to the caller. It is used to
        # resume after the shard iterator expires. None until a record is seen.
        last_sequence = None
        end_time = datetime.now() + timedelta(minutes=minutes_running)

        # Checking the deadline here also bounds the retry paths below.
        while datetime.now() <= end_time:
            try:
                record_response = kinesis.get_records(shard_iterator)
            except boto.kinesis.exceptions.ProvisionedThroughputExceededException:
                # Hitting the API too often: back off briefly and retry.
                print('ProvisionedThroughputExceededException found. Sleeping for 0.5 seconds...')
                time.sleep(0.5)
                continue
            except boto.kinesis.exceptions.ExpiredIteratorException:
                # Resume just after the last yielded record, or from the start
                # of the shard if nothing has been yielded yet.
                if last_sequence is None:
                    iter_response = kinesis.get_shard_iterator(
                        stream_name, shard_id, "TRIM_HORIZON")
                else:
                    iter_response = kinesis.get_shard_iterator(
                        stream_name, shard_id, "AFTER_SEQUENCE_NUMBER", last_sequence)
                shard_iterator = iter_response['ShardIterator']
                continue

            now = datetime.now()
            print('Time: {0}'.format(now.strftime('%Y/%m/%d %H:%M:%S')))
            if end_time < now:
                break

            records = record_response['Records']
            for record in records:
                last_sequence = record['SequenceNumber']
                yield json.loads(record['Data'])

            # A missing NextShardIterator means the shard has been closed.
            shard_iterator = record_response.get('NextShardIterator')
            if shard_iterator is None:
                break
            if not records:
                # Nothing new yet; avoid hammering GetRecords on an idle shard.
                time.sleep(poll_interval)
    finally:
        kinesis.close()
import kinesis | |
STREAM_NAME = 'awesome_data_stream'
MINUTES_RUNNING = 60


def main():
    """Consume records from STREAM_NAME for up to MINUTES_RUNNING minutes.

    Uses the boto2 reader from the ``kinesis`` module imported above.
    """
    # Get Kinesis generator
    kinesis_data = kinesis.get_kinesis_data_iterator(STREAM_NAME, MINUTES_RUNNING)
    # Iterate over records
    for data in kinesis_data:
        # Do something crazy with your data
        pass


# Only start consuming the stream when run as a script, not on import.
if __name__ == '__main__':
    main()
It would be great if you could also share the code for boto3 with Kinesis.
@PRPSDeloitte
from datetime import datetime, timedelta
import json
import boto3
from botocore.exceptions import ClientError
def get_kinesis_data_iterator(stream_name, minutes_running, poll_interval=1.0):
    """Yield JSON-decoded records from the first shard of a Kinesis stream (boto3).

    Reading starts at TRIM_HORIZON (the oldest record the stream still
    retains). It stops when ``minutes_running`` minutes have elapsed or the
    shard is closed (``get_records`` returns no ``NextShardIterator``).

    NOTE(review): only ``shard_ids[0]`` is read; records written to any other
    shard of the stream are never yielded.

    :param stream_name: name of the Kinesis stream to read.
    :param minutes_running: how long, in minutes, to keep polling.
    :param poll_interval: seconds to wait after an empty ``get_records``
        batch, and after a logged ``ClientError``, before polling again.
    :returns: a generator of the values produced by ``json.loads`` on each
        record's ``Data`` payload.
    """
    import logging  # used below but absent from the module imports
    import time

    kinesis = boto3.client('kinesis')
    kinesis_stream = kinesis.describe_stream(StreamName=stream_name)
    shards = kinesis_stream['StreamDescription']['Shards']
    shard_ids = [shard['ShardId'] for shard in shards]
    shard_id = shard_ids[0]

    iter_response = kinesis.get_shard_iterator(
        StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON")
    shard_iterator = iter_response['ShardIterator']
    # Sequence number of the last record handed to the caller. It is used to
    # resume after the shard iterator expires. None until a record is seen.
    last_sequence = None
    end_time = datetime.now() + timedelta(minutes=minutes_running)

    # Checking the deadline here (not only after a successful call) keeps a
    # persistent ClientError from looping forever.
    while datetime.now() <= end_time:
        try:
            record_response = kinesis.get_records(ShardIterator=shard_iterator)
        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code')
            if error_code == 'ExpiredIteratorException':
                # Resume just after the last yielded record, or from the start
                # of the shard if nothing has been yielded yet.
                if last_sequence is None:
                    iter_response = kinesis.get_shard_iterator(
                        StreamName=stream_name, ShardId=shard_id,
                        ShardIteratorType="TRIM_HORIZON")
                else:
                    iter_response = kinesis.get_shard_iterator(
                        StreamName=stream_name, ShardId=shard_id,
                        ShardIteratorType="AFTER_SEQUENCE_NUMBER",
                        StartingSequenceNumber=last_sequence)
                shard_iterator = iter_response['ShardIterator']
            else:
                # Best effort: log, back off, and retry until the deadline.
                logging.error(e)
                time.sleep(poll_interval)
            continue

        now = datetime.now()
        print('Time: {0}'.format(now.strftime('%Y/%m/%d %H:%M:%S')))
        if end_time < now:
            break

        records = record_response['Records']
        for record in records:
            last_sequence = record['SequenceNumber']
            yield json.loads(record['Data'])

        # A missing NextShardIterator means the shard has been closed.
        shard_iterator = record_response.get('NextShardIterator')
        if shard_iterator is None:
            break
        if not records:
            # Nothing new yet; avoid hammering GetRecords on an idle shard.
            time.sleep(poll_interval)
STREAM_NAME = 'awesome_data_stream'
MINUTES_RUNNING = 1


def main():
    """Print every record read from STREAM_NAME for up to MINUTES_RUNNING minutes."""
    # Get Kinesis generator
    kinesis_data = get_kinesis_data_iterator(STREAM_NAME, MINUTES_RUNNING)
    # Iterate over records
    for data in kinesis_data:
        # Do something crazy with your data
        print(data)


# Only start consuming the stream when run as a script, not on import.
if __name__ == '__main__':
    main()
After getting the shards, how can frame-level information be extracted?
It would be great if you could also share the code for boto3 with Kinesis.