Skip to content

Instantly share code, notes, and snippets.

@yuta-imai
Created May 12, 2015 02:49
Show Gist options
  • Save yuta-imai/406d08ab39f5363559a6 to your computer and use it in GitHub Desktop.
Save yuta-imai/406d08ab39f5363559a6 to your computer and use it in GitHub Desktop.
require 'aws-sdk'
require 'json'
# Stream to read from, and the Kinesis client (Tokyo region) used for every call below.
stream_name = "stream_handson"
kinesis = Aws::Kinesis::Client.new(region: "ap-northeast-1")

# Collect the ids of the shards listed in the stream's description.
stream = kinesis.describe_stream(stream_name: stream_name)
shard_id_array = stream[:stream_description][:shards].map { |shard| shard[:shard_id] }

# Request one "LATEST"-type iterator per shard and index it by shard id.
iterators = shard_id_array.each_with_object({}) do |shard_id, by_shard|
  response = kinesis.get_shard_iterator(
    stream_name: stream_name,
    shard_id: shard_id,
    shard_iterator_type: "LATEST"
  )
  by_shard[shard_id.to_s] = response[:shard_iterator]
end
# Per-URI tally for the current reporting window. The reporter thread below
# replaces it with a fresh hash after each print.
counter = {}

# Background reporter: every 2 seconds, print the tally and start a new window.
# NOTE(review): nothing synchronizes this thread with the code that updates
# `counter`, so an increment landing between the print and the reset can be lost.
Thread.new do
  loop do
    sleep 2
    p counter
    counter = {}
  end
end
# Consumer loop: poll every shard, parse each record's JSON payload and count
# how many records were seen per "uri" value in `counter`.
#
# Runs until no shard iterators remain. Any error raised by the Kinesis client
# or by JSON parsing propagates and terminates the script.

# Seconds to pause between polling rounds so the loop does not busy-spin
# against the GetRecords API.
poll_interval = 0.25

loop do
  # Iterate over a snapshot of the keys because entries are updated or removed
  # inside the loop.
  iterators.keys.each do |shard_id|
    response = kinesis.get_records(shard_iterator: iterators[shard_id])

    response[:records].each do |record|
      # JSON.parse (not JSON.load): the payload comes from outside the process,
      # so only plain JSON should be accepted.
      uri = JSON.parse(record["data"])["uri"].to_s
      # The first sighting counts as 1, not 0.
      counter[uri] = (counter[uri] || 0) + 1
    end

    # Continue from where this read stopped; reusing the old iterator would
    # re-read the same position on every pass.
    next_iterator = response[:next_shard_iterator]
    if next_iterator
      iterators[shard_id] = next_iterator
    else
      # No follow-up iterator was returned, so there is nothing more to read
      # from this shard.
      iterators.delete(shard_id)
    end
  end

  break if iterators.empty?

  sleep poll_interval
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment