symlink the config dir so logstash can find its settings when run directly from /usr/share/logstash/bin
ln -s /etc/logstash /usr/share/logstash/config
add a WHERE condition using -w "..."
mysqldump -u root -p [database] [table] -w"created_at > '2019-05-22'" > dump.sql
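the dump then contains extended (multi-row) INSERT statements roughly like the line below (table name and values are made up for illustration); this is what the Logstash filters further down parse:
INSERT INTO `some_table` VALUES (1,'foo','2019-05-23 10:12:01'),(2,NULL,'2019-05-24 08:30:45');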
curl -XPUT 'http://localhost:9200/_template/[template_name]' -H 'Content-Type: application/json' -d @template.json
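to confirm the template was registered (optional, same placeholder name as above):
curl 'http://localhost:9200/_template/[template_name]?pretty'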
a template config (using the IK analysis plugin):
template.json
{
  "template": "[template_name]",
  "order":    1,
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "_default_" : {
      "_all" : {"enabled" : false},
      "dynamic_templates" : [ {
        "string_fields" : {
          "match" : "*",
          "match_mapping_type" : "string",
          "mapping" : {
            "type" : "keyword"
          }
        }
      } ]
    },
    "[some_index]": {
      "properties": {
        "id": {
          "type": "long"
        },
        "[some_field]": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_smart"
        },
        "[some_field]": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_smart"
        }
      }
    }
  }
}
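if the IK plugin is installed, a quick sanity check of the analyzers (sample text is arbitrary):
curl -XGET 'http://localhost:9200/_analyze?pretty' -H 'Content-Type: application/json' -d '{"analyzer": "ik_max_word", "text": "中华人民共和国"}'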
mysql.conf (following this guide)
input { stdin {} }
filter {
  # pull the value list out of each extended INSERT statement
  grok {
    match => {
      "message" => "INSERT INTO \`[table_name]\` VALUES (%{GREEDYDATA:extracted_sql});"
    }
    remove_field => "message"
  }
  # lines that are not INSERT statements fail the grok and are dropped
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  # one event per row: rows inside an extended INSERT are separated by "),("
  split {
    terminator => "),("
    field => "extracted_sql"
  }
  # unquoted NULL becomes an empty quoted field
  mutate {
      gsub => ["extracted_sql", "NULL", "''"]
  }
  # MySQL escapes embedded single quotes as \' ; turn them into plain double
  # quotes so the csv filter below (quote_char "'") does not misread them
  mutate {
      gsub => ["extracted_sql", "\\'", '"']
  }
  # strip the parentheses left over from the VALUES (...),(...) syntax
  mutate {
      gsub => ["extracted_sql","\(",""]
  }
  mutate {
      gsub => ["extracted_sql","\)",""]
  }
  csv {
    source => "extracted_sql"
    quote_char => "'"
    columns => [ 
      "id",
      "all",
      "your",
      "columns",
      "..."
      ]
    remove_field => "extracted_sql"
  }
  # use created_at as the event @timestamp
  date {
      match => [ "created_at", "yyyy-MM-dd HH:mm:ss" ]
  }
  mutate {
    remove_field => [ "@version", "host" ]
  }
}
output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "[index_name]"
    document_id => "%{id}"
    document_type => "[doc_type]"
    template => "template.json"
    template_name => "[template_name]"
  }
  # one dot per indexed event, as a progress indicator
  stdout {
    codec => "dots"
  }
}
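while tuning the filters it can help to swap the elasticsearch output for a plain rubydebug stdout so each parsed event is printed in full (standard Logstash debugging, not part of the final config):
output {
  stdout {
    codec => "rubydebug"
  }
}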
use head and tail to pick a few lines containing INSERT INTO statements (see the grok match in mysql.conf) and do a test run:
cat ./dump.sql | head -50 | tail -10 | sudo /usr/share/logstash/bin/logstash -f mysql.conf
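once the sample rows index correctly, stream the whole dump and then check the document count (same [index_name] placeholder as above):
cat ./dump.sql | sudo /usr/share/logstash/bin/logstash -f mysql.conf
curl 'http://localhost:9200/[index_name]/_count?pretty'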