Skip to content

Instantly share code, notes, and snippets.

@wshihadeh
Created January 22, 2020 19:24
Show Gist options
  • Save wshihadeh/b5184e7582d365e338b52d9c881cc50b to your computer and use it in GitHub Desktop.
Save wshihadeh/b5184e7582d365e338b52d9c881cc50b to your computer and use it in GitHub Desktop.
Extract Rails attributes
<match multiline.**>
@type record_reformer
tag parsed.${tag_suffix[2]}
renew_record false
enable_ruby true
<record>
request_start_time ${record['message'].scan(/\[(?<param>[^ ]+) \#\d+\] [^ ]+ -- : (\[[^ ]+\] )+Started/).flatten.compact[0]}
log_level ${record['message'].scan(/^(I|F|D|W|E), \[[^ ]+ \#\d+\]( )+(?<param>[^ ]+) -- :/).flatten.compact.sort.first}
request_id ${record['message'].scan(/\[[^ ]+ \#\d+\] [^ ]+ -- : \[(?<param>[^ ]+)\] (\[[^ ]+\] )*Started/).flatten.compact[0]}
request_method ${record['message'].scan(/\[[^ ]+ \#\d+\] [^ ]+ -- : (\[[^ ]+\] )+Started (?<param>[^ ]+) [^ ]+ for/).flatten.compact[0]}
request_path ${record['message'].scan(/Started [^ ]+ "(?<param>[^ ]+)" for/).flatten.compact[0]}
request_host_ip ${record['message'].scan(/Started [^ ]+ "[^ ]+" for (?<param>[^ ]+)/).flatten.compact[0]}
request_start_time_rails ${record['message'].scan(/Started [^ ]+ "[^ ]+" for [^ ]+ at (?<param>[^ ]+ [^ ]+ [^\n]+)\n/).flatten.compact[0]}
request_controller ${record['message'].scan(/Processing by (?<param>[^\#]+)\#/).flatten.compact[0]}
request_controller_method ${record['message'].scan(/Processing by [^ ]+\#(?<param>[^ ]+) as/).flatten.compact[0]}
request_format ${record['message'].scan(/Processing by [^ ]+\#[^ ]+ as (?<param>[^\n]+)\n/).flatten.compact[0]}
request_parameters ${record['message'].scan(/( Parameters: (?<param>[^\n]+))?/).flatten.compact[0]}
response_code ${record['message'].scan(/[^ ]+ -- : (\[[^ ]+\] )+Completed (?<param>\d+) /).flatten.compact[0]}
request_runtime ${record['message'].scan(/(\[[^ ]+\] )+Completed \d+ ([^ ]+ )+in (?<param>[\d\.]+)ms/).flatten.compact[0]}
request_view_runtime ${record['message'].scan(/Completed \d+ ([^ ]+ )+in [\d\.]+ms \(Views: (?<param>[\d\.]+)ms/).flatten.compact[0]}
request_end_time ${record['message'].scan(/\[(?<param>[^ ]+) \#\d+\] [^ ]+ -- : (\[[^ ]+\] )+Completed/).flatten.compact[0]}
exception ${(!record['message'].scan(/.+(\.|e)rb:\d+:in `.+'$/).flatten.compact[0].nil?).to_s}
message ${record['message'].gsub(/^$\n/, '')}
</record>
</match>
@wwwlde
Copy link

wwwlde commented Oct 18, 2020

Hello! Can you please provide full config of fluentd ? Thanks in advance!

@wshihadeh
Copy link
Author

  @type tail
  @id in_tail_container_logs
  path /var/lib/docker/containers/*/*-json.log
  pos_file /fluentd/log/containers.log.pos
  time_format "%Y-%m-%dT%H:%M:%S.%L%Z"
  keep_time_key true
  read_from_head true
  tag "docker.*"
  format json
</source>

----
<filter docker.var.lib.docker.containers.*.*.log>
  @type docker_metadata
  labels com.docker.stack.namespace:namespace,com.docker.swarm.service.name:service_name,com.docker.swarm.node.id:node_id
</filter>

<match docker.var.lib.docker.containers.*.*.log>
  @type rewrite_tag_filter
  <rule>
    key container_name
    pattern ^(.+)busylogbox(.+)$
    tag row.busylogbox.dlog.$1
  </rule>
  <rule>
    key container_name
    pattern ^(.+)errbit(.+)$
    tag single.errbit.dlog.$1
  </rule>
  <rule>
    key container_name
    pattern ^(.+)_(elastic|kibana|fluent)(.+)$
    tag ignore.dlog.$1
  </rule>
</match>

<match ignore.**>
  @type null
</match>

<match single.errbit.**>
  @type record_reformer
  tag multiline.${tag_suffix[1]}
  renew_record false
  enable_ruby true
  <record>
    request_id ${record['log'].scan(/^., \[.+\]( )+[^ ]+ -- : \[(?<request_id>[^ ]+)\]/).flatten.compact[0]}
    message ${record['log'].gsub(/^$\n/, '')}
  </record>
</match>

<filter multiline.**>
   @type concat
   key message
   multiline_start_regexp /^(E|F), \[[^ ]+ \#\d+\]( )+[^ ]+ -- :/
   continuous_line_regexp /^(?!(^., \[[^ ]+ \#\d+\]( )+[^ ]+ -- :)).+/
   flush_interval 60
   timeout_label @MULTILINE_LOGS
</filter>

<filter multiline.**>
  @type concat
  key message
  stream_identity_key request_id
  multiline_start_regexp /^I, \[.+\]  INFO -- : \[.+\] Started.+for.+at.+/
  flush_interval 60
  timeout_label @MULTILINE_LOGS
</filter>

<match **>
  @type relabel
  @label @MULTILINE_LOGS
</match>

-----
<label @MULTILINE_LOGS>
  <match row.busylogbox.**>
    @type record_reformer
    tag busylogbox.${tag_suffix[2]}
    renew_record false
    remove_keys log
    enable_ruby true
    <record>
      request_time ${record['log'].scan(/(?<param>^[^ ]+ [^ ]+)/).flatten.compact[0]}
      request_id ${record['log'].scan(/(?<param>[^ ]+) OS/).flatten.compact[0]}
      os ${record['log'].scan(/OS=(?<param>[^ ]+)/).flatten.compact[0]}
      architecture ${record['log'].scan(/AR=(?<param>[^ ]+)/).flatten.compact[0]}
      cpu ${record['log'].scan(/CPU=\((?<param>[^ ]+)\)/).flatten.compact[0]}
      usr ${record['log'].scan(/USR=(?<param>[^ ]+)/).flatten.compact[0]}
      nice ${record['log'].scan(/NICE=(?<param>[^ ]+)/).flatten.compact[0]}
      sys ${record['log'].scan(/SYS=(?<param>[^ ]+)/).flatten.compact[0]}
      iowait ${record['log'].scan(/IOWAIT=(?<param>[^ ]+)/).flatten.compact[0]}
      irq ${record['log'].scan(/IRQ=(?<param>[^ ]+)/).flatten.compact[0]}
      soft ${record['log'].scan(/SOFT=(?<param>[^ ]+)/).flatten.compact[0]}
      soft ${record['log'].scan(/STEAL=(?<param>[^ ]+)/).flatten.compact[0]}
      soft ${record['log'].scan(/GUEST=(?<param>[^ ]+)/).flatten.compact[0]}
      soft ${record['log'].scan(/IDLE=(?<param>[^ ]+)/).flatten.compact[0]}
      message ${record['log'].to_s.strip}
    </record>
  </match>

  <match multiline.**>
    @type record_reformer
    tag parsed.${tag_suffix[2]}
    renew_record false
    enable_ruby true
    <record>
      request_start_time ${record['message'].scan(/\[(?<param>[^ ]+) \#\d+\]  [^ ]+ -- : (\[[^ ]+\] )+Started/).flatten.compact[0]}
      log_level ${record['message'].scan(/^(I|F|D|W|E), \[[^ ]+ \#\d+\]( )+(?<param>[^ ]+) -- :/).flatten.compact.sort.first}
      request_id ${record['message'].scan(/\[[^ ]+ \#\d+\]  [^ ]+ -- : \[(?<param>[^ ]+)\] (\[[^ ]+\] )*Started/).flatten.compact[0]}
      request_method ${record['message'].scan(/\[[^ ]+ \#\d+\]  [^ ]+ -- : (\[[^ ]+\] )+Started (?<param>[^ ]+) [^ ]+ for/).flatten.compact[0]}
      request_path ${record['message'].scan(/Started [^ ]+ "(?<param>[^ ]+)" for/).flatten.compact[0]}
      request_host_ip ${record['message'].scan(/Started [^ ]+ "[^ ]+" for (?<param>[^ ]+)/).flatten.compact[0]}
      request_start_time_rails ${record['message'].scan(/Started [^ ]+ "[^ ]+" for [^ ]+ at (?<param>[^ ]+ [^ ]+ [^\n]+)\n/).flatten.compact[0]}
      request_controller ${record['message'].scan(/Processing by (?<param>[^\#]+)\#/).flatten.compact[0]}
      request_controller_method ${record['message'].scan(/Processing by [^ ]+\#(?<param>[^ ]+) as/).flatten.compact[0]}
      request_format ${record['message'].scan(/Processing by [^ ]+\#[^ ]+ as (?<param>[^\n]+)\n/).flatten.compact[0]}
      request_parameters ${record['message'].scan(/(  Parameters: (?<param>[^\n]+))?/).flatten.compact[0]}
      response_code ${record['message'].scan(/[^ ]+ -- : (\[[^ ]+\] )+Completed (?<param>\d+) /).flatten.compact[0]}
      request_runtime ${record['message'].scan(/(\[[^ ]+\] )+Completed \d+ ([^ ]+ )+in (?<param>[\d\.]+)ms/).flatten.compact[0]}
      request_view_runtime ${record['message'].scan(/Completed \d+ ([^ ]+ )+in [\d\.]+ms \(Views: (?<param>[\d\.]+)ms/).flatten.compact[0]}
      request_end_time ${record['message'].scan(/\[(?<param>[^ ]+) \#\d+\]  [^ ]+ -- : (\[[^ ]+\] )+Completed/).flatten.compact[0]}
      exception ${(!record['message'].scan(/.+(\.|e)rb:\d+:in `.+'$/).flatten.compact[0].nil?).to_s}
      message ${record['message'].gsub(/^$\n/, '')}
    </record>
  </match>

  <match parsed.**>
    @type record_reformer
    tag errbit.${tag_suffix[2]}
    renew_record false
    remove_keys log
    enable_ruby true
    <record>
      empty_record ${(record['log'].to_s == "\n").to_s}
    </record>
  </match>

  <filter **>
    @type grep
    <exclude>
      key message
      pattern /Timeout flush:/
    </exclude>
    <exclude>
      key empty_record
      pattern /true/
    </exclude>
  </filter>


  <filter **>
    @type typecast
    types request_start_time:string,log_level:string,request_id:string,request_method:string,request_path:string,request_host_ip:string,request_start_time2:string,request_controller:string,request_controller_method:string,request_format:string,request_parameters:string,response_code:integer,request_view_runtime:float,request_es_runtime:float,request_ar_runtime:float,request_end_time:string,request_runtime:float,mq_start_time:string,mq_queue:string,mq_id:string,mq_end_time:string,dj_start_time:string,dj_job:string,dj_id:integer,dj_queue:string,dj_end_time:string,dj_runtime:float,request_protocol:string,response_size:integer,request_referer:string,request_agent:string,request_count:integer,response_message:string,request_backend_url:string,proxy_request_method:string,proxy_request_url:string,proxy_response_code:integer,proxy_response_runtime:float,request_time:string,request_start_time_rails:string,request_upstream_connect_time:float,request_upstream_header_time:float,request_upstream_response_time:float
  </filter>

  <filter **>
    @type empty_keys
    empty_keys response_code:0,response_size:0,request_count:0,request_view_runtime:0.0,request_es_runtime:0.0,request_ar_runtime:0.0,request_runtime:0.0,proxy_response_code:0,proxy_response_runtime:0.0,dj_runtime:0.0,dj_id:0,request_upstream_connect_time:0.0,request_upstream_header_time:0.0,request_upstream_response_time:0.0
  </filter>

  <match **>
    @type relabel
    @label @LOGS_PARSED
  </match>
</label>

---
@include fluent-source.conf
@include fluent-docker.conf
@include fluent-parse.conf

<label @LOGS_PARSED>
  <match **.**>
    @type elasticsearch

    host "#{ENV['ELASTICSEARCH_HOST']}"
    port "#{ENV['ELASTICSEARCH_PORT']}"
    scheme "#{ENV['ELASTICSEARCH_SCHEME'] || 'http'}"
    type_name dlog

    include_tag_key true
    tag_key @logtag

    logstash_format true
    logstash_prefix dlog
    logstash_dateformat %Y%m%d

    reconnect_on_error true

    <buffer>
      @type file
      path /fluentd/log/dlog/elastic-buffer
      flush_thread_count 8
      flush_interval 1s
      chunk_limit_size 32M
      queue_limit_length 4
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>
</label>```

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment