Symlink the Logstash settings directory so bin/logstash can find it when run directly:
ln -s /etc/logstash /usr/share/logstash/config
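Alternatively, skip the symlink and pass the settings path explicitly on each run (mysql.conf is defined further below):
sudo /usr/share/logstash/bin/logstash --path.settings /etc/logstash -f mysql.conf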
Add a WHERE condition to the dump using -w (--where):
mysqldump -u root -p [database] [table] -w"created_at > '2019-05-22'" > dump.sql
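A quick sanity check that the dump actually contains the extended INSERT statements the grok filter below expects:
grep -c "INSERT INTO" dump.sql
grep "INSERT INTO" dump.sql | cut -c1-200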
curl -XPUT 'http://localhost:9200/_template/[template_name]' -H 'Content-Type: application/json' -d@template.json
A template config (using the IK analysis plugin); the "template" field is the index name pattern the template applies to, and the mapping key is the document type used in the Logstash output below.
template.json
{
  "template": "[index_name]*",
  "order": 1,
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": false },
      "dynamic_templates": [
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        }
      ]
    },
    "[doc_type]": {
      "properties": {
        "id": {
          "type": "long"
        },
        "[some_field]": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_smart"
        },
        "[another_field]": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_smart"
        }
      }
    }
  }
}
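To confirm the template is installed and the IK analyzers resolve (this assumes the analysis-ik plugin is already installed; the sample text is arbitrary):
curl 'http://localhost:9200/_template/[template_name]?pretty'
curl -XPOST 'http://localhost:9200/_analyze' -H 'Content-Type: application/json' -d'{"analyzer": "ik_max_word", "text": "[some text]"}'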
The Logstash pipeline config below follows this guide.
mysql.conf
input { stdin {} }

filter {
  # Extract the VALUES list from each extended INSERT statement
  grok {
    match => {
      "message" => "INSERT INTO \`[table_name]\` VALUES (%{GREEDYDATA:extracted_sql});"
    }
    remove_field => "message"
  }

  # Drop every line that is not an INSERT statement
  if "_grokparsefailure" in [tags] {
    drop { }
  }

  # One event per row: rows are separated by "),(" in an extended INSERT
  split {
    terminator => "),("
    field => "extracted_sql"
  }

  # Normalize the row for the csv filter: NULL -> '', MySQL-escaped
  # quotes \' -> ", and strip the leftover parentheses
  mutate {
    gsub => ["extracted_sql", "NULL", "''"]
  }
  mutate {
    gsub => ["extracted_sql", "\\'", '"']
  }
  mutate {
    gsub => ["extracted_sql", "\(", ""]
  }
  mutate {
    gsub => ["extracted_sql", "\)", ""]
  }

  # Parse the row; values are quoted with single quotes in the dump
  csv {
    source => "extracted_sql"
    quote_char => "'"
    columns => [
      "id",
      "all",
      "your",
      "columns",
      "..."
    ]
    remove_field => "extracted_sql"
  }

  date {
    match => [ "created_at", "yyyy-MM-dd HH:mm:ss" ]
  }

  mutate {
    remove_field => [ "@version", "host" ]
  }
}

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "[index_name]"
    document_id => "%{id}"
    document_type => "[doc_type]"
    template => "template.json"
    template_name => "[template_name]"
  }
  stdout {
    codec => "dots"
  }
}
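The pipeline can be syntax-checked before feeding it any data (assuming the package install paths used above):
sudo /usr/share/logstash/bin/logstash -f mysql.conf --config.test_and_exit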
To test, use head and tail to select a few lines from the dump that contain INSERT INTO statements (see the grok match above):
cat ./dump.sql | head -50 | tail -10 | sudo /usr/share/logstash/bin/logstash -f mysql.conf
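Once the sample run looks right, a rough sketch of the full import and a spot check (index name as configured above):
cat ./dump.sql | sudo /usr/share/logstash/bin/logstash -f mysql.conf
curl 'http://localhost:9200/[index_name]/_count?pretty'
curl 'http://localhost:9200/[index_name]/_search?size=1&pretty'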