How to dump MySQL and import it into Elasticsearch using Logstash

1. Install Elasticsearch (deb)

2. Install Logstash (deb)
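
If they are not installed yet, one way to get both .deb packages is a sketch like the following, assuming Elastic's official 6.x apt repository (adjust the version to whatever your cluster runs):

# add Elastic's signing key and the assumed 6.x apt repository
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-6.x.list
sudo apt-get update
sudo apt-get install elasticsearch logstash
sudo systemctl enable --now elasticsearch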

3. Link the Logstash config directory, so that logstash run directly from /usr/share/logstash/bin picks up the settings in /etc/logstash

ln -s /etc/logstash /usr/share/logstash/config
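
To confirm the link is in place (optional check):

ls -l /usr/share/logstash/config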

4. Get the SQL using mysqldump

Add a WHERE condition with -w"..." to limit the rows:

mysqldump -u root -p [database] [table] -w"created_at > '2019-05-22'" > dump.sql
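
To see which lines of the dump actually contain the INSERT statements (useful for the head/tail offsets in step 7), a plain grep works:

grep -n "INSERT INTO" dump.sql | head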

5. Add an index template to Elasticsearch

curl -XPUT 'http://localhost:9200/_template/[template_name]' -H 'Content-Type: application/json' -d @template.json

A template config (using the IK analysis plugin; ik_max_word / ik_smart require the elasticsearch-analysis-ik plugin to be installed on the node):

template.json

{
  "template": "[template_name]",
  "order":    1,
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "_default_" : {
      "_all" : {"enabled" : false},
      "dynamic_templates" : [ {
        "string_fields" : {
          "match" : "*",
          "match_mapping_type" : "string",
          "mapping" : {
            "type" : "keyword"
          }
        }
      } ]
    },
    "[some_index]": {
      "properties": {
        "id": {
          "type": "long"
        },
        "[some_field]": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_smart"
        },
        "[some_field]": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_smart"
        }
      }
    }
  }
}
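
The stored template can be read back to check it took effect (optional; this is the standard _template GET):

curl 'http://localhost:9200/_template/[template_name]?pretty'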

6. Configure the Grok filter

Following this guide:

mysql.conf

input { stdin {} }

filter {

  # extract the VALUES list of each INSERT INTO statement into extracted_sql
  grok {
    match => {
      "message" => "INSERT INTO \`[table_name]\` VALUES (%{GREEDYDATA:extracted_sql});"
    }
    remove_field => "message"
  }

  if "_grokparsefailure" in [tags] {
    drop { }
  }

  split {
    terminator => "),("
    field => "extracted_sql"
  }

  # normalize the row text so the csv filter can parse it:
  # NULL becomes an empty quoted string
  mutate {
      gsub => ["extracted_sql", "NULL", "''"]
  }
  # MySQL-escaped single quotes (\') are rewritten so they don't terminate a quoted field
  mutate {
      gsub => ["extracted_sql", "\\'", '"']
  }
  # strip the remaining tuple parentheses
  mutate {
      gsub => ["extracted_sql","\(",""]
  }
  mutate {
      gsub => ["extracted_sql","\)",""]
  }

  # parse each tuple as a single-quote-delimited CSV row into named columns
  csv {
    source => "extracted_sql"
    quote_char => "'"
    columns => [ 
      "id",
      "all",
      "your",
      "columns",
      "..."
      ]
    remove_field => "extracted_sql"
  }

  # use created_at as the event @timestamp
  date {
      match => [ "created_at", "yyyy-MM-dd HH:mm:ss" ]
  }

  mutate {
    remove_field => [ "@version", "host" ]
  }

}

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "[index_name]"
    document_id => "%{id}"
    document_type => "[doc_type]"
    template => "template.json"
    template_name => "[template_name]"
  }

  stdout {
    codec => "dots"
  }
}
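
For intuition, a dump line like the following (the table, columns, and values here are hypothetical) is what the filter expects: grok captures everything between VALUES ( and );, split turns that into one event per "),("-separated row, and csv maps each row onto the configured column names.

INSERT INTO `[table_name]` VALUES (1,'foo',NULL,'2019-05-23 10:00:00'),(2,'bar','baz','2019-05-23 11:00:00');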

7. Run Logstash

Use head and tail to select the lines containing INSERT INTO statements (see the match pattern in the Grok config):

cat ./dump.sql | head -50 | tail -10 | sudo /usr/share/logstash/bin/logstash -f mysql.conf
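
The pipeline file can be syntax-checked first with Logstash's --config.test_and_exit flag, and sed -n is an alternative way to feed an exact line range of the dump (both lines below are a sketch using the same paths as above):

sudo /usr/share/logstash/bin/logstash -f mysql.conf --config.test_and_exit
sed -n '41,50p' ./dump.sql | sudo /usr/share/logstash/bin/logstash -f mysql.conf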
