Gist: Howard-Chang/8123d586285b545f23075d58083ec260 (last active April 18, 2018)
Restful_API & Logstash: complete notes
/* Paste this into Notepad++ and set the language to C or C# for nicer highlighting */
Logstash filtering:
input {
  udp {
    port => 514
  }
}
/* Input plugins: the source feeding data in. Logstash has dedicated input plugins;
in this example I use udp port 514 as the input source.
You can also take manual input, read a log file, and so on.
For manual input, the config is:
input { stdin { } }
To read and filter a log file:
input {
  file {
    path => "C:\ELK\logstash-5.5.1\bin\DATA\AAA.txt"   // path to the log file
    start_position => "beginning"   // read from the beginning; note that on later runs it resumes from where it last stopped, not from the top of the file every time
    type => syslog   // mark everything from this source as type syslog for later use; this line is optional
  }
}
There can be more than one source. For example, tcp port 5000 and udp port 5000 at the same time:
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}
The official docs list 54 input plugins in total; udp{}, tcp{}, stdin{}, and file{} above are just the four I currently need.
Reference: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
*/
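As a quick sanity check for the udp input above, a one-off test event can be pushed in from Python (a sketch; the host/port mirror the config above, the message content is made up, and since UDP is connectionless the send succeeds even if Logstash is not listening yet):

```python
import socket

# Hypothetical test message in the FortiGate-style format parsed further below.
msg = b"<189>date=2017-08-23 time=12:17:12 devname=test type=traffic"

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sent = sock.sendto(msg, ("127.0.0.1", 514))  # matches udp { port => 514 }
sock.close()
print(sent == len(msg))  # True: the whole datagram was handed to the kernel
```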
filter {
  grok {
    match => { "message" => [
      '%{SYSLOGTIMESTAMP} %{IPV4:iphost} date=%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day} %{GREEDYDATA:fgtlogmsg}',
      '<%{NONNEGINT:syslog_pri}>%{GREEDYDATA:fgtlogmsg}'
    ] }
  }
  kv {
    source => "fgtlogmsg"
    remove_field => [ "fgtlogmsg" ]
  }
  syslog_pri { }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}
/* Filter plugins: filter the incoming log. Here I use grok and kv.
grok: "Parses unstructured event data into fields" — that is, it handles irregularly structured logs.
Take an actual log coming in from the FortiGate:
<189>date=2017-08-23 time=12:17:12 devname=Fortigate-1500D-B devid=FG1K5D3I15801289 logid=0000000013 type=traffic subtype=forward level=notice vd=yajh_S srcip=60.178.246.246 srcport=25601 srcintf="VLAN372_ex" dstip=163.30.202.44 dstport=4672 dstintf="VLAN372_in" poluuid=a9227406-5adf-51e7-de2d-4abd3d6fe8d2 sessionid=3475468153 proto=17 action=deny policyid=14 dstcountry="Taiwan" srccountry="China" trandisp=noop service="udp/4672" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat="unscanned" crscore=30 craction=131072 crlevel=high
Its structure is <non-negative integer>date=... time=... and so on,
so at filter time it matches  match => { "message" => '<%{NONNEGINT:syslog_pri}>%{GREEDYDATA:fgtlogmsg}' }
"message": the raw log line.
Each %{} stands for one field of the log; the name after the colon stores that field's value in Elasticsearch under that name.
If you don't want a field stored in Elasticsearch, omit the colon and the name.
e.g. <%{NONNEGINT}> matches <189> and simply skips it. The trailing GREEDYDATA captures everything not yet parsed; I name it fgtlogmsg
and hand it to the second filter, kv.
kv "parses key-value pairs" — it maps each key=value field of the log in turn.
Given a log like  fruit=apple animal=dog device=computer , kv splits it into one field per pair for you.
syslog_pri{} parses the number inside the <xxx> prefix and automatically splits it into four fields stored in Elasticsearch:
syslog_facility        user-level
syslog_facility_code   1
syslog_severity        notice
syslog_severity_code   5
drop { } discards events you don't want.
if "_grokparsefailure" in [tags] {
  drop { }
}
This means: when an incoming log matches none of the grok patterns, Logstash adds "_grokparsefailure" to [tags]; in that case drop the event, since it isn't a log I care about.
The official docs list 47 filter plugins; this config uses four of the most common ones: grok, kv{}, syslog_pri{}, and drop{}.
Reference: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
*/
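To make the kv and syslog_pri filters concrete, here is a miniature re-implementation in Python (an illustrative sketch, not the actual plugin code; the facility/severity tables follow standard syslog numbering):

```python
def kv_parse(text):
    """What the kv filter does in miniature: split key=value pairs into fields."""
    fields = {}
    for pair in text.split():
        if "=" in pair:
            key, _, value = pair.partition("=")
            fields[key] = value.strip('"')  # kv also strips surrounding quotes
    return fields

# Standard syslog facility/severity tables (RFC 3164 numbering).
FACILITIES = ["kernel", "user-level", "mail", "daemon", "security/auth",
              "syslogd", "lpr", "news", "uucp", "clock", "security/auth",
              "ftp", "ntp", "log audit", "log alert", "clock", "local0",
              "local1", "local2", "local3", "local4", "local5", "local6",
              "local7"]
SEVERITIES = ["emergency", "alert", "critical", "error", "warning",
              "notice", "informational", "debug"]

def decode_pri(pri):
    """What syslog_pri does in miniature: facility = pri // 8, severity = pri % 8."""
    return FACILITIES[pri // 8], pri // 8, SEVERITIES[pri % 8], pri % 8

print(kv_parse('fruit=apple animal=dog device=computer'))
# {'fruit': 'apple', 'animal': 'dog', 'device': 'computer'}
print(decode_pri(189))  # ('local7', 23, 'notice', 5)
```

Note that the four example field values listed above (user-level / 1 / notice / 5) correspond to a `<13>` prefix; the `<189>` in the sample FortiGate log decodes to local7 / 23 / notice / 5.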
output {                                          // output
  elasticsearch { hosts => ["localhost:9200"] }   // send to port 9200, i.e. the Elasticsearch search engine
  stdout { codec => rubydebug }                   // print each event to the console; useful for debugging
}
/*
Example:
Suppose a log looks like this:
(Hello)fruit=apple animal=elephant vegetable=onion num=5
If I only want to store fruit, animal, and num in Elasticsearch, renaming num to the field "number":
filter {
  grok {
    match => { "message" => '\(%{WORD}\)fruit=%{WORD:fruit} animal=%{WORD:animal} vegetable=%{WORD} num=%{INT:number}' }
  }
}
(the literal parentheses must be escaped, and %{WORD} without a colon matches the vegetable value without storing it)
If I want to store every field and only strip the leading (Hello):
filter {
  grok {
    match => { "message" => '\(%{WORD}\)fruit=%{WORD:fruit} %{GREEDYDATA:fgt}' }
  }
  kv {
    source => "fgt"
    remove_field => [ "fgt" ]
  }
}
*/
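Grok patterns compile down to named-capture regular expressions, so the first example above can be approximated in plain Python (a sketch; WORD and INT are simplified here to \w+ and \d+):

```python
import re

# Rough regex equivalent of:
# \(%{WORD}\)fruit=%{WORD:fruit} animal=%{WORD:animal} vegetable=%{WORD} num=%{INT:number}
pattern = re.compile(
    r"\(\w+\)"                 # (Hello) -- matched but not captured
    r"fruit=(?P<fruit>\w+) "
    r"animal=(?P<animal>\w+) "
    r"vegetable=\w+ "          # matched but not stored (no capture name)
    r"num=(?P<number>\d+)"
)
m = pattern.match("(Hello)fruit=apple animal=elephant vegetable=onion num=5")
print(m.groupdict())
# {'fruit': 'apple', 'animal': 'elephant', 'number': '5'}
```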
Restful API:
GET /logstash-*/_search        // search indices whose names start with logstash-
{
  "_source": {                 // specify which fields to return
    "includes": [ "srcip", "msg", "time" ]
  },
  "query": {
    "bool": {                  // bool query (official definition): "A query that matches documents matching boolean combinations of other queries. The bool query maps to Lucene BooleanQuery. It is built using one or more boolean clauses, each clause with a typed occurrence." The occurrence types are:
                               //   must:     every clause must match (like AND)
                               //   must_not: every clause must not match (like NOT)
                               //   should:   at least one clause matches (like OR)
                               //   filter:   like must, but it does not affect the score (this part still needs more study XD)
      "must": [
        {
          "range": {           // range query: restrict to a range of values, here a time range; see https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-range-query.html
            "@timestamp": {
              "gte": "2017-08-01T11:00:00",   // gte = greater than or equal to, lt = less than
              "lt": "2017-09-01T12:00:00"
            }
          }
        },
        {
          "exists": { "field": "msg" }        // exists query: only fetch documents that actually have a msg field
        }
      ],
      "filter": [              // match documents whose msg contains "anomaly: udp_src_session," / "anomaly: tcp_src_session,"
        { "match": { "msg": "anomaly: udp_src_session," } },
        { "match": { "msg": "anomaly: tcp_src_session," } }
      ]
    }
  }
}
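The same search body can also be assembled programmatically before sending it, e.g. with the official elasticsearch Python client or plain requests (a sketch; only the dict construction is shown, and the helper name is made up):

```python
import json

def anomaly_query(gte, lt, patterns):
    """Build the bool query above for an arbitrary time window and msg patterns."""
    return {
        "_source": {"includes": ["srcip", "msg", "time"]},
        "query": {
            "bool": {
                "must": [
                    {"range": {"@timestamp": {"gte": gte, "lt": lt}}},
                    {"exists": {"field": "msg"}},
                ],
                "filter": [{"match": {"msg": p}} for p in patterns],
            }
        },
    }

body = anomaly_query("2017-08-01T11:00:00", "2017-09-01T12:00:00",
                     ["anomaly: udp_src_session,", "anomaly: tcp_src_session,"])
print(json.dumps(body, indent=2))  # ready to POST to /logstash-*/_search
```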
GET /logstash-*/_search        // another way to query
{
  "_source": {
    "includes": [ "srcip", "msg", "time" ]
  },
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "@timestamp": {    // relative range: from the current time back over a given period
              "gt": "now-100d"
            }
          }
        },
        {
          "exists": { "field": "msg" }
        }
      ],
      "filter": [
        { "match": { "msg": "anomaly: udp_src_session," } },
        { "match": { "msg": "anomaly: tcp_src_session," } }
      ]
    }
  }
}
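"now-100d" is Elasticsearch date math evaluated on the server; the same cutoff could instead be computed client-side and passed as a literal timestamp (a sketch, using the timestamp format from the first query):

```python
from datetime import datetime, timedelta

# Compute "now minus 100 days" locally instead of letting the server do it.
cutoff = (datetime.utcnow() - timedelta(days=100)).strftime("%Y-%m-%dT%H:%M:%S")
range_clause = {"range": {"@timestamp": {"gt": cutoff}}}
print(range_clause)
```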
To delete Elasticsearch indices automatically you need an extra tool: Curator. It runs on Python, but the official site provides a Windows package that works out of the box after download.
First create two YAML files:
curator.yml:
client:                     # basic settings: host, port, timeout, etc.; these rarely need changing
  hosts:
    - 127.0.0.1
  port: 9200
  url_prefix:
  use_ssl: False
  certificate:
  client_cert:
  client_key:
  ssl_no_validate: False
  http_auth:
  timeout: 30
  master_only: False
logging:
  loglevel: INFO
  logfile:
  logformat: default
  blacklist: ['elasticsearch', 'urllib3']
delete_indices.yml:         # the action file; goal: delete all data (logs) older than one day
# Remember, leave a key empty if there is no value. None will be a string,
# not a Python "NoneType"
#
# Also remember that all examples have 'disable_action' set to True. If you
# want to use this action as a template, be sure to set this to False after
# copying it.
actions:
  1:
    action: delete_indices  # the action to run; here, delete indices. Many other actions exist: https://www.elastic.co/guide/en/elasticsearch/client/curator/current/actions.html
    description: >-         # describes what the action does, like a comment
      Delete indices older than 1 days (based on index name), for logstash-
      prefixed indices. Ignore the error if the filter does not result in an
      actionable list of indices (ignore_empty_list) and exit cleanly.
    options:
      ignore_empty_list: True
      disable_action: False # must be False or the action will not run
    filters:
    - filtertype: pattern
      kind: prefix          # match by index-name prefix
      value: logstash-      # my indices look like logstash-2017-08-09, so the prefix is logstash- (matches all indices starting with logstash-)
    - filtertype: age       # filter by age
      source: name          # tell Curator to read the timestring from the index name
      direction: older      # count backward in time
      timestring: '%Y-%m-%d'  # e.g. from logstash-2017-08-09 it extracts 2017-08-09
      unit: days            # unit: days
      unit_count: 1         # one day
  2: bla bla ...            # more actions can be added the same way if needed
# Reference: https://www.elastic.co/guide/en/elasticsearch/client/curator/4.0/fe_source.html
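What the pattern and age filters above decide can be sketched in a few lines of Python (illustrative only; Curator itself does considerably more):

```python
from datetime import datetime, timedelta

def should_delete(index_name, days=1, prefix="logstash-", fmt="%Y-%m-%d"):
    """Mimic filtertype: pattern (kind: prefix) + filtertype: age (source: name)."""
    if not index_name.startswith(prefix):                     # pattern filter
        return False
    stamp = datetime.strptime(index_name[len(prefix):], fmt)  # timestring from the name
    return stamp < datetime.now() - timedelta(days=days)      # direction: older

print(should_delete("logstash-2017-08-09"))  # True  (far in the past)
print(should_delete(".kibana"))              # False (prefix does not match)
```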
Then use Task Scheduler to run a batch file:
test.bat:
cd /d E:\ELK\elasticsearch-curator-5.2.0-amd64\curator-5.2.0-amd64\
curator --config curator.yml delete_indices.yml
Done.