Last active
December 23, 2015 15:38
-
-
Save suzumura-ss/cab55b1adeb15f614141 to your computer and use it in GitHub Desktop.
fuse-cloudwatchlogfs - Send to AWS-CloudWatchLog
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
=begin | |
* install | |
$ yum -y install fuse fuse-devel | |
$ gem install rfusefs | |
* mount | |
$ mkdir group_name | |
$ ruby cloudwatchlogfs.rb group_name | |
#=> log-group name is 'group_name' | |
or | |
$ mkdir mountpoint | |
$ ruby cloudwatchlogfs.rb mountpoint --group=group_name | |
#=> log-group name is 'group_name' | |
* write log | |
$ echo -e "user_id:00000000\tresource:/teapot" >> mountpoint/log_stream | |
or | |
in nginx.conf: | |
log_by_lua_block { | |
local fd = io.open("/opt/nginx/logs/mountpoint/log_stream", "a") | |
fd:write("user:"..ngx.var.remote_user.."\tresource:"..ngx.var.request_uri) | |
fd:close() | |
} | |
=end | |
require 'rfusefs' | |
require 'thread' | |
require 'aws-sdk' | |
def log(*msg) | |
msg.flatten.each{|m| | |
puts "#{Time.now}\t#{m}" | |
} | |
end | |
class CloudWatchLogsDir | |
@@lock = Mutex.new | |
@@group = 'example' | |
@@streams = {} | |
def self.group=(group) | |
log "log_group_name: #{group}" | |
@@group = group | |
end | |
def self.acquire_stream(stream, token) | |
stream.sub!(%r{^/}, '') | |
@@lock.synchronize { | |
@@streams[stream] = token | |
} | |
end | |
def self.fetch_stream(stream) | |
stream.sub!(%r{^/}, '') | |
@@lock.synchronize { | |
return @@streams[stream] | |
} | |
end | |
def self.streams | |
@@lock.synchronize { | |
return @@streams.keys | |
} | |
end | |
def put_log_event(stream, message) | |
timestamp = (Time.now.to_f*1000).to_i | |
token = CloudWatchLogsDir.fetch_stream(stream) | |
messages = message.split(/\n/).delete_if{|v| v.empty?}.map{|m| {timestamp:timestamp, message:m}} | |
return if messages.empty? | |
param = { | |
log_group_name:@@group, | |
log_stream_name:stream, | |
log_events:messages | |
} | |
param.merge!(sequence_token:token) if token | |
begin | |
res = @client.put_log_events(param) | |
rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException=>e | |
log "#{e.class} - #{e}" if token | |
token = e.to_s.scan(/\d{48,}/)[0] | |
param.merge!(sequence_token:token) | |
retry | |
rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException=>e | |
begin | |
log "describe group: #{@@group}" | |
@client.describe_log_streams(log_group_name:@@group) | |
rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException=>e | |
log "create group: #{@@group}" | |
@client.create_log_group(log_group_name:@@group) | |
end | |
log "create stream: #{@@group}:#{stream}" | |
@client.create_log_stream(log_group_name:@@group, log_stream_name:stream) | |
retry | |
end | |
CloudWatchLogsDir.acquire_stream(stream, res.next_sequence_token) | |
rescue Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException=>e | |
log e.class, e, e.backtrace[0..9] | |
raise ::Errno::EINVAL, e.message | |
rescue Aws::CloudWatchLogs::Errors::AccessDeniedException=>e | |
log e.class, e, e.backtrace[0..9] | |
raise ::Errno::EACCES, e.message | |
end | |
def initialize() | |
region = ENV['AWS_REGION']||'ap-northeast-1' | |
proxy = ENV['HTTPS_PROXY']||ENV['HTTP_PROXY'] | |
@client ||= Aws::CloudWatchLogs::Client.new(region:region, http_proxy:proxy) | |
end | |
def exist?(path) | |
CloudWatchLogsDir.fetch_stream(path) | |
end | |
def contents(path) | |
CloudWatchLogsDir.streams | |
end | |
def file?(path) | |
exist?(path) | |
end | |
def can_write?(path) | |
path!="/._rfuse_check_" | |
end | |
def write_to(path, body) | |
put_log_event(path, body) | |
end | |
end | |
# Usage: #{$0} mountpoint [mount_options] | |
group = nil | |
ARGV.each_with_index{|arg, index| | |
case arg | |
when /^--group=(.+)$/ | |
group = $1 | |
else | |
next | |
end | |
ARGV[index] = nil | |
} | |
ARGV.compact! | |
FuseFS.main(){|options| | |
group ||= File.basename(options[:mountpoint]) | |
CloudWatchLogsDir.group = group | |
CloudWatchLogsDir.new | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment