Sync PostgreSQL with Elasticsearch

Kundan
Apr 19, 2021


In this article, we will learn how to sync PostgreSQL data to Elasticsearch. There are many ways to keep PostgreSQL and Elasticsearch in sync; here we will do it with Logstash.

Logstash is a lightweight, open-source, server-side data processing pipeline that allows you to collect data from a variety of sources, transform it on the fly, and send it to your desired destination. It is most often used as a data pipeline for Elasticsearch, an open-source analytics and search engine.

As a prerequisite, you should have Java installed on your server.

Guide to install Elasticsearch: https://www.digitalocean.com/community/tutorials/how-to-install-and-configure-elasticsearch-on-ubuntu-18-04

Download the PostgreSQL JDBC jar file (matching your JDK version):
https://jdbc.postgresql.org/download.html
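
For example, you could fetch the driver on the server and keep it in a known location; the version number and target path below are only placeholders, so substitute the jar that matches your JDK from the download page:

# Example only: driver version and destination path are assumptions
wget https://jdbc.postgresql.org/download/postgresql-42.2.19.jar -O /opt/postgresql-jdbc.jar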

Installing Logstash on Ubuntu

sudo apt-get install logstash
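
If apt cannot find the logstash package, the Elastic APT repository probably needs to be added first. A sketch for the 7.x series (adjust the version to match your Elasticsearch installation):

# Add Elastic's signing key and APT repository (7.x is an assumption; match your cluster version)
sudo apt-get install apt-transport-https
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list
sudo apt-get update && sudo apt-get install logstash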

Verify logstash status

sudo systemctl status logstash

If the Logstash service is inactive, use the command below:

sudo systemctl restart logstash

Configure Logstash

A Logstash pipeline configuration file has three stages: input, filter, and output. Within each stage, there may be one or more plugins, depending on what the pipeline needs.

We will first create a config file in /etc/logstash/conf.d named <file_name>.conf; the name is up to you.

input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://<host>:<port>/<database_name>"
    jdbc_user => "<pg_user>"
    jdbc_password => "<pg_password>"
    jdbc_driver_library => "<path of downloaded jdbc file>"
    jdbc_driver_class => "org.postgresql.Driver"
    statement_filepath => "/mnt/<file_name>.sql"
    tracking_column => "updated_dt"
    tracking_column_type => "timestamp"
    use_column_value => true
    schedule => "* * * * *"
    last_run_metadata_path => "/mnt/.logstash_jdbc_last_run"
  }
}

output {
  elasticsearch {
    hosts => ["<host>:<port>"]
    index => "es_mirror"
    document_id => "%{[vid]}"
  }
}
  1. jdbc_connection_string is the database connection string.
  2. jdbc_user is the database user.
  3. jdbc_password is the password for the database user.
  4. jdbc_driver_library is the path to the PostgreSQL JDBC driver jar downloaded earlier.
  5. jdbc_driver_class is the PostgreSQL JDBC driver class to be loaded.
  6. statement_filepath is the path to the SQL query that will be executed against the database.
  7. tracking_column is the query column used to capture changes, so that only changed or newly created records are sent to Elasticsearch.
  8. tracking_column_type specifies the data type of the column set in tracking_column; it accepts numeric or timestamp. In our case, since we identify changes through a record's update timestamp column, this option is timestamp.
  9. use_column_value enables synchronization based on the field set in tracking_column.
  10. schedule is the execution schedule of the pipeline's JDBC input; it follows the Linux cron pattern. In our case, the input is scheduled to run every minute.
  11. last_run_metadata_path specifies a file where the last processed value of the column set in tracking_column (updated_dt in our case) is recorded.
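
The configuration above uses only the input and output stages. If you need the filter stage mentioned earlier, for example to drop Logstash's own metadata fields before indexing, a minimal sketch using the mutate plugin could sit between the input and output blocks (the fields removed here are just an illustration):

filter {
  mutate {
    # Drop Logstash bookkeeping fields so only the table columns are indexed
    remove_field => ["@version", "@timestamp"]
  }
}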

If tracking_column is the row's update timestamp, the query must order by this column ascending; otherwise, the data will not be synchronized correctly.

If the query retrieves a lot of data, you can add a limit clause to it. If needed, the pipeline can also be executed every second; to do this, just add one more * to the schedule.
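
The schedule option uses the rufus-scheduler cron syntax, which accepts an optional leading seconds field when six fields are given; the values below are illustrative:

schedule => "* * * * *"       # every minute (standard 5-field cron)
schedule => "*/10 * * * * *"  # every 10 seconds (6-field form; the first field is seconds)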

Regarding last_run_metadata_path: Logstash creates a file there in which the last value of the tracking_column (updated_dt) is saved. On each execution of the pipeline, Logstash overwrites the file with the latest value.

As the query has an order by updated_dt asc clause, Logstash will execute it, iterate over the results, and update the file every minute. Let's imagine that the value Logstash recorded was '2020-01-01T00:01:01.003'.

In the next run, one minute later, Logstash will pass this value as a parameter to the query, retrieving only those records where the tracking column (updated_dt) is greater than '2020-01-01T00:01:01.003', and so on with every run.
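
You can inspect the saved value at any time; the exact serialization is internal to the plugin, but it is a small plain-text file:

# Peek at the last tracked value Logstash recorded (output format may vary by plugin version)
cat /mnt/.logstash_jdbc_last_run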

At this point you may be wondering: how does Logstash manage to pass the value stored in last_run_metadata_path ("/mnt/.logstash_jdbc_last_run") into the query as a filter?

Here is an example of how the query would look inside the file set in the statement_filepath option, /mnt/users.sql in this example.

select VID, DESCRIPTION, CHANNELID from yt_data where updated_dt > :sql_last_value order by updated_dt asc limit 1000

The parameter :sql_last_value inside the query is responsible for injecting the value saved in the .logstash_jdbc_last_run file, keeping the updates running smoothly.
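
For context, the source table implied by this query might look like the sketch below; the column types are assumptions based on the names used in this article, and an index on updated_dt keeps the incremental query cheap:

-- Hypothetical shape of the source table; types are assumptions
CREATE TABLE yt_data (
    vid         text PRIMARY KEY,
    description text,
    channelid   text,
    updated_dt  timestamp NOT NULL DEFAULT now()
);

-- Index so "where updated_dt > :sql_last_value order by updated_dt asc" stays fast
CREATE INDEX idx_yt_data_updated_dt ON yt_data (updated_dt);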

Lastly, just run Logstash with the -f option pointing to your configuration file.

/usr/share/logstash/bin/logstash -f mypipeline.conf

Run logstash in the background with &

/usr/share/logstash/bin/logstash -f mypipeline.conf &
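
If Logstash fails to start, you can also validate the configuration file without running the pipeline:

# Check the pipeline file for syntax errors and exit
/usr/share/logstash/bin/logstash -f mypipeline.conf --config.test_and_exit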

If you want to run multiple pipelines in Logstash, follow these steps:

  • Open the pipelines.yml file in the following location:
vi /etc/logstash/pipelines.yml

Add the previously created config path to it.

For this example, I have defined 3 pipelines in it.

- pipeline.id: video-pipeline
  path.config: "/etc/logstash/conf.d/pipeline.conf"
  pipeline.workers: 3
- pipeline.id: channel-info-pipeline
  path.config: "/etc/logstash/conf.d/chanel_pipeline_new.conf"
  queue.type: persisted
- pipeline.id: channel-context-pipeline
  path.config: "/etc/logstash/conf.d/chanel_context_pipeline.conf"
  queue.type: persisted

Now, just restart Logstash to apply this configuration.

sudo systemctl restart logstash
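
To confirm the pipelines came up cleanly, tail the Logstash logs; the unit name and log path may differ slightly depending on how Logstash was installed:

# Follow the service logs via systemd
sudo journalctl -u logstash -f

# Or follow Logstash's own log file
sudo tail -f /var/log/logstash/logstash-plain.log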
