Created
August 17, 2012 08:58
-
-
Save anonymous/3377182 to your computer and use it in GitHub Desktop.
Rails logstash logger first experiment
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Do not use this code in any production systems, really. | |
# | |
# Now that that is out of the way, this is a hacked together
# 'see if it can be done' Rails logger for Logstash. | |
# | |
# It is a first look to see what would need to be done to replace | |
# standard rails logging with a nice logstash based solution. | |
# | |
# Current notes: | |
# | |
# This is backwards compatible with the default rails logger I think | |
# It uses a thread to send data to logstash using TCP. | |
# | |
# All the LogSubscribers in rails are activated automatically, we need to
# unregister them currently as per the discussion in this URL | |
# http://stackoverflow.com/questions/6377190/modify-log-format-and-content-from-default-actioncontroller-logsubscriber-in-rai | |
# | |
# Removing all the basic log subscribers will stop logstash getting polluted with string based logs. | |
# We would then need to re-implement all the rails LogSubscribers to create structured data for this logger | |
# | |
# This currently only logs to the local file and to logstash using TCP. Integrate with cabin? | |
# | |
# There is a very basic support for tagged logging in Rails 3.2. | |
# Basically the code was stolen from the rails Tagged Logger. It will put all the tags in a single string | |
# into the @tags field. Not ideal but this is a first look | |
# | |
# Drop this in your config/initializers file and modify line 494 with your logstash server info. You will need a TCP | |
# input on your logstash server | |
# | |
# As a test I have added a subscriber to the process_action.action_controller ActiveSupport::Notification so that at | |
# least one nicely formatted log gets sent to logstash | |
# Namespace stubs for the LogStash constant tree. Only the names are needed
# here; the real implementations live in the logstash distribution.
module LogStash
  # Pre-declare the sub-namespaces so later code can reference them without
  # loading the full logstash codebase.
  [:Inputs, :Outputs, :Filters, :Search, :Config, :File, :Web, :Util].each do |name|
    const_set(name, Module.new)
  end

  # Sentinel value pushed through queues to tell workers to stop.
  SHUTDOWN = :shutdown
end # module LogStash
require "date" # for DateTime | |
# Provide our own Time wrapper for ISO8601 support | |
# Example: | |
# >> LogStash::Time.now.to_iso8601 | |
# => "2010-10-17 00:25:24.619014-0700" | |
# | |
# >> LogStash::Time.now.utc.to_iso8601 | |
# => "2010-10-17 07:25:26.788704Z" | |
# Time subclass that renders itself as an ISO8601 string with microsecond
# precision, e.g.:
#   LogStash::Time.now.utc.to_iso8601  # => "2010-10-17T07:25:26.788704Z"
class LogStash::Time < ::Time
  ISO8601 = "%Y-%m-%dT%H:%M:%S"

  # Format this time as ISO8601. Microseconds are zero-padded so the
  # resulting strings sort lexicographically.
  def to_iso8601
    zone = utc? ? "Z" : strftime("%z")
    format("%s.%06d%s", strftime(ISO8601), tv_usec, zone)
  end

  # Format a DateTime as ISO8601; raises for any other type.
  def self.to_iso8601(obj)
    unless obj.is_a?(DateTime)
      raise "Can't convert object of type #{obj.class} (#{obj}) to iso8601."
    end
    zone = obj.offset == 0 ? "Z" : obj.strftime("%z")
    # DateTime#sec_fraction is a fraction of a *day*; scale to microseconds.
    micros = obj.sec_fraction.to_f * 86400 * 1000000
    format("%s.%06d%s", obj.strftime(ISO8601), micros, zone)
  end
end # class LogStash::Time
require "json" | |
require "uri" | |
require "time" | |
# General event type. | |
# Basically a light wrapper on top of a hash. | |
# | |
# TODO(sissel): properly handle lazy properties like parsed time formats, urls, | |
# etc, as necessary. | |
# General event type.
# Basically a light wrapper on top of a hash.
#
# Reserved "@..." keys ("@source", "@type", "@tags", "@timestamp", "@fields")
# live at the top level of @data; user fields live under "@fields".
#
# TODO(sissel): properly handle lazy properties like parsed time formats, urls,
# etc, as necessary.
class LogStash::Event
  public
  # data - optional hash merged over the defaults below. A "@timestamp"
  # (ISO8601 string) is added when the caller does not supply one.
  def initialize(data=Hash.new)
    if RUBY_ENGINE == "jruby"
      # Class-level joda-time ISO8601 parser shared by all events; used by
      # unix_timestamp and by sprintf's %{+...} time formats below.
      @@date_parser ||= Java::org.joda.time.format.ISODateTimeFormat.dateTimeParser.withOffsetParsed
    else
      # TODO(sissel): LOGSTASH-217
      @@date_parser ||= nil
    end
    @cancelled = false
    @data = {
      "@source" => "unknown",
      "@type" => nil,
      "@tags" => [],
      "@fields" => {},
    }.merge(data)
    if !@data.include?("@timestamp")
      @data["@timestamp"] = LogStash::Time.now.utc.to_iso8601
    end
  end # def initialize
  public
  # Build an event from a JSON object string.
  def self.from_json(json)
    return LogStash::Event.new(JSON.parse(json))
  end # def self.from_json
  public
  # Mark this event as dropped so downstream processing can skip it.
  def cancel
    @cancelled = true
  end # def cancel
  public
  def cancelled?
    return @cancelled
  end # def cancelled?
  # Create a deep-ish copy of this event.
  public
  # Top-level data is shallow-copied; each value under "@fields" is cloned
  # one level deep (nested structures inside those values are still shared).
  def clone
    newdata = @data.clone
    newdata["@fields"] = {}
    # NOTE(review): v.clone raises TypeError for unclonable values
    # (e.g. Fixnum/Symbol on rubies before 2.4) — confirm target ruby.
    fields.each do |k,v|
      newdata["@fields"][k] = v.clone
    end
    return LogStash::Event.new(newdata)
  end # def clone
  public
  def to_s
    return self.sprintf("%{@timestamp} %{@source}: %{@message}")
  end # def to_s
  public
  # "@timestamp" accessors — the value is an ISO8601 string, not a Time.
  def timestamp; @data["@timestamp"]; end # def timestamp
  def timestamp=(val); @data["@timestamp"] = val; end # def timestamp=
  public
  # The event timestamp as a unix-epoch float (seconds).
  def unix_timestamp
    if RUBY_ENGINE != "jruby"
      # This is really slow. See LOGSTASH-217
      return Time.parse(timestamp).to_f
    else
      time = @@date_parser.parseDateTime(timestamp)
      return time.getMillis.to_f / 1000
    end
  end
  public
  def source; @data["@source"]; end # def source
  # Setting the source also derives "@source_host"/"@source_path" when the
  # value parses as a URI; otherwise @source and @source_host both get val.
  def source=(val)
    uri = URI.parse(val) rescue nil
    val = uri if uri
    if val.is_a?(URI)
      @data["@source"] = val.to_s
      @data["@source_host"] = val.host
      @data["@source_path"] = val.path
    else
      @data["@source"] = val
      @data["@source_host"] = val
    end
  end # def source=
  public
  def message; @data["@message"]; end # def message
  def message=(val); @data["@message"] = val; end # def message=
  public
  def type; @data["@type"]; end # def type
  def type=(val); @data["@type"] = val; end # def type=
  public
  def tags; @data["@tags"]; end # def tags
  def tags=(val); @data["@tags"] = val; end # def tags=
  # field-related access
  public
  def [](key)
    # If the key isn't in fields and it starts with an "@" sign, get it out of data instead of fields
    if ! @data["@fields"].has_key?(key) and key.slice(0,1) == "@"
      return @data[key]
    # Exists in @fields (returns value) or doesn't start with "@" (return null)
    else
      return @data["@fields"][key]
    end
  end # def []
  public
  # NOTE(review): a new "@..." key not already present at the top level is
  # stored under "@fields", asymmetric with #[] above — confirm intended.
  def []=(key, value)
    if @data.has_key?(key)
      @data[key] = value
    else
      @data["@fields"][key] = value
    end
  end # def []=
  def fields; return @data["@fields"] end # def fields
  public
  def to_json(*args); return @data.to_json(*args) end # def to_json
  def to_hash; return @data end # def to_hash
  public
  # Replace this event's data with another event's hash (shared, not copied).
  def overwrite(event)
    @data = event.to_hash
  end
  public
  # True if key exists at the top level or under "@fields".
  def include?(key)
    return (@data.include?(key) or @data["@fields"].include?(key))
  end # def include?
  # Append an event to this one.
  public
  # Concatenates messages with a newline, unions tags, and merges fields
  # (colliding scalar values are promoted to arrays).
  def append(event)
    self.message += "\n" + event.message
    self.tags |= event.tags
    # Append all fields
    event.fields.each do |name, value|
      if self.fields.include?(name)
        if !self.fields[name].is_a?(Array)
          self.fields[name] = [self.fields[name]]
        end
        if value.is_a?(Array)
          self.fields[name] |= value
        else
          self.fields[name] << value
        end
      else
        self.fields[name] = value
      end
    end # event.fields.each
  end # def append
  # Remove a field
  public
  # Deletes from the top level when present there, otherwise from "@fields".
  def remove(field)
    if @data.has_key?(field)
      @data.delete(field)
    else
      @data["@fields"].delete(field)
    end
  end # def remove
  # sprintf. This could use a better method name.
  # The idea is to take an event and convert it to a string based on
  # any format values, delimited by %{foo} where 'foo' is a field or
  # metadata member.
  #
  # For example, if the event has @type == "foo" and @source == "bar"
  # then this string:
  #   "type is %{@type} and source is %{@source}"
  # will return
  #   "type is foo and source is bar"
  #
  # If a %{name} value is an array, then we will join by ','
  # If a %{name} value does not exist, then no substitution occurs.
  #
  # TODO(sissel): It is not clear what the value of a field that
  # is an array (or hash?) should be. Join by comma? Something else?
  public
  def sprintf(format)
    # Fast path: nothing to substitute.
    if format.index("%").nil?
      return format
    end
    return format.gsub(/%\{[^}]+\}/) do |tok|
      # Take the inside of the %{ ... }
      key = tok[2 ... -1]
      if key == "+%s"
        # Got %{+%s}, support for unix epoch time
        if RUBY_ENGINE != "jruby"
          # TODO(sissel): LOGSTASH-217
          raise Exception.new("LogStash::Event#sprintf('+%s') is not " \
                              "supported yet in this version of ruby")
        end
        datetime = @@date_parser.parseDateTime(self.timestamp)
        (datetime.getMillis / 1000).to_i
      elsif key[0,1] == "+"
        # We got a %{+TIMEFORMAT} so use joda to format it.
        if RUBY_ENGINE != "jruby"
          # TODO(sissel): LOGSTASH-217
          raise Exception.new("LogStash::Event#sprintf('+dateformat') is not " \
                              "supported yet in this version of ruby")
        end
        datetime = @@date_parser.parseDateTime(self.timestamp)
        format = key[1 .. -1]
        datetime.toString(format) # return requested time format
      else
        # Use an event field.
        value = nil
        obj = self
        # If the top-level value exists, use that and don't try
        # to "look" into data structures.
        if self[key]
          value = self[key]
        else
          # "." is what ES uses to access structured data, so adopt that
          # idea here, too. "foo.bar" will access key "bar" under hash "foo".
          key.split('.').each do |segment|
            if obj
              value = obj[segment] rescue nil
              obj = obj[segment] rescue nil
            else
              value = nil
              break
            end
          end # key.split.each
        end # if self[key]
        case value
        when nil
          tok # leave the %{foo} if this field does not exist in this event.
        when Array
          value.join(",") # Join by ',' if value is an array
        when Hash
          value.to_json # Convert hashes to json
        else
          value # otherwise return the value
        end
      end
    end
  end # def sprintf
  public
  # Events are equal when they are the same class with equal data hashes.
  def ==(other)
    #puts "#{self.class.name}#==(#{other.inspect})"
    if !other.is_a?(self.class)
      return false
    end
    return other.to_hash == self.to_hash
  end # def ==
end # class LogStash::Event
require "thread" | |
require "socket" | |
require "securerandom" | |
# Namespace for the experimental logger defined below.
module Jurassic
  module Squirrel; end
end
# Experimental Rails-compatible logger: queues structured LogStash::Event
# records and ships them to a logstash TCP input from a background thread,
# while also mirroring everything to the regular Rails log file.
#
# NOTE(review): not production-ready (see header notes) — queued events are
# lost if the process dies before the queue drains, and a send failure
# kills the sender thread.
class Jurassic::Squirrel::Logger
  # logstash_host/logstash_port - address of the logstash TCP input.
  # progname - stamped on every event as its @type.
  def initialize(logstash_host, logstash_port, progname)
    @host = logstash_host
    @port = logstash_port
    @progname = progname
    @events = Queue.new                  # thread-safe pending-event buffer
    @shutdown = false
    @workers = []
    @tags = Hash.new { |h,k| h[k] = [] } # per-thread tag stacks for #tagged
    @rails_logger = ActiveSupport::BufferedLogger.new(Rails.root.join('log',"#{Rails.env}.log"))
    start_message_sender
    # Drain the queue on process exit. Registered here rather than in the
    # class body so the block closes over this instance: shutdown! is an
    # instance method, so the old class-body at_exit raised NoMethodError
    # when the process exited.
    at_exit do
      puts "Shutting down Logstasher"
      shutdown!
    end
  end

  # Tagged logging (Rails 3.2 style, lifted from ActiveSupport::TaggedLogging):
  # tags pushed here are rendered into @tags for events logged in the block.
  def tagged(*new_tags)
    tags = current_tags
    new_tags = Array.wrap(new_tags).flatten.reject(&:blank?)
    tags.concat new_tags
    yield
  ensure
    new_tags.size.times { tags.pop }
  end

  def silence(temporary_level = ::Logger::ERROR, &block)
    # BUG FIX: previously delegated to @logger (never assigned) and used
    # Logger::ERROR, which resolved to this class itself (also named
    # Logger) and raised NameError. Delegate to the rails logger and
    # reference the stdlib ::Logger constant explicitly.
    @rails_logger.silence(temporary_level, &block)
  end
  deprecate :silence

  def workers
    @workers
  end

  def queue
    @events
  end

  # Ask the sender thread to finish draining the queue, then wait for it.
  def shutdown!
    @shutdown = true
    @workers.each do |w|
      w.join
    end
  end

  # Background thread: every 100ms, if events are queued, open a TCP
  # connection, send the current batch as JSON lines, and close it.
  def start_message_sender
    @workers << Thread.new do
      until @shutdown == true && @events.size == 0
        if @events.size > 0
          logstash_socket = TCPSocket.new(@host, @port)
          begin
            @events.size.times do
              # Shift before the begin so a failed send can always requeue
              # the event (previously a raise from shift pushed nil back).
              event = @events.shift
              begin
                # BUG FIX: was `event.to_json + "\n"` passed to puts, which
                # appends its own newline — every event was followed by a
                # blank line on the wire.
                logstash_socket.puts(event.to_json)
              rescue => e
                @events << event # requeue the unsent event
                # NOTE(review): re-raising kills the sender thread for
                # good; consider logging and continuing instead.
                raise e
              end
            end
          ensure
            logstash_socket.close # close exactly once, success or failure
          end
        end
        sleep 0.1
      end
    end
  end

  def fatal(message = nil, progname = nil, &block)
    add("FATAL", message, progname, &block)
    @rails_logger.fatal(message, progname, &block)
  end

  def error(message = nil, progname = nil, &block)
    add("ERROR", message, progname, &block)
    @rails_logger.error(message, progname, &block)
  end

  def warn(message = nil, progname = nil, &block)
    add("WARN", message, progname, &block)
    # BUG FIX: previously delegated to @rails_logger.error, logging every
    # warning at ERROR severity in the rails log.
    @rails_logger.warn(message, progname, &block)
  end

  def info(message = nil, progname = nil, &block)
    add("INFO", message, progname, &block)
    @rails_logger.info(message, progname, &block)
  end

  def debug(message = nil, progname = nil, &block)
    add("DEBUG", message, progname, &block)
    @rails_logger.debug(message, progname, &block)
  end

  # Everything is logged (DEBUG and up).
  def level
    0
  end

  # Build a LogStash::Event and queue it for the sender thread.
  #
  # message may be a Hash for structured logging — "@message" for the text,
  # :fields merged into @fields, :tags for @tags — or any other object,
  # which becomes the event message verbatim.
  def add(severity, message = nil, progname = nil, &block)
    mandatory_fields = { :priority => severity }
    data = {}
    real_message = message
    if message.class == Hash
      real_message = message.delete("@message")
      data["@fields"] = message[:fields] ? mandatory_fields.merge(message[:fields]) : mandatory_fields
      data["@tags"] = message[:tags] if message[:tags]
    end
    # BUG FIX: plain (non-Hash) messages never carried the mandatory
    # :priority field; always include it.
    data["@fields"] ||= mandatory_fields
    data["@type"] = @progname
    data["@source"] = Socket.gethostname
    data["@message"] = real_message
    # BUG FIX: this used to assign tags_text unconditionally, clobbering
    # message[:tags] with nil whenever the thread had no tagged() tags.
    if (text = tags_text)
      data["@tags"] = text
    end
    e = LogStash::Event.new(data)
    @events << e
  end

  protected

  # Current thread's tags rendered as "[a] [b] ", or nil when there are none.
  def tags_text
    tags = current_tags
    if tags.any?
      tags.collect { |tag| "[#{tag}]" }.join(" ") + " "
    end
  end

  def current_tags
    @tags[Thread.current]
  end
end
# Install the experimental logger as the Rails logger.
# NOTE(review): "XXX.XXX.XXX"/999999 are placeholders (999999 is not a valid
# TCP port) — replace with your logstash server's TCP input, per the header.
Rails.logger = Jurassic::Squirrel::Logger.new("XXX.XXX.XXX", 999999, "Test Application")
# Demo subscriber: forwards each completed controller action to the logger
# as a structured event, with the request duration in milliseconds.
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, start, finish, id, payload|
  # Pull :params out so the remaining payload becomes the event fields.
  url = payload.delete(:params)
  data = payload || {}
  data[:page_duration] = (finish - start) * 1000
  data[:controller] = url[:controller] if url[:controller]
  data[:action] = url[:action] if url[:action]
  # NOTE(review): this reads url[:params], i.e. a nested "params" key inside
  # the request params — confirm this is intended rather than url itself.
  data[:params] = url[:params].to_s if url[:params]
  Rails.logger.info({:fields => data, "@message" => name })
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment