Restful_API & Logstash summary notes
/* Paste this into Notepad++ and set the language to C or C# for nicer highlighting */
Logstash filtering:
input {
  udp {
    port => 514
  }
}
/*Input plugin: the source end of the incoming data; Logstash has dedicated input plugins for this.
In this example I specify UDP port 514 as the input source.
You can also take input manually, read a log file, and so on.
For manual input, the config is set up like this:
input { stdin { } }
To read a log file for filtering, set it up like this:
input {
  file {
    path => "C:\ELK\logstash-5.5.1\bin\DATA\AAA.txt" // path to the log file
    start_position => "beginning" // read from the beginning; note that on later runs it resumes from where it last stopped rather than always starting at the top of the file (see the sketch below)
    type => syslog // mark everything arriving from this source as type "syslog" for later use; this line is optional
  }
}
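If you want the file genuinely re-read from the top on every run (handy while testing), the file input's sincedb_path option can point at the null device. A minimal sketch; the NUL path is Windows-specific:
input {
  file {
    path => "C:\ELK\logstash-5.5.1\bin\DATA\AAA.txt"
    start_position => "beginning"
    sincedb_path => "NUL" // Windows null device; use /dev/null on Linux
  }
}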
There can be more than one input source. For example: // TCP port 5000 and UDP port 5000 are both specified as sources
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}
Elastic officially provides 54 input plugins in total; udp{}, tcp{}, stdin{}, and file{} above are just 4 of them, and the ones most likely to be needed here.
Reference: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
*/
filter {
  grok {
    match => { "message" => '%{SYSLOGTIMESTAMP} %{IPV4:iphost} date=%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day} %{GREEDYDATA:fgtlogmsg}' }
    match => { "message" => '<%{NONNEGINT:syslog_pri}>%{GREEDYDATA:fgtlogmsg}' }
  }
  kv {
    source => "fgtlogmsg"
    remove_field => [ "fgtlogmsg" ]
  }
  syslog_pri { }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
}
/*filter plugin: filters the logs that were read in; the plugins I use here are grok and kv.
grok: Parses unstructured event data into fields, i.e. it is used to parse irregular logs.
Looking at an actual log arriving from the FortiGate:
<189>date=2017-08-23 time=12:17:12 devname=Fortigate-1500D-B devid=FG1K5D3I15801289 logid=0000000013 type=traffic subtype=forward level=notice vd=yajh_S srcip=60.178.246.246 srcport=25601 srcintf="VLAN372_ex" dstip=163.30.202.44 dstport=4672 dstintf="VLAN372_in" poluuid=a9227406-5adf-51e7-de2d-4abd3d6fe8d2 sessionid=3475468153 proto=17 action=deny policyid=14 dstcountry="Taiwan" srccountry="China" trandisp=noop service="udp/4672" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat="unscanned" crscore=30 craction=131072 crlevel=high
Its structure is <non-negative integer>date=DATE time=TIME ... and so on,
so during filtering it matches match => { "message" => '<%{NONNEGINT:syslog_pri}>%{GREEDYDATA:fgtlogmsg}'
"message": the raw log data.
Each %{} stands for one field of the log; the name after the colon is what that field's value is stored as in Elasticsearch.
If you do not want a field's value stored in Elasticsearch, omit the colon and the name.
e.g. <%{NONNEGINT}> matches <189> and simply skips over it, while the trailing GREEDYDATA is everything not yet parsed; I name it 'fgtlogmsg'
and hand it to the second filter: kv.
Its mechanism: Parses key-value pairs, i.e. it maps each key=value field of the log in turn.
Say a log looks like this: fruit=apple animal=dog device=computer. Fed to kv, it fills in each field for you in order.
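As a rough sketch of what kv would extract from that line (assuming default kv settings; values come out as strings), the event gains fields like:
fruit  => "apple"
animal => "dog"
device => "computer"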
syslog_pri{} parses the number inside the syslog <xxx> header and automatically splits it into four fields stored in Elasticsearch, for example:
syslog_facility       user-level
syslog_facility_code  1
syslog_severity       notice
syslog_severity_code  5
drop { } discards the whole event; here it is used to throw away logs we do not want.
if "_grokparsefailure" in [tags] {
  drop { }
}
This says: if an incoming log fails to match the grok patterns, the field [tags]="_grokparsefailure" appears; when that happens, drop the log, because it is not one of the logs I want.
The official docs provide 47 plugins that can be used inside filter,
and this config uses 4 of the more common ones: grok{}, kv{}, syslog_pri{}, and drop{}.
Reference: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
*/
output { // output
  elasticsearch { hosts => ["localhost:9200"] } // send to port 9200, i.e. the Elasticsearch search engine
  stdout { codec => rubydebug } // print each event in the console, useful for debugging
}
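The elasticsearch output writes to date-stamped logstash-* indices; its index option can also make the naming explicit. A sketch matching the logstash-2017-08-09 naming that the Curator section below relies on:
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY-MM-dd}" // one index per day, named like logstash-2017-08-09
  }
}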
/*
Example:
Suppose a log looks like this:
(Hello)fruit=apple animal=elephant vegetable=onion num=5
If I only want fruit, animal, and num stored in Elasticsearch, with num renamed to a number field
(the parentheses of (Hello) must be escaped because grok patterns are regular expressions, and vegetable is left unnamed so its value is not stored):
filter {
  grok {
    match => { "message" => '\(%{WORD}\)fruit=%{WORD:fruit} animal=%{WORD:animal} vegetable=%{WORD} num=%{INT:number}' }
  }
}
If I want all the fields stored in Elasticsearch with the (Hello) filtered out:
filter {
  grok {
    match => { "message" => '\(%{WORD}\)fruit=%{WORD:fruit} %{GREEDYDATA:fgt}' }
  }
  kv {
    source => "fgt"
    remove_field => [ "fgt" ]
  }
}*/
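Under that second config, the stored document should end up with fields roughly like this (a sketch of the expected result, not captured output; kv keeps the values as strings):
{
  "fruit": "apple",
  "animal": "elephant",
  "vegetable": "onion",
  "num": "5"
}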
RESTful API:
GET /logstash-*/_search // fetch from indices whose names start with logstash-
{
  "_source": { // specify which fields to return
    "includes": [ "srcip", "msg", "time" ]
  },
  "query": { // bool offers four clause types and queries by boolean combination
    "bool": { // (official definition) bool query: A query that matches documents matching boolean combinations of other queries. The bool query maps to Lucene BooleanQuery. It is built using one or more boolean clauses, each clause with a typed occurrence. The occurrence types are:
      "must": [ // must: every clause must match, like AND
        { // must_not: every clause must not match, like NOT
          "range": { // should: at least one clause matches, like OR
            "@timestamp": { // filter: unlike must, it does not compute a relevance score (this part needs more study)
              "gte": "2017-08-01T11:00:00", // range query: match data within a given range, here a time range. Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-range-query.html
              "lt": "2017-09-01T12:00:00" // gte = greater than or equal to, lt = less than
            }
          }
        },
        {
          "exists": { "field": "msg" } // exists query: the field must be present; here, only fetch logs that have a msg field
        }
      ],
      "filter": [ // select logs whose msg field contains the strings "anomaly: udp_src_session," / "anomaly: tcp_src_session,"
        { "match": { "msg": "anomaly: udp_src_session," } },
        { "match": { "msg": "anomaly: tcp_src_session," } }
      ]
    }
  }
}
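Outside Kibana's Dev Tools the same search can be sent with curl. A minimal sketch (Unix-style shell quoting; assumes Elasticsearch on localhost:9200, with the body trimmed to the essentials):
curl -XGET "http://localhost:9200/logstash-*/_search" -H 'Content-Type: application/json' -d'
{
  "_source": { "includes": [ "srcip", "msg", "time" ] },
  "query": { "bool": { "must": [ { "exists": { "field": "msg" } } ] } }
}'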
GET /logstash-*/_search // another way to query
{
  "_source": {
    "includes": [ "srcip", "msg", "time" ]
  },
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "@timestamp": { // here the range is relative: from now back over a given period (100 days)
              "gt": "now-100d"
            }
          }
        },
        {
          "exists": { "field": "msg" }
        }
      ],
      "filter": [
        { "match": { "msg": "anomaly: udp_src_session," } },
        { "match": { "msg": "anomaly: tcp_src_session," } }
      ]
    }
  }
}
To delete indices in Elasticsearch automatically you first need to download a separate tool, Curator. It requires Python, but the official site provides a Windows package that can be used directly after download.
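If Python is already available, Curator can alternatively be installed from PyPI (the package name is elasticsearch-curator):
pip install elasticsearch-curator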
First, two yml files need to be created:
curator.yml:
client: // basic settings: host, port, timeout, and so on; these rarely need changing
  hosts:
    - 127.0.0.1
  port: 9200
  url_prefix:
  use_ssl: False
  certificate:
  client_cert:
  client_key:
  ssl_no_validate: False
  http_auth:
  timeout: 30
  master_only: False
logging:
  loglevel: INFO
  logfile:
  logformat: default
  blacklist: ['elasticsearch', 'urllib3']
delete_indices.yml: // the yml action file for deletion; goal: delete all data (logs) older than one day
# Remember, leave a key empty if there is no value. None will be a string,
# not a Python "NoneType"
#
# Also remember that all examples have 'disable_action' set to True. If you
# want to use this action as a template, be sure to set this to False after
# copying it.
actions:
  1:
    action: delete_indices // the action to perform, here deleting indices; many other actions are available, see https://www.elastic.co/guide/en/elasticsearch/client/curator/current/actions.html
    description: >- // describes what the action does, similar to a comment
      Delete indices older than 1 days (based on index name), for logstash-
      prefixed indices. Ignore the error if the filter does not result in an
      actionable list of indices (ignore_empty_list) and exit cleanly.
    options:
      ignore_empty_list: True
      disable_action: False // this must be set to False or the action will not actually run
    filters:
      - filtertype: pattern
        kind: prefix // match by prefix
        value: logstash- // my indices look like logstash-2017-08-09, i.e. logstash-*, so the prefix is logstash-; match all indices starting with logstash-
      - filtertype: age // filter by age
        source: name // tell Curator to read the timestring from the index name
        direction: older // count back into the past
        timestring: '%Y-%m-%d' // e.g. from logstash-2017-08-09 it extracts 2017-08-09
        unit: days // unit: days
        unit_count: 1 // 1 day
  2: bla bla ... // more actions can be added if needed
// Reference: https://www.elastic.co/guide/en/elasticsearch/client/curator/4.0/fe_source.html
Next, a bat file can be run on a schedule via the Windows Task Scheduler:
test.bat:
cd /d E:\ELK\elasticsearch-curator-5.2.0-amd64\curator-5.2.0-amd64\
curator --config curator.yml delete_indices.yml
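As a sketch of the scheduling step itself (the task name and start time are arbitrary examples; adjust the bat path to your own layout):
schtasks /create /tn "CuratorDailyCleanup" /tr "E:\ELK\test.bat" /sc daily /st 01:00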
finish