Last active
October 31, 2022 18:51
-
-
Save nijave/1bcf394814d41c4e1e2dca39d13855c2 to your computer and use it in GitHub Desktop.
Postgres database migration with pg_dump and logical replication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
#
# Migrates a Postgres database from one local Docker container to another:
# pg_dump / pg_restore for the initial copy, then logical replication
# (publication + replication slot + subscription) to stream later changes.
#
# Tools used by this script: docker, jq, psql, pg_dump, pg_restore and
# python3 with psycopg2.

# Strict mode comes first so that a failing `docker inspect | jq` lookup below
# aborts the script. An empty host= in a libpq connection string falls back to
# the default server, which the cleanup step would then modify.
set -euo pipefail

export PGSSLMODE=prefer

DATABASE=postgres
# NOTE: this reuses the conventional login-name variable USER; any child
# process that inherits it from the environment will see "postgres".
USER=postgres
SOURCE_PASSWORD=postgres
# Plain assignments (no export/readonly on the same line) so the exit status of
# the command substitution is not masked.
SOURCE_HOST=$(docker inspect postgres-5433 | jq -r '.[0].NetworkSettings.IPAddress')
SOURCE_PORT=5432
DEST_HOST=$(docker inspect postgres-5434 | jq -r '.[0].NetworkSettings.IPAddress')
DEST_PORT=5432
PUB_NAME=migration
SLOT_NAME=migration
SUB_NAME=migration
DUMP_DIRECTORY=${DATABASE}.dump

# jq prints an empty string or "null" when the address field is empty or
# missing; refuse to continue with such a value.
for host_var in SOURCE_HOST DEST_HOST; do
  if [[ -z "${!host_var}" || "${!host_var}" == "null" ]]; then
    printf 'error: could not determine %s from docker inspect\n' "${host_var}" >&2
    exit 1
  fi
done
echo "Cleaning up old subscriptions & replication slots"
# Every step is best-effort (`|| true`): on a first run there is nothing to
# clean up, and a connection or statement failure here must not stop the
# migration.

# Drop a subscription left on the destination by a previous run first. While it
# exists its worker keeps reconnecting to the source, which keeps the slot
# active (an active slot cannot be dropped) and would make the later
# CREATE SUBSCRIPTION fail because the name is taken. DROP SUBSCRIPTION also
# removes the slot it is associated with on the publisher.
psql "host=${DEST_HOST} port=${DEST_PORT} user=${USER} dbname=${DATABASE}" \
  -c "DROP SUBSCRIPTION IF EXISTS ${SUB_NAME}" || true

psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
  -c "DROP PUBLICATION IF EXISTS ${PUB_NAME}" || true

# Kick any remaining connections coming from the destination host, then drop
# the slot in case it is still there.
psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
  -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE client_addr = '${DEST_HOST}'; SELECT pg_drop_replication_slot('${SLOT_NAME}')" || true

# `:?` aborts instead of running `rm -rf` on an empty path if the variable is
# ever unset or empty.
rm -rf -- "${DUMP_DIRECTORY:?}" || true
echo "Creating replication slot"
# To coordinate a pg_dump with a replication slot, the slot must be created
# first and pg_dump must use the snapshot exported by that command, so that the
# dump contains all data up to the point the slot was created.
# The exported snapshot only exists as long as the connection is open AND as
# long as no subsequent command has been run on the same connection.
# Therefore it is necessary to "hold on to" the connection used to create the
# replication slot until pg_dump has successfully started.
#
# Protocol between the Python helper and the shell loop (one line each):
#   1. the exported snapshot name -> the loop runs pg_dump with it
#   2. "OK!!"                     -> the loop stops
# The heredoc delimiter is unquoted on purpose: the shell expands the
# ${...} settings into the Python source before python3 reads it.
python3 <<EOF | while read -r line; do
import sys
import time
import psycopg2
from psycopg2.extras import LogicalReplicationConnection

conn = psycopg2.connect(
    "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}",
    connection_factory=LogicalReplicationConnection,
)
cur = conn.cursor()
cur.execute("CREATE PUBLICATION ${PUB_NAME} FOR ALL TABLES")
cur.create_replication_slot("${SLOT_NAME}", slot_type=psycopg2.extras.REPLICATION_LOGICAL, output_plugin="pgoutput")
# Third column of the returned row is printed as the snapshot name.
print(cur.fetchall()[0][2])
sys.stdout.flush()
# Give pg_dump some time to start dumping. This is a fixed delay, not a
# handshake: the connection is closed after 15 seconds regardless.
time.sleep(15)
conn.close()
print("OK!!")
EOF
  if [[ "${line}" == *"OK!!"* ]]; then
    break
  fi

  echo "Dumping source database"
  SNAPSHOT=${line}
  pg_dump -v \
    -j 8 \
    --host="${SOURCE_HOST}" \
    --port="${SOURCE_PORT}" \
    --username="${USER}" \
    --dbname="${DATABASE}" \
    --format=directory \
    --create \
    --snapshot="${SNAPSHOT}" \
    --file "${DUMP_DIRECTORY}"
done
echo "Restoring database dump"
# The target database may already exist, so a failure here is tolerated.
psql "host=${DEST_HOST} port=${DEST_PORT} user=${USER} dbname=postgres" \
  -c "CREATE DATABASE \"${DATABASE}\"" || true

# Create every extension installed on the source in the destination database
# before restoring. The listing stays on the left of a pipe (rather than in a
# process substitution) so that, with pipefail, a failure of the source query
# aborts the script.
psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
  -c "SELECT extname FROM pg_extension" -tA \
  | while read -r ext; do
    # Skip blank lines so we never issue CREATE EXTENSION "".
    [[ -n "${ext}" ]] || continue
    # stdin is detached so this psql cannot consume the extension list.
    psql "host=${DEST_HOST} port=${DEST_PORT} user=${USER} dbname=${DATABASE}" \
      -c "CREATE EXTENSION IF NOT EXISTS \"${ext}\"" </dev/null
  done

pg_restore -v \
  -j 8 \
  --host="${DEST_HOST}" \
  --port="${DEST_PORT}" \
  --username="${USER}" \
  --dbname="${DATABASE}" \
  --format=directory \
  "${DUMP_DIRECTORY}"
echo "Creating subscription"
# The subscription attaches to the slot created before the dump
# (create_slot=false) and skips the initial table copy (copy_data=false),
# because the restored dump already holds the data up to the slot's snapshot.
#
# ON_ERROR_STOP=1 is required: when psql reads SQL from stdin it exits 0 even
# if a statement fails, so without it a failed CREATE SUBSCRIPTION would go
# unnoticed despite `set -e`.
psql -v ON_ERROR_STOP=1 \
  "host=${DEST_HOST} port=${DEST_PORT} user=${USER} dbname=${DATABASE}" <<EOF
CREATE SUBSCRIPTION ${SUB_NAME}
CONNECTION 'host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} password=${SOURCE_PASSWORD} dbname=${DATABASE}'
PUBLICATION ${PUB_NAME}
WITH (slot_name=${SLOT_NAME}, create_slot=false, copy_data=false);
EOF
# Prints the replication lag in bytes of WAL: the difference between the source
# server's current WAL LSN and the last LSN the replica has confirmed it has
# flushed to disk for the migration slot.
report_replication_lag() {
  echo "Checking replication lag"
  psql "host=${SOURCE_HOST} port=${SOURCE_PORT} user=${USER} dbname=${DATABASE}" \
    -c "select pg_wal_lsn_diff(pg_current_wal_lsn(), (select confirmed_flush_lsn from pg_replication_slots where slot_name = '${SLOT_NAME}'))"
}

# Sample the lag twice, 30 seconds apart, so the two numbers can be compared.
report_replication_lag
echo "Waiting a little bit before checking replication lag again"
sleep 30
report_replication_lag

# The dump directory is no longer needed once the restore has completed.
rm -rf "${DUMP_DIRECTORY}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment