@nijave
Last active October 31, 2022 18:51
Postgres database migration with pg_dump and logical replication
#!/bin/bash
# Enable strict mode first so a failing docker/jq lookup below aborts the script
set -euo pipefail
export PGSSLMODE=prefer
DATABASE=postgres
USER=postgres
SOURCE_PASSWORD=postgres
SOURCE_HOST=$(docker inspect postgres-5433 | jq -r '.[0].NetworkSettings.IPAddress')
SOURCE_PORT=5432
DEST_HOST=$(docker inspect postgres-5434 | jq -r '.[0].NetworkSettings.IPAddress')
DEST_PORT=5432
PUB_NAME=migration
SLOT_NAME=migration
SUB_NAME=migration
DUMP_DIRECTORY=${DATABASE}.dump
echo "Cleaning up old publication & replication slot"
psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
-c "DROP PUBLICATION ${PUB_NAME}" || true
psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
-c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE client_addr = '${DEST_HOST}'; SELECT pg_drop_replication_slot('${SLOT_NAME}')" || true
rm -rf "${DUMP_DIRECTORY}" || true
echo "Creating replication slot"
# To coordinate pg_dump with logical replication, create the replication slot first and pass
# its exported snapshot to pg_dump, so the dump contains exactly the data committed before the
# slot's starting point and replication supplies everything after it.
# The exported snapshot exists only while the connection stays open AND no further command
# has been run on that connection.
# Therefore the connection that created the slot must be held open and idle until pg_dump
# has successfully started.
cat <<EOF | python3 | while read -r line; do
import sys
import time
import psycopg2
from psycopg2.extras import LogicalReplicationConnection
conn = psycopg2.connect(
"host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}",
connection_factory=LogicalReplicationConnection,
)
cur = conn.cursor()
cur.execute("CREATE PUBLICATION ${PUB_NAME} FOR ALL TABLES")
cur.create_replication_slot("${SLOT_NAME}", slot_type=psycopg2.extras.REPLICATION_LOGICAL, output_plugin="pgoutput")
print(cur.fetchall()[0][2])
sys.stdout.flush()
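# Keep this connection idle so the exported snapshot stays valid while pg_dump
# connects and imports it; raise the delay if pg_dump is slow to start
time.sleep(15)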
conn.close()
print("OK!!")
EOF
if echo "$line" | grep -q "OK!!"; then
break
else
echo "Dumping source database"
SNAPSHOT=${line}
pg_dump -v \
-j 8 \
--host="${SOURCE_HOST}" \
--port="${SOURCE_PORT}" \
--username="${USER}" \
--dbname="${DATABASE}" \
--format=directory \
--create \
--snapshot="${SNAPSHOT}" \
--file "${DUMP_DIRECTORY}"
fi
done
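# A hedged sketch, not part of the original script: a sanity check that could run
# before the restore. It assumes only that pg_dump's directory format writes a
# table-of-contents file named toc.dat; a missing or empty toc.dat suggests the
# dump did not finish. dump_has_toc is a hypothetical helper name.
dump_has_toc() {
  # Succeeds only if the directory exists and holds a non-empty toc.dat
  [ -d "$1" ] && [ -s "$1/toc.dat" ]
}
# Possible usage:
#   dump_has_toc "${DUMP_DIRECTORY}" || { echo "Dump looks incomplete" >&2; exit 1; }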
echo "Restoring database dump"
psql "host=${DEST_HOST} port=${DEST_PORT} user=${USER} dbname=postgres" -c "CREATE DATABASE \"${DATABASE}\"" || true
psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
-c "SELECT extname FROM pg_extension" -tA | while read -r ext; do
psql "host=${DEST_HOST} port=${DEST_PORT} user=${USER} dbname=${DATABASE}" -c "CREATE EXTENSION IF NOT EXISTS \"${ext}\""
done
pg_restore -v \
-j 8 \
--host="${DEST_HOST}" \
--port="${DEST_PORT}" \
--username="${USER}" \
--dbname="${DATABASE}" \
--format=directory \
"${DUMP_DIRECTORY}"
echo "Creating subscription"
cat <<EOF | psql "host=${DEST_HOST} port=${DEST_PORT} user=${USER} dbname=${DATABASE}"
CREATE SUBSCRIPTION ${SUB_NAME}
CONNECTION 'host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} password=${SOURCE_PASSWORD} dbname=${DATABASE}'
PUBLICATION ${PUB_NAME}
WITH (slot_name=${SLOT_NAME}, create_slot=false, copy_data=false);
EOF
echo "Checking replication lag"
# Compare the source server's current WAL LSN to the last LSN the replica has
# confirmed it has flushed to disk; pg_wal_lsn_diff reports the gap in bytes
psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
-c "select pg_wal_lsn_diff(pg_current_wal_lsn(), (select confirmed_flush_lsn from pg_replication_slots where slot_name = '${SLOT_NAME}'))"
echo "Waiting a little bit before checking replication lag again"
sleep 30
echo "Checking replication lag"
psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
-c "select pg_wal_lsn_diff(pg_current_wal_lsn(), (select confirmed_flush_lsn from pg_replication_slots where slot_name = '${SLOT_NAME}'))"
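# A minimal sketch, not part of the original script: rather than checking twice
# with a fixed sleep in between, poll until the lag drops to a threshold.
# wait_for_lag and its arguments are hypothetical: a threshold in bytes, a maximum
# number of checks, a delay in seconds, then any command that prints the lag as a
# bare integer (for example the query above run through psql with -tA).
wait_for_lag() {
  local threshold attempts delay lag i
  threshold=$1
  attempts=$2
  delay=$3
  shift 3
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # Run the caller-supplied command and capture the lag it prints
    lag=$("$@")
    echo "Replication lag: ${lag} bytes" >&2
    if [ "$lag" -le "$threshold" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  # The lag never reached the threshold within the allowed checks
  return 1
}
# Possible usage, allowing up to 20 checks 30 seconds apart:
#   wait_for_lag 0 20 30 psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" -tA \
#     -c "select pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) from pg_replication_slots where slot_name = '${SLOT_NAME}'"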
rm -rf "${DUMP_DIRECTORY}"