Logstash is a free and open server-side data processing pipeline that ingests data from a multitude of sources, transforms it, and then sends it to your favorite "stash."
This section describes how to use it on a UNIX-like system to migrate data from a PostgreSQL database to Elasticsearch. The following prerequisites are assumed:
- PostgreSQL version 10 or higher
- Java version 10 or higher
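The prerequisites above can be checked from the command line. The following is a minimal sketch, not part of the original instructions: the helper names `first_version`, `major_version`, and `meets_minimum` are hypothetical, and it assumes `grep -o` is available. Note that `psql --version` reports the client version, which may differ from the server version.

```shell
#!/bin/sh
# Print the first dotted version number found in a string, e.g. "14.9".
first_version() {
    printf '%s\n' "$1" | grep -oE '[0-9]+(\.[0-9]+)*' | head -n 1
}

# Print the major version; legacy Java strings such as "1.8.0" map to 8.
major_version() {
    v=$(first_version "$1")
    major=${v%%.*}
    if [ "$major" = "1" ] && [ "$v" != "1" ]; then
        rest=${v#*.}
        major=${rest%%.*}
    fi
    printf '%s\n' "$major"
}

# Succeed when the major version found in $1 is at least $2.
meets_minimum() {
    [ "$(major_version "$1")" -ge "$2" ] 2>/dev/null
}

# Usage (assumes psql and java are on PATH; java prints its version to stderr):
# meets_minimum "$(psql --version)" 10 && echo "PostgreSQL OK"
# meets_minimum "$(java -version 2>&1 | head -n 1)" 10 && echo "Java OK"
```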
- Download and Install Logstash. The official installation instructions cover the different operating systems. On a Debian-based Linux distribution (this assumes the Elastic APT repository has already been added), run these commands:
    # install logstash
    sudo apt install logstash -y

    # enable logstash
    sudo systemctl enable logstash
Once installed, the files associated with the service are located at /usr/share/logstash.
- Download and Activate the JDBC Plugin. JDBC (Java Database Connectivity) is an application programming interface (API) for Java that defines how a client may access a database. We will use this plugin to enable Logstash to connect to the PostgreSQL database and retrieve records from it.
NOTE: The Logstash plugin manager is located at /usr/share/logstash/bin/logstash-plugin.
JDBC libraries vary based on the database. Here, we will download the PostgreSQL JDBC library.
  - Find the newest JDBC library on the PostgreSQL JDBC download page (https://jdbc.postgresql.org/download/).
  - Copy the link to the latest version and download it with curl by running the following command:

        sudo curl https://jdbc.postgresql.org/download/postgresql-{version}.jar -o /usr/share/logstash/logstash-core/lib/jars/postgresql-jdbc.jar

    NOTE: Replace {version} with the desired library version.
  - Enable the JDBC plugin in Logstash by running the following command:

        sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
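Before moving on, it can help to confirm that the download and the plugin installation succeeded. The sketch below is an illustration only: the helper names `looks_like_jar` and `has_jdbc_plugin` are hypothetical. It relies on the fact that a JAR is a ZIP archive and therefore starts with the bytes "PK", and it assumes that recent Logstash releases may list the JDBC input as part of a bundled `logstash-integration-jdbc` plugin.

```shell
#!/bin/sh
# Succeed if the file starts with the ZIP magic bytes "PK", as every JAR does.
# An HTML error page saved by curl (e.g. after a wrong {version}) will fail.
looks_like_jar() {
    [ -f "$1" ] && [ "$(head -c 2 "$1")" = "PK" ]
}

# Succeed if a plugin listing read from stdin contains the JDBC input,
# either standalone or as the JDBC integration plugin.
has_jdbc_plugin() {
    grep -qE '^logstash-(input|integration)-jdbc'
}

# Usage:
# looks_like_jar /usr/share/logstash/logstash-core/lib/jars/postgresql-jdbc.jar \
#     || echo "the downloaded file does not look like a JAR"
# sudo /usr/share/logstash/bin/logstash-plugin list | has_jdbc_plugin \
#     && echo "JDBC input plugin is available"
```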
- Configure Logstash to Pull Records. For Logstash to pull records from PostgreSQL, one must create a configuration file in the /etc/logstash/conf.d/ folder. The Logstash documentation describes how to configure a pipeline. This section provides a template configuration that connects to PostgreSQL, retrieves the records, modifies them, and sends them to Elasticsearch.

  Copy the following content to /etc/logstash/conf.d/{config_name}.conf and modify the values in curly brackets (e.g. {value}):

    input {
        jdbc {
            jdbc_connection_string => "jdbc:postgresql://localhost:5432/{pg_database}"
            jdbc_user => "{pg_username}"
            jdbc_password => "{pg_password}"
            jdbc_driver_class => "org.postgresql.Driver"
            # cronjob schedule format (see "Helpful Links")
            schedule => "0 * * * *"
            # the PG command for retrieving the documents
            # IMPORTANT: no semicolon!
            statement => "SELECT * FROM public.{pg_table}"
            jdbc_paging_enabled => "true"
            jdbc_page_size => "300"
        }
    }

    filter {
        mutate {
            # list of fields to remove from the pg input (fields as string)
            remove_field => ["{pg_table_column_1}", "{pg_table_column_2}"]
            # substitutes (replaces) parts of the value in {pg_table_column_i}
            # that match {regex_i} with {value_i}
            gsub => [
                "{pg_table_column_3}", "{regex_1}", "{value_1}",
                "{pg_table_column_4}", "{regex_2}", "{value_2}"
            ]
        }
        # this part is used to parse jsonb type of values
        mutate {
            join => { "{pg_table_column_5}" => "," }
            replace => { "{pg_table_column_5}" => "[%{{pg_table_column_5}}]" }
        }
        json {
            source => "{pg_table_column_5}"
            target => "{pg_table_column_5}"
        }
    }

    output {
        # used to output the values in the terminal (DEBUGGING)
        # once everything is working, comment out this line
        stdout { codec => "json" }
        # used to output the values into Elasticsearch
        elasticsearch {
            hosts => ["{es_host_address_1}"]
            index => "{es-index}"
            document_id => "document_%{pg_table_column_id}"
            # upserts documents (i.e. if the document does not exist, creates a new record)
            # note: this option is documented for action => "update"
            doc_as_upsert => true
        }
    }
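Filling in the curly-bracket placeholders by hand is error-prone when there are many of them. As one possible approach (a sketch, not part of the original workflow), the substitution can be scripted with `sed`. The helper names `escape_sed` and `fill_placeholder`, the file name `template.conf`, and the example values are all hypothetical.

```shell
#!/bin/sh
# Escape the characters that are special in a sed replacement:
# backslash, ampersand, and the "|" delimiter used below.
escape_sed() {
    printf '%s' "$1" | sed -e 's/[\\&|]/\\&/g'
}

# Replace every occurrence of {name} with value in the text read from stdin.
fill_placeholder() {
    name=$1
    value=$(escape_sed "$2")
    sed -e "s|{$name}|$value|g"
}

# Usage (template.conf holds the unmodified template; values are examples):
# fill_placeholder pg_database shop < template.conf \
#     | fill_placeholder pg_username logstash \
#     | fill_placeholder pg_table orders \
#     > shop.conf
```

Because only the exact text `{name}` is replaced, a field reference such as `%{{pg_table_column_5}}` becomes `%{tags}` when the column is named `tags`. Values containing double quotes or newlines are not handled by this sketch.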
- Test the Logstash Configuration. Logstash can run a specific configuration when its file path is passed to the -f parameter. Run the following command to test the configuration from the previous step:

    sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/{config_name}.conf

  Check the output to see whether the configuration works as expected.
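For a quicker check that does not start the pipeline, Logstash also accepts the `--config.test_and_exit` flag, which parses the configuration, reports any errors, and exits. The wrapper below is a small sketch: the function name `check_pipeline` and the `LOGSTASH_BIN` override variable are hypothetical conveniences, not Logstash features.

```shell
#!/bin/sh
# Validate a pipeline configuration without running it.
# LOGSTASH_BIN may point to a different logstash executable (assumed default below).
check_pipeline() {
    conf=$1
    logstash_bin=${LOGSTASH_BIN:-/usr/share/logstash/bin/logstash}
    if [ ! -r "$conf" ]; then
        echo "cannot read config: $conf" >&2
        return 2
    fi
    # --config.test_and_exit only checks the configuration syntax and exits
    "$logstash_bin" -f "$conf" --config.test_and_exit
}

# Usage (run with sufficient privileges to read the config):
# check_pipeline /etc/logstash/conf.d/{config_name}.conf && echo "configuration OK"
```

A syntax check does not prove that the database connection or the Elasticsearch output works, so the full test run described above is still worthwhile.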
- Start the Logstash Service. Once the configuration works as intended, start (or restart) the Logstash service by running one of the following commands:

    # to start the service
    sudo systemctl start logstash

    # to restart the service
    sudo systemctl restart logstash

  After that, Logstash should periodically copy records from PostgreSQL to Elasticsearch according to the schedule set in the configuration.
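One way to confirm that the synchronization is happening is to compare the number of rows in the PostgreSQL table with the number of documents in the Elasticsearch index. The sketch below parses the response of the Elasticsearch `_count` API. The helper names `parse_count` and `counts_match` are hypothetical, and the address `localhost:9200` is an assumption about where Elasticsearch is reachable.

```shell
#!/bin/sh
# Extract the numeric "count" field from an Elasticsearch _count response on stdin.
parse_count() {
    sed -n 's/.*"count"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Succeed when both arguments are equal integers.
counts_match() {
    [ "$1" -eq "$2" ] 2>/dev/null
}

# Usage (replace the curly-bracket placeholders as in the configuration):
# es=$(curl -s "http://localhost:9200/{es-index}/_count" | parse_count)
# pg=$(psql -At -d {pg_database} -c 'SELECT count(*) FROM public.{pg_table}')
# counts_match "$es" "$pg" && echo "in sync" || echo "PostgreSQL: $pg, Elasticsearch: $es"
```

The counts can differ briefly between scheduled runs, so a mismatch right after inserting rows is not necessarily an error.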