@pior
Last active August 29, 2015 14:06
Analytics

Kinesis Keeper

Whole idea

  • deploy with deb packages
  • no daemon (rely on an external process manager)
  • log into syslog
  • take a config file as an argument
  • focus on one stream only
  • consume the stream in real time (within rate limits)
  • aggregate in memory
  • dump into a normalized S3 object
  • store checkpoint on disk
  • notify of the new batch via SNS (with ample details: stream, location, hash)
  • push metrics to cloudwatch / statsd
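
The SNS notification is described only as carrying "ample details" (stream, location, hash). A minimal sketch of such a message follows, assuming a JSON body and a SHA-256 digest; the function name, the field names, and the `size` field are all hypothetical and not part of the notes above.

```python
import hashlib
import json


def build_batch_notification(stream, bucket, key, payload):
    """Return a JSON message announcing one uploaded batch.

    `payload` is the serialized batch as bytes. Field names are
    illustrative assumptions, not a format defined by the design.
    """
    return json.dumps({
        "stream": stream,
        "location": "s3://{}/{}".format(bucket, key),
        "sha256": hashlib.sha256(payload).hexdigest(),
        "size": len(payload),
    }, sort_keys=True)
```

The returned string could be passed as the message body of an SNS publish call; consumers can then fetch the object and verify it against the digest.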

Connections

  • boto connection pool for

    • kinesis
    • s3
    • cloudwatch
    • sns
  • python-statsd

Flow

main activity

  • connect to kinesis

  • loop

    • get stream description

    • wait for the stream to exist and be active

    • start / stop shard activities to match the current shards

    • on SIGTERM

      • signal seppuku to shard activity
      • join
      • exit
    • on SIGHUP or periodically

      • signal seppuku to shard activity
      • join
      • goto loop
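
The main activity above can be sketched with one thread per shard and a `threading.Event` as the seppuku signal. This is only one possible shape, assuming threads rather than processes; `ShardWorker`, `reconcile`, `stop_all` and `install_signal_handlers` are hypothetical names, and the worker body is a placeholder that does no Kinesis work.

```python
import signal
import threading


class ShardWorker(threading.Thread):
    """Placeholder shard activity: idles until asked to stop."""

    def __init__(self, shard_id):
        super().__init__(name="shard-" + shard_id, daemon=True)
        self.shard_id = shard_id
        self.seppuku = threading.Event()

    def run(self):
        # A real worker would poll get_records here; this one only waits.
        while not self.seppuku.wait(timeout=0.1):
            pass

    def stop(self):
        self.seppuku.set()
        self.join()


def reconcile(workers, active_shard_ids, worker_factory=ShardWorker):
    """Stop workers for vanished shards and start workers for new ones.

    `workers` maps shard id -> running worker and is updated in place.
    """
    wanted = set(active_shard_ids)
    for shard_id in set(workers) - wanted:
        workers.pop(shard_id).stop()
    for shard_id in wanted - set(workers):
        worker = worker_factory(shard_id)
        worker.start()
        workers[shard_id] = worker
    return workers


def stop_all(workers):
    """Signal seppuku to every worker, then join them all."""
    for worker in workers.values():
        worker.seppuku.set()
    for worker in workers.values():
        worker.join()
    workers.clear()


def install_signal_handlers(stop_requested, reload_requested):
    """Map SIGTERM to a stop flag and SIGHUP to a reload flag.

    POSIX only; must be called from the main thread.
    """
    signal.signal(signal.SIGTERM, lambda signum, frame: stop_requested.set())
    signal.signal(signal.SIGHUP, lambda signum, frame: reload_requested.set())
```

The main loop would call `reconcile` after each stream description, call `stop_all` and exit when the stop flag is set, and call `stop_all` then loop again when the reload flag is set or a period elapses.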

shard activity

  • connect to kinesis

  • read checkpoint from disk

  • get shard iterator from checkpoint or TRIM_HORIZON

  • loop

    • get_records

    • accumulate in memory

    • if time_limit or record_limit is reached, or on seppuku

      • serialize batch
      • upload batch to s3
      • store batch to local disk
      • store checkpoint
      • send notification by SNS
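
The flush condition in the shard activity can be sketched as a small in-memory accumulator. The `Batch` class, its default limits, and the decision to skip flushing an empty batch are assumptions made for illustration; the notes above do not specify them.

```python
import time


class Batch:
    """Accumulates records until a size or age limit is reached."""

    def __init__(self, record_limit=1000, time_limit=60.0, clock=time.monotonic):
        self.record_limit = record_limit
        self.time_limit = time_limit
        self.clock = clock
        self.records = []
        self.started_at = clock()

    def add(self, records):
        self.records.extend(records)

    def should_flush(self, seppuku=False):
        # Assumption: an empty batch is never flushed, even on seppuku.
        if not self.records:
            return False
        too_big = len(self.records) >= self.record_limit
        too_old = self.clock() - self.started_at >= self.time_limit
        return too_big or too_old or seppuku

    def drain(self):
        """Return the accumulated records and start a fresh batch."""
        records, self.records = self.records, []
        self.started_at = self.clock()
        return records
```

In the shard loop, each `get_records` result would go through `add`, and a true `should_flush` would trigger `drain` followed by the serialize, upload, store, checkpoint and notify steps.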
""" Push a record """
import json

import boto

# stream_name and record_data are placeholders supplied by the caller
kinesis = boto.connect_kinesis()
kinesis.put_record(stream_name, json.dumps(record_data), "partitionkey")
# -> 25 ms
# records are stored for 24 hours
# 1000 records / second / shard
# shard / month = $7
""" Stream the records """
import time

import boto

# stream_name, sleep_interval, process_the_records and
# checkpoint_the_sequence are placeholders supplied by the caller
kinesis = boto.connect_kinesis()
stream = kinesis.describe_stream(stream_name)
shards = stream['StreamDescription']['Shards']
shard_0 = shards[0]
# get_shard_iterator returns a dict; the iterator is under 'ShardIterator'
next_iterator = kinesis.get_shard_iterator(stream_name,
                                           shard_0['ShardId'],
                                           "TRIM_HORIZON")['ShardIterator']
while True:
    response = kinesis.get_records(next_iterator, limit=100)
    records = response['Records']
    if records:  # a poll can return no records
        process_the_records(records)
        last_sequence_number = records[-1]['SequenceNumber']
        checkpoint_the_sequence(last_sequence_number)
    next_iterator = response['NextShardIterator']
    time.sleep(sleep_interval)
# Reference: http://stackoverflow.com/questions/22100206/consuming-a-kinesis-stream-in-python
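
The streaming snippet leaves `checkpoint_the_sequence` undefined, and the design calls for a checkpoint on disk that is read back at startup. A minimal sketch follows, assuming one JSON file per shard; `store_checkpoint`, `read_checkpoint`, `iterator_request` and the file layout are hypothetical. `AFTER_SEQUENCE_NUMBER` is the Kinesis iterator type that resumes just past a known sequence number.

```python
import json
import os
import tempfile


def store_checkpoint(path, sequence_number):
    """Atomically write the last processed sequence number to `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as tmp:
        json.dump({"sequence_number": sequence_number}, tmp)
        tmp.flush()
        os.fsync(tmp.fileno())
    # Rename over the old file so readers never see a partial write.
    os.replace(tmp_path, path)


def read_checkpoint(path):
    """Return the stored sequence number, or None if no checkpoint exists."""
    try:
        with open(path) as f:
            return json.load(f)["sequence_number"]
    except FileNotFoundError:
        return None


def iterator_request(checkpoint):
    """Pick the shard iterator type and starting sequence number."""
    if checkpoint is None:
        return "TRIM_HORIZON", None
    return "AFTER_SEQUENCE_NUMBER", checkpoint
```

The pair returned by `iterator_request` maps onto the iterator type and starting sequence number arguments of `get_shard_iterator`, so a restarted shard activity picks up right after the last checkpointed record.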