fred/97c26c34366b24b09e2e (GitHub Gist), last active August 31, 2017 13:27
Script that reads SQS messages, downloads the CloudTrail log files they reference from S3, and indexes the records in Elasticsearch using the bulk API.
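The bulk API expects newline-delimited JSON: for each document, an action line followed by the document itself, with every line ending in "\n" (including the last one). Below is a minimal, self-contained sketch of building that body in memory. It assumes the same `logstash-YYYY.MM.DD` index naming the script uses. `bulk_body` is a hypothetical helper name, and the sample records are illustrative. The sketch leaves out the `_type` and `_timestamp` fields the script sends, because recent Elasticsearch versions no longer support them.

```ruby
require 'json'

# Hypothetical helper: build an Elasticsearch bulk body (NDJSON) from CloudTrail records.
# Each record becomes two lines: an "index" action line, then the record as JSON.
def bulk_body(records, index)
  records.map do |record|
    action = { "index" => { "_index" => index } }
    "#{action.to_json}\n#{record.to_json}\n"
  end.join
end

# Illustrative records, not real CloudTrail output.
records = [
  { "eventName" => "ConsoleLogin", "eventTime" => "2017-08-31T05:27:00Z" },
  { "eventName" => "RunInstances", "eventTime" => "2017-08-31T05:28:00Z" }
]
body = bulk_body(records, "logstash-2017.08.31")
puts body
```

Building the body as a string also lets it be POSTed directly rather than going through a temporary file.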
# Ruby script that reads SQS messages describing CloudTrail json.gz log files stored in S3.
# Every time CloudTrail delivers a log file, AWS uploads it to S3 in json.gz format and sends a notification through SNS to the SQS queue.
# We poll SQS for new notifications, download the referenced files from S3, and combine all records into one file for bulk import into Elasticsearch,
# ready to be used with Kibana.
# Run this hourly or every 30 minutes.
require 'aws-sdk'
require 'json'
require 'stringio'
require 'zlib'
Aws.config.update({
  region: 'ap-southeast-1'
})
SQS_URL = "https://sqs.ap-southeast-1.amazonaws.com/xxxxxxxxx/xxxxxxxxx"
# SQS_ENDPOINT and BASE are not used below.
SQS_ENDPOINT = "sqs.ap-southeast-1.amazonaws.com"
BASE = "AWSLogs/xxxxxxxxxxx/CloudTrail/ap-southeast-1"
# Decompress a gzip-compressed string in memory and return the result.
def gunzip(data)
  sio = StringIO.new(data)
  gz = Zlib::GzipReader.new(sio)
  read_data = gz.read
  gz.close
  read_data
end
@s3_client = Aws::S3::Client.new(
  region: 'ap-southeast-1',
  access_key_id: 'xxxxxxxxxxxx',
  secret_access_key: 'xxxxxxxxxxxxxxxxx'
)
@sqs_client = Aws::SQS::Client.new(
  region: 'ap-southeast-1',
  access_key_id: 'xxxxxxxxxxxxxxxxx',
  secret_access_key: 'xxxxxxxxxxxxxxxxx'
)
resp = @sqs_client.receive_message({
  queue_url: SQS_URL,
  attribute_names: ["Policy", "VisibilityTimeout", "CreatedTimestamp"],
  message_attribute_names: ["MessageAttributeName"],
  max_number_of_messages: 10, # Maximum number of messages to return (10 is the SQS limit)
  visibility_timeout: 60, # Seconds the received messages stay hidden from subsequent ReceiveMessage requests
  wait_time_seconds: 1,
})
@json = []
@message_handlers = []
resp.messages.each do |message|
  # The SQS body is an SNS envelope; its "Message" field holds the CloudTrail notification.
  body = JSON.parse(message.body)
  msg = JSON.parse(body["Message"])
  bucket = msg["s3Bucket"]
  found = false
  # A notification can list several log files, so process every key, not only the first.
  Array(msg["s3ObjectKey"]).each do |key|
    @s3_client.get_object(
      response_target: '/tmp/json.gz',
      bucket: bucket,
      key: key
    )
    data = gunzip(File.binread('/tmp/json.gz'))
    if (json_data = JSON.parse(data)["Records"])
      @json += json_data
      found = true
    end
  end
  # Only messages whose files yielded records are deleted later.
  @message_handlers << message.receipt_handle if found
end
if @json.empty?
  puts "Nothing in SQS"
  exit
end
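# The index name uses the date of this run (not the event date), in Logstash's daily-index style.
@logstash_date = Time.now.strftime("%Y.%m.%d")
# Bulk body: one action line and one document line per record, each ending in "\n".
# Note: "_type" and "_timestamp" are only accepted by older Elasticsearch versions
# ("_timestamp" was removed in 5.0; mapping types were removed in 8.0).
File.open('all', 'w') do |all|
  @json.each do |json|
    date = json["eventTime"]
    all.write %Q{{ "index" : { "_index" : "logstash-#{@logstash_date}", "_type" : "fluentd", "_timestamp" : "#{date}" } }}
    all.write "\n"
    all.write json.to_json
    all.write "\n"
  end
end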
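puts "-------------------"
puts "Indexing #{@json.size} records into logstash-#{@logstash_date}"
# Elasticsearch 6.0+ requires an explicit Content-Type on bulk requests.
response = `curl -s -XPOST localhost:9200/_bulk -H 'Content-Type: application/x-ndjson' --data-binary @all`
puts response
# If curl itself failed (e.g. connection refused), keep the SQS messages for the next run.
abort "Bulk request failed, keeping SQS messages" unless $?.success?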
puts "Deleting #{@message_handlers.size} SQS messages"
@message_handlers.each do |handler|
  @sqs_client.delete_message({
    queue_url: SQS_URL,
    receipt_handle: handler
  })
end
puts "Done"
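Each SQS message body is an SNS envelope. Its `Message` field holds the CloudTrail notification as a JSON string, which contains `s3Bucket` and an `s3ObjectKey` array. Below is a minimal sketch that extracts every (bucket, key) pair, assuming that message shape. `s3_locations` is a hypothetical helper name, and the sample body uses placeholder bucket and key names rather than real CloudTrail output.

```ruby
require 'json'

# Hypothetical helper: turn one SQS message body (an SNS envelope) into
# [bucket, key] pairs for every CloudTrail log file it announces.
def s3_locations(sqs_body)
  envelope = JSON.parse(sqs_body)
  notification = JSON.parse(envelope["Message"])
  bucket = notification["s3Bucket"]
  # Array() turns a missing key list into [], e.g. for a non-CloudTrail message.
  Array(notification["s3ObjectKey"]).map { |key| [bucket, key] }
end

# Illustrative body with placeholder bucket and key names.
sample = {
  "Type" => "Notification",
  "Message" => {
    "s3Bucket" => "example-trail-bucket",
    "s3ObjectKey" => [
      "AWSLogs/xxxxxxxxxxx/CloudTrail/ap-southeast-1/2017/08/31/a.json.gz",
      "AWSLogs/xxxxxxxxxxx/CloudTrail/ap-southeast-1/2017/08/31/b.json.gz"
    ]
  }.to_json
}.to_json

s3_locations(sample).each { |bucket, key| puts "#{bucket}/#{key}" }
```

Isolating the parsing from the S3 download makes it easy to check that no listed file is silently skipped before the message is deleted.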
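The script writes each S3 object to `/tmp/json.gz` before decompressing it. The same step can be done entirely in memory with Ruby's standard `Zlib` and `StringIO`. Below is a minimal sketch that assumes the payload is a gzip-compressed CloudTrail file with a top-level `Records` array. `cloudtrail_records` is a hypothetical helper name. With the AWS SDK, the bytes could come from `get_object(bucket: ..., key: ...).body.read` instead of a temporary file.

```ruby
require 'json'
require 'stringio'
require 'zlib'

# Hypothetical helper: decompress a gzip'd CloudTrail log held in memory and
# return its "Records" array (empty if the file has none).
def cloudtrail_records(gz_bytes)
  reader = Zlib::GzipReader.new(StringIO.new(gz_bytes))
  json = reader.read
  reader.close
  JSON.parse(json).fetch("Records", [])
end

# Build a small gzip payload in memory to exercise the helper.
io = StringIO.new(String.new)
gz = Zlib::GzipWriter.new(io)
gz.write({ "Records" => [{ "eventName" => "ConsoleLogin" }] }.to_json)
gz.finish # closes the gzip stream but leaves the StringIO open
payload = io.string

records = cloudtrail_records(payload)
puts records.size
```

Avoiding the shared temporary file also means two runs overlapping in time cannot overwrite each other's downloads.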