Last active
September 28, 2015 14:52
-
-
Save trekr5/b6433559e8f892747a8b to your computer and use it in GitHub Desktop.
s3 plugin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def aws_s3_config | |
@endpoint_region == 'us-east-1' ? @endpoint_region = 's3.amazonaws.com' : @endpoint_region = 's3-'+@endpoint_region+'.amazonaws.com' | |
@logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region) | |
AWS.config( | |
:access_key_id => @access_key_id, | |
:secret_access_key => @secret_access_key, | |
:s3_endpoint => @endpoint_region | |
) | |
@s3 = AWS::S3.new | |
end | |
def time_alert(interval) | |
Thread.new do | |
loop do | |
start_time = Time.now | |
yield | |
elapsed = Time.now - start_time | |
sleep([interval - elapsed, 0].max) | |
end | |
end | |
end | |
# this method is used for write files on bucket. It accept the file and the name of file. | |
def write_on_bucket (file_data, file_basename, temp_directory) | |
# if you lose connection with s3, bad control implementation. | |
if ( @s3 == nil) | |
aws_s3_config | |
end | |
# find and use the bucket | |
bucket = @s3.buckets[@bucket] | |
@logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!" | |
# prepare for write the file | |
#key = "#{pass_day}/#{file_basename}" can be used to create folders within an s3 bucket | |
object = bucket.objects[file_basename] | |
object.write(:file => file_data, :acl => @canned_acl) | |
@logger.debug "S3: has written successfully "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\"" | |
if File.exists?("#{temp_directory}#{file_basename}") | |
@logger.debug "File exists #{temp_directory}#{file_basename}" | |
else | |
@logger.warn "File #{temp_directory}#{file_basename} does not exist" | |
#exit | |
end | |
end | |
# this method is used for create new path for name the file | |
def getFinalPath | |
@pass_time = Time.now | |
return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M") | |
end | |
# This method is used for restore the previous crash of logstash or to prepare the files to send in bucket. | |
# Take two parameter: flag and name. Flag indicate if you want to restore or not, name is the name of file | |
def upFile(flag, name) | |
# Dir[@temp_directory+name].each do |file| | |
Dir[@temp_directory+name].each do |file| | |
name_file = File.basename(file) | |
@logger.debug "name of file in temporary directory: #{name_file}" | |
#name_file1 = name_file.gsub!(".txt", ".json") | |
name_file1 = name_file.gsub!(/\.txt/, ".json") | |
if (flag == true) | |
@logger.warn "S3: have found temporary file: "+name_file1+", something has crashed before... Prepare for upload in bucket!" | |
end | |
if (!File.zero?(file)) | |
# @pass_day = Time.now.strftime("%Y-%m-%d") | |
# @temp_directory = next_day_check(@pass_day) | |
write_on_bucket(file, name_file1, @temp_directory) | |
if (flag == true) | |
@logger.debug "S3: file: "+name_file1+" restored on bucket "+@bucket | |
else | |
@logger.debug "S3: file: "+name_file1+" was put on bucket "+@bucket | |
end | |
end | |
File.delete(file) | |
#File.delete(name_file1) | |
end | |
end | |
# This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. | |
#binding.pry | |
def newFile (flag) | |
if (flag == true) | |
@current_final_path = getFinalPath | |
#@extension = ".json" | |
@sizeCounter = 0 | |
end | |
if (@tags.size != 0) | |
@tempFile = File.new(@current_final_path+".tag_"+@tag_path+"part"[email protected]_s+".txt", "w") | |
else | |
#@tempFile = File.new(@current_final_path+".part"[email protected]_s+@extension, "w") | |
@tempFile = File.new(@current_final_path+".part"[email protected]_s+".txt", "w") | |
end | |
end | |
public | |
def register | |
require 'aws-sdk' | |
@pass_day = Time.now.strftime("%Y-%m-%d") | |
@logger.debug "today is #{@pass_day}..." | |
current_temp_dir = "/opt/logstash" | |
Dir.mkdir("#{current_temp_dir}/s3_temp") unless File.exists?("#{current_temp_dir}/s3_temp") | |
@logger.debug "current temp dir is #{current_temp_dir}...." | |
@temp_directory = File.join("#{current_temp_dir}", "s3_temp/#{@pass_day}/") | |
# @today_directory = File.join(@temp_directory, "#{@current_final_path}/") | |
if (@tags.size != 0) | |
@tag_path = "" | |
for i in ([email protected]) | |
@tag_path += @tags[i].to_s+"." | |
end | |
end | |
if !(File.directory? @temp_directory) | |
@logger.debug "S3: Temp Directory "+@temp_directory+" doesn't exist, let's make it!" | |
Dir.mkdir(@temp_directory) | |
@logger.debug "today's directory is #{@temp_directory}..." | |
else | |
@logger.debug "S3: Temp Directory "+@temp_directory+" exist, nothing to do" | |
# @logger.debug "S3: Current Directory "+@today_directory+" exist, nothing to do" | |
end | |
if (@restore == true ) | |
@logger.debug "S3: is attempting to verify previous crashes..." | |
upFile(true, "*.txt") | |
#upFile(true, "*.json") | |
end | |
newFile(true) | |
if (time_file != 0) | |
first_time = true | |
@thread = time_alert(@time_file*60) do | |
if (first_time == false) | |
@logger.debug "S3: time_file triggered, let's bucket the file if dosen't empty and create new file " | |
upFile(false, File.basename(@tempFile)) | |
newFile(true) | |
else | |
first_time = false | |
end | |
end | |
end | |
end | |
public | |
def receive(event) | |
return unless output?(event) | |
# Prepare format of Events | |
if (@format == "plain") | |
message = self.class.format_message(event) | |
elsif (@format == "json") | |
message = event.to_json | |
else | |
message = event.to_s | |
end | |
if(time_file !=0) | |
@logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s | |
end | |
# if specific the size | |
if(size_file !=0) | |
if (@tempFile.size <= @size_file ) | |
@logger.debug "S3: File have size: "[email protected]_s+" and size_file is: "+ @size_file.to_s | |
@logger.debug "S3: put event into: "+File.basename(@tempFile) | |
# Put the event in the file, now! | |
File.open(@tempFile, 'a') do |file| | |
file.puts message | |
#file.write "\n" #add new lines in event file | |
end | |
else | |
@logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's bucket it and create new file" | |
upFile(false, File.basename(@tempFile)) | |
@sizeCounter += 1 | |
newFile(false) | |
end | |
# else we put all in one file | |
else | |
@logger.debug "S3: put event into "+File.basename(@tempFile) | |
File.open(@tempFile, 'a') do |file| | |
file.puts message | |
# file.write "\n" | |
end | |
end | |
end | |
def self.format_message(event) | |
message = "Date: #{event["@timestamp"]}\n" | |
message << "Source: #{event["source"]}\n" | |
message << "Tags: #{event["tags"].join(', ')}\n" | |
message << "Fields: #{event.to_hash.inspect}\n" | |
message << "Message: #{event["message"]}" | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment