Skip to content

Instantly share code, notes, and snippets.

@n-shinya
Created August 28, 2014 10:03
Show Gist options
  • Save n-shinya/abf20a3485765d7c6111 to your computer and use it in GitHub Desktop.
Save n-shinya/abf20a3485765d7c6111 to your computer and use it in GitHub Desktop.
require 'aws-sdk'
require 'parallel'
# Reads every shard of a Kinesis stream and prints each record to stdout.
#
# Prompts for a stream name, describes the stream to obtain its shards, then
# runs one thread per shard. Each thread obtains a TRIM_HORIZON iterator and
# polls get_records (at most 100 records per call), sleeping one second
# between polls. Credentials come from the AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY environment variables. Any StandardError raised while
# talking to the service is printed and the process aborts.
print "Enter target stream name:"
# Read from $stdin explicitly: Kernel#gets would read from a file named on the
# command line if the script were started with arguments. `to_s` turns the nil
# returned at EOF into an empty string so the emptiness check below handles it.
stream_name = $stdin.gets.to_s.chomp
abort if stream_name.empty?

begin
  client = AWS::Kinesis.new(
    access_key_id: ENV['AWS_ACCESS_KEY_ID'],
    secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']
  ).client

  # NOTE(review): only the shards in this single describe_stream response are
  # used; if the service paginates the shard list, later pages are not read.
  # Confirm against the DescribeStream documentation before relying on this
  # for streams with many shards.
  shards = client.describe_stream(stream_name: stream_name).stream_description.shards
  p shards
  shard_ids = shards.map(&:shard_id)

  # One thread per shard so every shard is polled concurrently.
  Parallel.each(shard_ids, in_threads: shard_ids.count) do |shard_id|
    shard_iterator = client.get_shard_iterator(
      stream_name: stream_name,
      shard_id: shard_id,
      shard_iterator_type: 'TRIM_HORIZON'
    ).shard_iterator

    loop do
      records_info = client.get_records(
        shard_iterator: shard_iterator,
        limit: 100
      )
      records_info.records.each do |record|
        puts "Data : #{record.data}, Partition Key : #{record.partition_key}, Shard Id : #{shard_id}, Sequence Number : #{record.sequence_number}"
      end

      shard_iterator = records_info.next_shard_iterator
      # The service returns no next iterator once a closed shard has been read
      # to its end; stop polling this shard instead of calling get_records
      # with nil, which would raise and abort every other shard's reader too.
      break unless shard_iterator

      sleep(1)
    end
  end
rescue => e
  puts "Error: #{e.message}"
  abort
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment