Created
March 14, 2014 07:59
-
-
Save kiyoto/9543713 to your computer and use it in GitHub Desktop.
Heroku HTTP logdrain input plugin for Fluentd
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent
class HerokuHttpInput < Input
Plugin.register_input('heroku_http', self)
include DetachMultiProcessMixin
require 'http/parser'
# Loads WEBrick's HTTP utilities on first instantiation (Handler calls
# WEBrick::HTTPUtils.parse_query) and then defers to the superclass
# initializer.
def initialize
  require 'webrick/httputils'
  super
end
# Port the listening socket is bound to (see #start).
config_param :port, :integer, :default => 9880
# Address the listening socket is bound to (see #start).
config_param :bind, :string, :default => '0.0.0.0'
# Upper bound on a request body; Handler answers 413 once it is exceeded.
config_param :body_size_limit, :size, :default => 32*1024*1024 # TODO default
# Idle threshold handed to KeepaliveManager, which closes connections that
# stay idle for more timer ticks than this.
config_param :keepalive_timeout, :time, :default => 10 # TODO default
# Optional listen(2) backlog; when nil, #start does not call listen itself.
config_param :backlog, :integer, :default => nil
# Matches one Logplex frame: a leading decimal count, "<pri>" immediately
# followed by a 1-3 digit version, then space-separated time and host, with
# the remainder of the line captured as the message.
# c.f. https://github.com/heroku/logplex/blob/master/doc/README.http_drains.md
HEROKU_LOGPLEX_REGEXP = /^\d+ \<(?<pri>[0-9]+)\>(?<pri_version>[1-9][0-9]{0,2}) (?<time>[^ ]+) (?<host>[^ ]*) (?<message>.*)$/
# Applies the configuration via the superclass, then builds the line parser
# used by #on_request from HEROKU_LOGPLEX_REGEXP.
def configure(conf)
  super
  # TODO: can it have microseconds?
  # NOTE(review): the time_format below has no fractional-seconds field;
  # confirm how timestamps such as "...:29.123456+00:00" are handled.
  @parser = TextParser::RegexpParser.new(HEROKU_LOGPLEX_REGEXP, "time_format" => "%Y-%m-%dT%H:%M:%S%z")
end
# Timer watcher that tracks open connections and closes the ones that have
# been idle for more than `timeout` timer ticks.
class KeepaliveManager < Coolio::TimerWatcher
  # Simple mutable counter holder. Not referenced elsewhere in this file;
  # kept so the constant remains available.
  class TimerValue
    def initialize
      @value = 0
    end
    attr_accessor :value
  end

  # timeout - idle threshold; coerced with to_i and compared against each
  #           connection's idle tick count.
  def initialize(timeout)
    # Arguments are passed straight to Coolio::TimerWatcher
    # (presumably: 1-second interval, repeating — confirm against cool.io).
    super(1, true)
    @cons = {}
    @timeout = timeout.to_i
  end

  # Starts tracking a connection.
  def add(sock)
    @cons[sock] = sock
  end

  # Stops tracking a connection.
  def delete(sock)
    @cons.delete(sock)
  end

  # Timer callback: bumps every tracked connection's idle counter and closes
  # those that exceeded the timeout.
  def on_timer
    # Iterate over a snapshot of the keys: Handler#on_close calls #delete,
    # so closing a socket here may remove it from @cons while we are still
    # walking the collection.
    @cons.keys.each do |sock|
      sock.close if sock.step_idle > @timeout
    end
  end
end
# Binds the listening socket and, inside detach_multi_process, wires up the
# event loop: a KeepaliveManager, a Coolio TCP server that creates a Handler
# per connection, and a thread that runs the loop (see #run).
def start
  log.debug "listening http on #{@bind}:#{@port}"
  # The plain TCPServer is created before detach_multi_process is entered;
  # the Coolio server below wraps this already-bound socket.
  lsock = TCPServer.new(@bind, @port)

  detach_multi_process do
    super

    @km = KeepaliveManager.new(@keepalive_timeout)
    # Arguments after Handler are forwarded to Handler#initialize.
    @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), @body_size_limit, log)
    @lsock.listen(@backlog) unless @backlog.nil?

    @loop = Coolio::Loop.new
    @loop.attach(@km)
    @loop.attach(@lsock)

    @thread = Thread.new(&method(:run))
  end
end
# Tears down what #start created, in order: detaches every watcher from the
# event loop, stops the loop, closes the listening socket, and waits for the
# loop thread to finish.
def shutdown
  @loop.watchers.each { |watcher| watcher.detach }
  @loop.stop
  @lsock.close
  @thread.join
end
# Thread body started by #start: runs the event loop. A StandardError that
# escapes the loop is logged (message, then backtrace) instead of being
# re-raised.
def run
  @loop.run
rescue => e
  log.error "unexpected error", :error => e.to_s
  log.error_backtrace
end
# Handles one complete Logplex HTTP request.
#
# path_info - request path; the leading "/" is dropped and the remaining
#             segments are joined with "." to form the tag.
# params    - hash that must contain 'HTTP_LOGPLEX_MSG_COUNT'.
# body      - request body; each non-empty line is treated as one message.
#
# Returns [status_line, headers, body]:
#   500 if emitting an event raises,
#   400 if the number of non-empty lines differs from Logplex-Msg-Count,
#   200 otherwise.
def on_request(path_info, params, body)
  path = path_info[1..-1] # remove /
  tag = path.split('/').join('.')
  logplex_msg_count = params['HTTP_LOGPLEX_MSG_COUNT'].to_i

  body.split("\n").each do |msg|
    next if msg == ""
    # Every non-empty line counts toward Logplex-Msg-Count, parsable or not.
    logplex_msg_count -= 1

    time, record = @parser.call(msg)
    # A line that does not parse yields no record; emitting nil time/record
    # would push a broken event downstream, so skip it instead.
    if record.nil?
      log.warn "skipping unparsable Logplex message", :message => msg
      next
    end

    begin
      Engine.emit(tag, time, record)
    rescue
      return ["500 Internal Server Error", {'Content-type'=>'text/plain'}, "500 Internal Server Error\n#{$!}\n"]
    end
  end

  if logplex_msg_count != 0
    log.warn "Logplex-Msg-Count and body did not match"
    return ["400 Bad Request", {'Content-type'=>'text/plain'}, "Logplex-Msg-Count and body did not match"]
  end

  ["200 OK", {'Content-type'=>'text/plain'}, ""]
end
# Per-connection HTTP handler. Incoming bytes are fed to an Http::Parser,
# which calls back into this object (on_message_begin, on_headers_complete,
# on_body, on_message_complete). Completed Logplex requests are passed to
# the callback and its result is written back as the HTTP response.
class Handler < Coolio::Socket
  # io              - the connected socket.
  # km              - keepalive manager; this handler registers itself with
  #                   it here and unregisters in #on_close.
  # callback        - called as callback.call(path_info, params, body) and
  #                   expected to return [status_line, headers, body].
  # body_size_limit - maximum accepted request body size in bytes.
  # log             - logger used for warnings.
  def initialize(io, km, callback, body_size_limit, log)
    super(io)
    @km = km
    @callback = callback
    @body_size_limit = body_size_limit
    @content_type = ""
    @next_close = false
    @log = log
    @idle = 0
    @km.add(self)

    # Best effort: the peer address is only used to populate REMOTE_ADDR.
    begin
      @remote_port, @remote_addr = *Socket.unpack_sockaddr_in(io.getpeername)
    rescue
      @remote_port = @remote_addr = nil
    end
  end

  # Increments and returns the idle tick counter (reset by #on_read).
  def step_idle
    @idle += 1
  end

  def on_close
    @km.delete(self)
  end

  def on_connect
    @parser = Http::Parser.new(self)
  end

  # Feeds received bytes to the HTTP parser. Any error raised while parsing
  # (including errors from the parser callbacks below) is logged and the
  # connection is closed.
  def on_read(data)
    @idle = 0
    @parser << data
  rescue
    @log.warn "unexpected error", :error=>$!.to_s
    @log.warn_backtrace
    close
  end

  def on_message_begin
    @body = ''
  end

  # Records request headers into @env (as HTTP_* keys), decides keep-alive,
  # and answers an Expect header if one was sent.
  def on_headers_complete(headers)
    expect = nil
    size = nil

    # Reset per-request state: on a keep-alive connection a request without
    # a Content-Type header must not inherit the previous request's value.
    @content_type = ""
    @keep_alive = (@parser.http_version == [1, 1])
    @env = {}

    headers.each_pair do |name, value|
      @env["HTTP_#{name.gsub('-','_').upcase}"] = value

      # Match exact header names so that e.g. "Proxy-Connection" is not
      # mistaken for "Connection".
      case name.downcase
      when 'expect'
        expect = value
      when 'content-length'
        size = value.to_i
      when 'content-type'
        @content_type = value
      when 'connection'
        if value =~ /close/i
          @keep_alive = false
        elsif value =~ /Keep-alive/i
          @keep_alive = true
        end
      end
    end

    return unless expect

    if expect == '100-continue'
      # Same boundary as #on_body: a body of exactly the limit is accepted.
      if !size || size <= @body_size_limit
        send_response_nobody("100 Continue", {})
      else
        send_response_and_close("413 Request Entity Too Large", {}, "Too large")
      end
    else
      send_response_and_close("417 Expectation Failed", {}, "")
    end
    nil
  end

  # Accumulates body chunks, answering 413 (once) when the limit is exceeded.
  def on_body(chunk)
    if @body.bytesize + chunk.bytesize > @body_size_limit
      unless closing?
        send_response_and_close("413 Request Entity Too Large", {}, "Too large")
      end
      return
    end
    @body << chunk
  end

  # Validates the finished request, hands it to the callback and writes the
  # callback's response. Requests that are not Logplex deliveries are
  # answered with 400 and the connection is closed, so the client is never
  # left waiting for a response.
  def on_message_complete
    return if closing?

    @env['REMOTE_ADDR'] = @remote_addr if @remote_addr

    params = WEBrick::HTTPUtils.parse_query(@parser.query_string)

    if @content_type !~ /^application\/logplex/
      @log.warn "request with non-Logplex content type:#{@content_type}"
      send_response_and_close("400 Bad Request", {}, "Unsupported Content-Type\n")
      return
    end

    path_info = @parser.request_path

    params.merge!(@env)
    @env.clear

    if not params['HTTP_LOGPLEX_MSG_COUNT']
      @log.warn "request without Logplex-Msg-Count"
      send_response_and_close("400 Bad Request", {}, "Missing Logplex-Msg-Count header\n")
      return
    end

    code, header, body = *@callback.call(path_info, params, @body)
    body = body.to_s

    if @keep_alive
      header['Connection'] = 'Keep-Alive'
      send_response(code, header, body)
    else
      send_response_and_close(code, header, body)
    end
  end

  def on_write_complete
    close if @next_close
  end

  # Writes the response and marks the connection to be closed once the
  # write has completed (see #on_write_complete).
  def send_response_and_close(code, header, body)
    send_response(code, header, body)
    @next_close = true
  end

  def closing?
    @next_close
  end

  # Writes a full response. Content-length and Content-type are filled in
  # on the given header hash unless already present.
  def send_response(code, header, body)
    header['Content-length'] ||= body.bytesize
    header['Content-type'] ||= 'text/plain'

    data = %[HTTP/1.1 #{code}\r\n]
    header.each_pair {|k,v|
      data << "#{k}: #{v}\r\n"
    }
    data << "\r\n"
    write data

    write body
  end

  # Writes a status line and headers only (used for "100 Continue").
  def send_response_nobody(code, header)
    data = %[HTTP/1.1 #{code}\r\n]
    header.each_pair {|k,v|
      data << "#{k}: #{v}\r\n"
    }
    data << "\r\n"
    write data
  end
end
end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment