Skip to content

Instantly share code, notes, and snippets.

@frans-k
Created March 25, 2020 10:45
Show Gist options
  • Save frans-k/47ab555b25ce759a9e69a1849e359127 to your computer and use it in GitHub Desktop.
Save frans-k/47ab555b25ce759a9e69a1849e359127 to your computer and use it in GitHub Desktop.
# Resends failed records from S3 to Firehose.
require 'zlib'
BUCKET = ''
PROFILE = ''
REGION = ''
DELIVERY_STREAM_NAME = ''
ERROR_MANIFESTS_PREFIX = ''
def send_to_firehose record
client = Aws::Firehose::Client.new(
region: REGION,
profile: PROFILE
)
resp = client.put_record({
delivery_stream_name: DELIVERY_STREAM_NAME,
record: {
data: record.to_json
}
})
end
s3 = Aws::S3::Client.new(
region: REGION,
profile: PROFILE
# ...
)
objects = s3.list_objects({
bucket: BUCKET,
prefix: ERROR_MANIFESTS_PREFIX
})
keys = objects[:contents].map{|o| o[:key]}
manifests = keys.map do |k|
s3.get_object({
bucket: BUCKET,
key: k
}).body.read
end
urls = manifests.map do |m|
m = JSON.parse(m)
m['entries'].map{|e| e['url']}
end.flatten
events = urls.map do |url|
key = url.gsub("s3://#{BUCKET}/", '')
Zlib.gunzip(s3.get_object({
bucket: BUCKET,
key: key
}).body.read)
end
records = events.map do |g|
g.split(/(?<=\})(?=\{)/).map{|e| JSON.parse(e)}
end.flatten
records.each do |r|
puts send_to_firehose(r)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment