Skip to content

Instantly share code, notes, and snippets.

@imweijh
Last active June 15, 2020 03:14
Show Gist options
  • Save imweijh/581d69d82ee3b1cbf3ed8692274a6a79 to your computer and use it in GitHub Desktop.
Save imweijh/581d69d82ee3b1cbf3ed8692274a6a79 to your computer and use it in GitHub Desktop.
input {
file {
path => "d:/temp/2020052109.tzt"
start_position => "beginning"
sincedb_path => "NUL"
codec => multiline {
pattern => "^%{TIME}"
negate => true
what => previous
charset => "GBK"
max_lines => 3000
}
}
}
filter {
if ([message] =~ /InterFaceReq/) { drop{} } # 删掉包含InterFaceReq的日志
mutate { gsub => ['message', "\r\n", "~"] }
mutate { gsub => ['message', "\n", "~"] }
mutate { gsub => ['message', "~+", "~"] }
mutate { gsub => ['message', "\r", ""] }
mutate { gsub => ['message', "\<GRID.\>.*\<\/GRID.\>", "GRIDDEL"] }
# mutate { gsub => ['message', "[\[\]]", ""] }
grok {
match => ["message","^%{TIME:logtime}: ThreadNo=%{INT:threadno} %{DATA:logwhat}:~%{GREEDYDATA:msg}"]
}
kv {
source => "msg"
field_split => "~"
value_split => "="
transform_key => "lowercase"
include_keys => [ "action", "handleserialno", "usercode", "username", "imei", "mobilecode", "mobilekind", "account", "fundaccount", "usertype", "ip", "gatewayip", "token", "errormessage", "connectiontype", "wtaccounttype", "wtaccount", "stockcode", "price", "volume", "direction", "khbranch", "jytype" ]
prefix => "zzt_"
remove_field => [ "msg" ]
}
if ( [zzt_action] == "401" ) { drop{} } # 删掉
if ( [zzt_action] == "355" ) { drop{} } # 删掉
if ( [zzt_action] == "35601" ) { drop{} } # 删掉
#if ( [zzt_action] == "INQUIREDEALEX" ) { drop{ percentage => 90 } } # 删掉90%
#if ( [zzt_action] == "INQUIRESTOCK" ) { drop{ percentage => 90 } } # 删掉90%
if ( [zzt_action] == "INQUIREDEALEX" ) { drop{} } # 删掉
if ( [zzt_action] == "INQUIRESTOCK" ) { drop{} } # 删掉
# filebeat字段 "source" => "D:\\zztlog\\2017053117.tzt" 从中取日期
if [source] {
grok {
match => ["source","%{DATA}(?<logdate>\d{8})\d{2}.tzt$"]
tag_on_failure => [ "_grokfailure-logdate"]
add_field => {"localtime" => "%{logdate} %{logtime}"}
}
} else if [logtime] {
mutate {
# this will only work on logs created the same day as read
add_field => {"logdate" => "%{+YYYYMMdd}"}
# merge with existing time field
add_field => {"localtime" => "%{logdate} %{logtime}"}
}
}
date {
match => [ "localtime", "YYYYMMdd HH:mm:ss.SSS" ]
} # set event's @timestamp
#if [zzt_handleserialno] {
# fingerprint {
# source => "zzt_handleserialno"
# target => "handleserialno_hash"
# method => "MD5"
# }
#}
# Measures the execution time of reqans
if [logwhat] == "ClientReq" {
mutate { add_tag => ["STARTTASK"] }
}
if [logwhat] == "ReadAnswer" {
mutate { add_tag => ["ENDTASK"] }
}
elapsed {
start_tag => "STARTTASK"
end_tag => "ENDTASK"
unique_id_field => "zzt_handleserialno"
new_event_on_match => false
# timeout => 360
}
if [zzt_ip] {
if "STARTTASK" in [tags] {
aggregate {
task_id => "%{zzt_handleserialno}"
code => "map['zzt_ip'] = event.get('zzt_ip')"
map_action => "create"
}
}
}
if "elapsed_match" in [tags] {
aggregate {
task_id => "%{zzt_handleserialno}"
code => "event.set('zzt_ip', map['zzt_ip'])"
map_action => "update"
end_of_task => true
# timeout => 4000
}
}
if [elapsed_time] { # 变毫秒;abs避免出现负值;标记大值
ruby{
code=>"event.set('[elapsed_time]', (event.get('elapsed_time').abs*1000))"
}
mutate {
convert => { "elapsed_time" => "integer" }
}
if [elapsed_time] > 1800000 {
mutate {
# update => { "elapsed_time" => 30000 }
add_tag => ["TimeTooBIG"]
}
}
}
if [zzt_price] {
mutate {
convert => { "zzt_price" => "float" }
}
}
mutate {
remove_field => [ "input_type","offset","logdate","logtime","host" ]
}
} # end filter
output {
stdout { codec => rubydebug }
file {
path => "d:/temp/output-zzt-test.txt"
codec => rubydebug
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment