Skip to content

Instantly share code, notes, and snippets.

@suzumura-ss
Last active December 23, 2015 15:38
Show Gist options
  • Save suzumura-ss/cab55b1adeb15f614141 to your computer and use it in GitHub Desktop.
Save suzumura-ss/cab55b1adeb15f614141 to your computer and use it in GitHub Desktop.
fuse-cloudwatchlogfs - a FUSE filesystem that sends written lines to AWS CloudWatch Logs
#!/usr/bin/env ruby
=begin
* install
$ yum -y install fuse fuse-devel
$ gem install rfusefs
* mount
$ mkdir group_name
$ ruby cloudwatchlogfs.rb group_name
#=> log-group name is 'group_name'
or
$ mkdir mountpoint
$ ruby cloudwatchlogfs.rb mountpoint --group=group_name
#=> log-group name is 'group_name'
* write log
$ echo -e "user_id:00000000\tresource:/teapot" >> mountpoint/log_stream
or
in nginx.conf:
log_by_lua_block {
local fd = io.open("/opt/nginx/logs/mountpoint/log_stream", "a")
fd:write("user:"..ngx.var.remote_user.."\tresource:"..ngx.var.request_uri)
fd:close()
}
=end
require 'rfusefs'
require 'thread'
require 'aws-sdk'
# Prints each given message on its own stdout line, prefixed with the current
# time and a tab. Nested arrays are flattened, so a backtrace slice can be
# passed as a single argument.
#
# @param msg [Array] messages (or arrays of messages) to print
# @return [Array] the flattened list of messages
def log(*msg)
  entries = msg.flatten
  entries.each do |entry|
    stamp = Time.now
    puts "#{stamp}\t#{entry}"
  end
end
# Filesystem root object handed to FuseFS.main: every file under the mount
# point stands for one CloudWatch Logs stream, and each write to such a file
# is sent to that stream with put_log_events.
#
# The log group name and the table of known streams (stream name => sequence
# token to use for the next put) are class-level state shared by all
# instances; the table is guarded by a mutex.
class CloudWatchLogsDir
  # Upper bound on put_log_events attempts for a single write, counting the
  # retries made after a rejected sequence token or a missing group/stream.
  MAX_PUT_ATTEMPTS = 5

  @@lock = Mutex.new
  @@group = 'example'
  @@streams = {}

  # Sets the log group name used for all subsequent requests.
  #
  # @param group [String] log group name
  def self.group=(group)
    log "log_group_name: #{group}"
    @@group = group
  end

  # Converts a filesystem path such as "/log_stream" into the stream name
  # "log_stream". Returns a new string; the argument is never modified, so
  # frozen paths are accepted.
  #
  # @param path [String] path or stream name
  # @return [String] the name without a leading slash
  def self.stream_name(path)
    path.sub(%r{\A/}, '')
  end

  # Records the sequence token to use for the next put to +stream+.
  #
  # @param stream [String] path or stream name
  # @param token [String, nil] token returned by the last successful put
  # @return [String, nil] the stored token
  def self.acquire_stream(stream, token)
    name = stream_name(stream)
    @@lock.synchronize { @@streams[name] = token }
  end

  # Looks up the stored sequence token for +stream+.
  #
  # @param stream [String] path or stream name
  # @return [String, nil] the token, or nil when none is stored
  def self.fetch_stream(stream)
    name = stream_name(stream)
    @@lock.synchronize { @@streams[name] }
  end

  # Tells whether +stream+ has been written to successfully by this process.
  #
  # @param stream [String] path or stream name
  # @return [Boolean]
  def self.stream?(stream)
    name = stream_name(stream)
    @@lock.synchronize { @@streams.key?(name) }
  end

  # @return [Array<String>] names of all streams recorded so far
  def self.streams
    @@lock.synchronize { @@streams.keys }
  end

  # Creates the CloudWatch Logs client. The region comes from AWS_REGION
  # (default 'ap-northeast-1') and the proxy from HTTPS_PROXY or HTTP_PROXY.
  def initialize
    region = ENV['AWS_REGION'] || 'ap-northeast-1'
    proxy = ENV['HTTPS_PROXY'] || ENV['HTTP_PROXY']
    @client = Aws::CloudWatchLogs::Client.new(region: region, http_proxy: proxy)
  end

  # Sends +message+ to the stream named by +stream+, one log event per
  # non-empty line, all carrying the same millisecond timestamp.
  #
  # A rejected sequence token is replaced by the one found in the error text
  # and the put is retried; a missing group or stream is created and the put
  # is retried. At most MAX_PUT_ATTEMPTS puts are made per call.
  #
  # @param stream [String] path or stream name (a leading "/" is ignored)
  # @param message [String] text to send; may contain several lines
  # @return [String, nil] the next sequence token, or nil when +message+
  #   contains no non-empty line (nothing is sent in that case)
  # @raise [Errno::EINVAL] when the service reports the data as already accepted
  # @raise [Errno::EACCES] when the service denies access
  # @raise [Errno::EIO] when the put still fails after MAX_PUT_ATTEMPTS attempts
  def put_log_event(stream, message)
    name = CloudWatchLogsDir.stream_name(stream)
    timestamp = (Time.now.to_f * 1000).to_i
    events = message.split(/\n/).reject(&:empty?).map { |line| { timestamp: timestamp, message: line } }
    return if events.empty?

    token = CloudWatchLogsDir.fetch_stream(name)
    param = { log_group_name: @@group, log_stream_name: name, log_events: events }
    param[:sequence_token] = token if token

    attempts = 0
    begin
      attempts += 1
      res = @client.put_log_events(param)
    rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => e
      # Without a stored token the first rejection is expected, so stay quiet.
      log "#{e.class} - #{e}" if token
      raise ::Errno::EIO, e.message if attempts >= MAX_PUT_ATTEMPTS

      # The error text carries the token the service expects (a long digit run).
      token = e.to_s[/\d{48,}/]
      if token
        param[:sequence_token] = token
      else
        param.delete(:sequence_token)
      end
      retry
    rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException => e
      raise ::Errno::EIO, e.message if attempts >= MAX_PUT_ATTEMPTS

      ensure_stream(name)
      retry
    end
    CloudWatchLogsDir.acquire_stream(name, res.next_sequence_token)
  rescue Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException => e
    log e.class, e, e.backtrace[0..9]
    raise ::Errno::EINVAL, e.message
  rescue Aws::CloudWatchLogs::Errors::AccessDeniedException => e
    log e.class, e, e.backtrace[0..9]
    raise ::Errno::EACCES, e.message
  end

  # @param path [String] path below the mount point
  # @return [Boolean] true once the stream for +path+ has been written to
  def exist?(path)
    CloudWatchLogsDir.stream?(path)
  end

  # Lists the known stream names; the same list is returned for every path.
  #
  # @param path [String] ignored
  # @return [Array<String>]
  def contents(path)
    CloudWatchLogsDir.streams
  end

  # @param path [String] path below the mount point
  # @return [Boolean] same answer as #exist?
  def file?(path)
    exist?(path)
  end

  # Every path is writable except "/._rfuse_check_"
  # (presumably a probe file created by rfusefs at mount time -- TODO confirm).
  #
  # @param path [String] path below the mount point
  # @return [Boolean]
  def can_write?(path)
    path != "/._rfuse_check_"
  end

  # Sends the written +body+ to the stream named by +path+.
  #
  # @param path [String] path below the mount point
  # @param body [String] data written to the file
  def write_to(path, body)
    put_log_event(path, body)
  end

  private

  # Makes sure the log group exists (creating it when describe_log_streams
  # reports it missing) and then creates the stream +name+ inside it.
  def ensure_stream(name)
    begin
      log "describe group: #{@@group}"
      @client.describe_log_streams(log_group_name: @@group)
    rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
      log "create group: #{@@group}"
      @client.create_log_group(log_group_name: @@group)
    end
    log "create stream: #{@@group}:#{name}"
    @client.create_log_stream(log_group_name: @@group, log_stream_name: name)
  end
end
# Usage: #{$0} mountpoint [mount_options] [--group=group_name]
#
# Pull every "--group=NAME" argument out of ARGV (the last one wins) so that
# only the mount point and mount options remain for FuseFS.main.
group = nil
ARGV.delete_if do |arg|
  found = arg.match(/^--group=(.+)$/)
  group = found[1] if found
  found
end

# Mount the filesystem. Without --group, the log group name defaults to the
# basename of the mount point.
FuseFS.main do |options|
  group ||= File.basename(options[:mountpoint])
  CloudWatchLogsDir.group = group
  CloudWatchLogsDir.new
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment