Skip to content

Instantly share code, notes, and snippets.

@yuta-imai
Created December 12, 2014 07:26
Show Gist options
  • Save yuta-imai/3b59d7407536ee35caa5 to your computer and use it in GitHub Desktop.
Save yuta-imai/3b59d7407536ee35caa5 to your computer and use it in GitHub Desktop.
Tailing utility for Amazon Kinesis
#!/usr/bin/env python
import signal
import sys
import time
from threading import Thread, Event
import boto.kinesis
# Command line: <stream_name> [region]. The region falls back to
# 'ap-northeast-1' when it is not given.
if len(sys.argv) < 2:
    # Fail with a usage line instead of a bare IndexError traceback.
    sys.exit('usage: %s <stream_name> [region]' % sys.argv[0])
stream_name = sys.argv[1]
# Honour the region whenever one is supplied, even if extra arguments follow.
region = sys.argv[2] if len(sys.argv) > 2 else 'ap-northeast-1'

kinesis = boto.kinesis.connect_to_region(region)
if kinesis is None:
    # connect_to_region returns None for a region name it does not know;
    # stop here rather than crash later with an AttributeError.
    sys.exit('unknown region for Amazon Kinesis: %s' % region)

stream = kinesis.describe_stream(stream_name)
shards = stream['StreamDescription']['Shards']

# DescribeStream is paginated: keep fetching until every shard is listed,
# otherwise shards beyond the first page would never be tailed.
page = stream
while page['StreamDescription'].get('HasMoreShards'):
    page = kinesis.describe_stream(
        stream_name, exclusive_start_shard_id=shards[-1]['ShardId'])
    shards.extend(page['StreamDescription']['Shards'])
def worker(ShardId, event):
    """Tail one shard, printing every new record until asked to stop.

    Reading starts at the 'LATEST' position, so only records that arrive
    after the iterator is obtained are shown. Each record is written to
    stdout as "<shard id>, <partition key>, <data>".

    Args:
        ShardId: ID of the shard to read from.
        event: threading.Event; the loop ends once it has been set.
    """
    iterator = kinesis.get_shard_iterator(stream_name, ShardId, 'LATEST')
    iterator_str = iterator['ShardIterator']
    while not event.is_set():
        result = kinesis.get_records(iterator_str, limit=100, b64_decode=True)
        for record in result['Records']:
            # Single-argument print() emits the same text on Python 2 and 3.
            print('%s, %s, %s' % (ShardId, record['PartitionKey'], record['Data']))
        iterator_str = result.get('NextShardIterator')
        if not iterator_str:
            # A closed shard (after a split/merge) has no next iterator;
            # there is nothing more to read, so stop instead of passing
            # None back into get_records.
            break
        # Pace the polling at one request per second, but wake immediately
        # if the stop event is set in the meantime.
        event.wait(1)
def signal_handler(signal, frame):
    """Handle SIGINT: tell every shard worker to stop, then exit with 0.

    Args:
        signal: number of the signal that was delivered (unused). This
            name shadows the signal module inside the function, which is
            harmless because the module is not used here.
        frame: stack frame that was interrupted (unused).
    """
    print('You pressed Ctrl+C!')
    # Set every stop flag in the module-level events list so the worker
    # loops see it on their next check.
    for stop_flag in events:
        stop_flag.set()
    sys.exit(0)
# One stop flag per shard; signal_handler sets all of them on Ctrl+C.
events = []
for shard in shards:
    stop_event = Event()
    reader = Thread(target=worker, args=(shard['ShardId'], stop_event))
    # Daemon threads do not keep the process alive once the main thread exits.
    reader.daemon = True
    events.append(stop_event)
    reader.start()

# Install the Ctrl+C handler, then block the main thread until a signal arrives.
signal.signal(signal.SIGINT, signal_handler)
signal.pause()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment