Skip to content

Instantly share code, notes, and snippets.

@lefred
Last active June 8, 2024 09:31
Show Gist options
  • Save lefred/1bad64403923664a14e0f20f572d7526 to your computer and use it in GitHub Desktop.
Save lefred/1bad64403923664a14e0f20f572d7526 to your computer and use it in GitHub Desktop.
Get replication information
use sys;
DROP VIEW IF EXISTS replication_status_full;
CREATE
ALGORITHM = MERGE
SQL SECURITY INVOKER
VIEW replication_status_full
AS
SELECT
concat(s.channel_name, ' (', w.worker_id,')') AS channel,
c.host,
c.port,
c.user,
s.source_uuid,
s.group_name,
s.last_heartbeat_timestamp,
c.heartbeat_interval,
s.service_state io_state,
st.processlist_state io_thread_state,
s.last_error_number io_errno,
s.last_error_message io_errmsg,
s.last_error_timestamp io_errtime,
co.service_state co_state,
cot.processlist_state co_thread_state,
co.last_error_number co_errno,
co.last_error_message co_errmsg,
co.last_error_timestamp co_errtime,
w.service_state w_state,
wt.processlist_state w_thread_state,
w.last_error_number w_errno,
w.last_error_message w_errmsg,
w.last_error_timestamp w_errtime,
TIMEDIFF(NOW(6), IF(TIMEDIFF(s.LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP,
s.LAST_HEARTBEAT_TIMESTAMP) >= 0,
s.LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP,
s.LAST_HEARTBEAT_TIMESTAMP)
) as time_since_last_message,
IF(s.LAST_QUEUED_TRANSACTION=''
OR s.LAST_QUEUED_TRANSACTION=latest_w.LAST_APPLIED_TRANSACTION,
'IDLE', 'APPLYING') as applier_busy_state,
IF(co.SERVICE_STATE = 'OFF'
OR s.SERVICE_STATE = 'OFF', 'null',
IF(
GTID_SUBTRACT(s.LAST_QUEUED_TRANSACTION,
w.LAST_APPLIED_TRANSACTION) = ''
OR
UNIX_TIMESTAMP(w.APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP) = 0,
'none',
TIMEDIFF(
NOW(6),w.APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP
)
)
) AS lag_from_original,
IF(co.SERVICE_STATE = 'OFF'
OR s.SERVICE_STATE = 'OFF', 'null',
IF(
GTID_SUBTRACT(s.LAST_QUEUED_TRANSACTION,
w.LAST_APPLIED_TRANSACTION) = ''
OR
UNIX_TIMESTAMP(w.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) = 0,
'none',
TIMEDIFF(
NOW(6),w.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP
)
)
) AS lag_from_immediate,
format_pico_time((unix_timestamp(LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP) -
unix_timestamp(LAST_QUEUED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP)) * 100000000000) transport_time,
format_pico_time((unix_timestamp(LAST_QUEUED_TRANSACTION_END_QUEUE_TIMESTAMP) -
unix_timestamp(LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP)) * 1000000000000) time_to_relay_log,
format_pico_time((unix_timestamp(w.LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP) -
unix_timestamp(w.LAST_APPLIED_TRANSACTION_START_APPLY_TIMESTAMP)) * 1000000000000) apply_time,
w.LAST_APPLIED_TRANSACTION AS last_applied_transaction,
s.LAST_QUEUED_TRANSACTION AS last_queued_transaction,
GTID_SUBTRACT(s.RECEIVED_TRANSACTION_SET, (select variable_value from performance_schema.global_variables where variable_name='gtid_executed') ) as queued_gtid_set_to_apply
FROM performance_schema.replication_connection_configuration c
JOIN performance_schema.replication_connection_status s
ON c.channel_name = s.channel_name
LEFT JOIN performance_schema.replication_applier_status_by_coordinator co
ON c.channel_name = co.channel_name
JOIN performance_schema.replication_applier_status a
ON c.channel_name = a.channel_name
JOIN performance_schema.replication_applier_status_by_worker w
ON c.channel_name = w.channel_name
LEFT JOIN (
SELECT * FROM performance_schema.replication_applier_status_by_worker LIMIT 1
) latest_w
ON c.channel_name = latest_w.channel_name
LEFT JOIN performance_schema.threads st
ON s.thread_id = st.thread_id
LEFT JOIN performance_schema.threads cot
ON co.thread_id = cot.thread_id
LEFT JOIN performance_schema.threads wt
ON w.thread_id = wt.thread_id;
DROP VIEW IF EXISTS x$replication_status_full;
CREATE
ALGORITHM = MERGE
SQL SECURITY INVOKER
VIEW x$replication_status_full
AS
SELECT
concat(s.channel_name, ' (', w.worker_id,')') AS channel,
c.host,
c.port,
c.user,
s.source_uuid,
s.group_name,
s.last_heartbeat_timestamp,
c.heartbeat_interval,
s.service_state io_state,
st.processlist_state io_thread_state,
s.last_error_number io_errno,
s.last_error_message io_errmsg,
s.last_error_timestamp io_errtime,
co.service_state co_state,
cot.processlist_state co_thread_state,
co.last_error_number co_errno,
co.last_error_message co_errmsg,
co.last_error_timestamp co_errtime,
w.service_state w_state,
wt.processlist_state w_thread_state,
w.last_error_number w_errno,
w.last_error_message w_errmsg,
w.last_error_timestamp w_errtime,
TIMEDIFF(NOW(6), IF(TIMEDIFF(s.LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP,
s.LAST_HEARTBEAT_TIMESTAMP) >= 0,
s.LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP,
s.LAST_HEARTBEAT_TIMESTAMP)
) as time_since_last_message,
IF(s.LAST_QUEUED_TRANSACTION=''
OR s.LAST_QUEUED_TRANSACTION=latest_w.LAST_APPLIED_TRANSACTION,
'IDLE', 'APPLYING') as applier_busy_state,
IF(co.SERVICE_STATE = 'OFF'
OR s.SERVICE_STATE = 'OFF', 'null',
IF(
GTID_SUBTRACT(s.LAST_QUEUED_TRANSACTION,
w.LAST_APPLIED_TRANSACTION) = ''
OR
UNIX_TIMESTAMP(w.APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP) = 0,
'none',
TIMEDIFF(
NOW(6),w.APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP
)
)
) AS lag_from_original,
IF(co.SERVICE_STATE = 'OFF'
OR s.SERVICE_STATE = 'OFF', 'null',
IF(
GTID_SUBTRACT(s.LAST_QUEUED_TRANSACTION,
w.LAST_APPLIED_TRANSACTION) = ''
OR
UNIX_TIMESTAMP(w.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) = 0,
'none',
TIMEDIFF(
NOW(6),w.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP
)
)
) AS lag_from_immediate,
((unix_timestamp(LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP) -
unix_timestamp(LAST_QUEUED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP)) * 100000000000) transport_time,
((unix_timestamp(LAST_QUEUED_TRANSACTION_END_QUEUE_TIMESTAMP) -
unix_timestamp(LAST_QUEUED_TRANSACTION_START_QUEUE_TIMESTAMP)) * 1000000000000) time_to_relay_log,
((unix_timestamp(w.LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP) -
unix_timestamp(w.LAST_APPLIED_TRANSACTION_START_APPLY_TIMESTAMP)) * 1000000000000) apply_time,
w.LAST_APPLIED_TRANSACTION AS last_applied_transaction,
s.LAST_QUEUED_TRANSACTION AS last_queued_transaction,
GTID_SUBTRACT(s.RECEIVED_TRANSACTION_SET, (select variable_value from performance_schema.global_variables where variable_name='gtid_executed')) as queued_gtid_set_to_apply
FROM performance_schema.replication_connection_configuration c
JOIN performance_schema.replication_connection_status s
ON c.channel_name = s.channel_name
LEFT JOIN performance_schema.replication_applier_status_by_coordinator co
ON c.channel_name = co.channel_name
JOIN performance_schema.replication_applier_status a
ON c.channel_name = a.channel_name
JOIN performance_schema.replication_applier_status_by_worker w
ON c.channel_name = w.channel_name
LEFT JOIN (
SELECT * FROM performance_schema.replication_applier_status_by_worker LIMIT 1
) latest_w
ON c.channel_name = latest_w.channel_name
LEFT JOIN performance_schema.threads st
ON s.thread_id = st.thread_id
LEFT JOIN performance_schema.threads cot
ON co.thread_id = cot.thread_id
LEFT JOIN performance_schema.threads wt
ON w.thread_id = wt.thread_id;
DROP VIEW IF EXISTS replication_status;
CREATE
ALGORITHM = MERGE
SQL SECURITY INVOKER
VIEW replication_status
AS
SELECT
concat(s.channel_name, ' (', w.worker_id,')') AS channel,
s.service_state io_state,
co.service_state co_state,
w.service_state w_state,
IF(co.SERVICE_STATE = 'OFF'
OR s.SERVICE_STATE = 'OFF', 'null',
IF(
GTID_SUBTRACT(s.LAST_QUEUED_TRANSACTION,
w.LAST_APPLIED_TRANSACTION) = ''
OR
UNIX_TIMESTAMP(w.APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP) = 0,
'none',
TIMEDIFF(
NOW(6),w.APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP
)
)
) AS lag_from_original,
IF(co.SERVICE_STATE = 'OFF'
OR s.SERVICE_STATE = 'OFF', 'null',
IF(
GTID_SUBTRACT(s.LAST_QUEUED_TRANSACTION,
w.LAST_APPLIED_TRANSACTION) = ''
OR
UNIX_TIMESTAMP(w.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) = 0,
'none',
TIMEDIFF(
NOW(6),w.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP
)
)
) AS lag_from_immediate
FROM performance_schema.replication_connection_configuration c
JOIN performance_schema.replication_connection_status s
ON c.channel_name = s.channel_name
LEFT JOIN performance_schema.replication_applier_status_by_coordinator co
ON c.channel_name = co.channel_name
JOIN performance_schema.replication_applier_status a
ON c.channel_name = a.channel_name
JOIN performance_schema.replication_applier_status_by_worker w
ON c.channel_name = w.channel_name
LEFT JOIN (
SELECT * FROM performance_schema.replication_applier_status_by_worker LIMIT 1
) latest_w
ON c.channel_name = latest_w.channel_name
LEFT JOIN performance_schema.threads st
ON s.thread_id = st.thread_id
LEFT JOIN performance_schema.threads cot
ON co.thread_id = cot.thread_id
LEFT JOIN performance_schema.threads wt
ON w.thread_id = wt.thread_id;
DROP VIEW IF EXISTS replication_lag;
CREATE
ALGORITHM = MERGE
SQL SECURITY INVOKER
VIEW replication_lag
AS
SELECT
s.channel_name,
max(
IF(co.SERVICE_STATE = 'OFF'
OR s.SERVICE_STATE = 'OFF', 'null',
IF(
GTID_SUBTRACT(s.LAST_QUEUED_TRANSACTION,
w.LAST_APPLIED_TRANSACTION) = ''
OR
UNIX_TIMESTAMP(w.APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP) = 0,
0,
TIMEDIFF(
NOW(6),w.APPLYING_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP
)
)
)) AS max_lag_from_original,
max(IF(co.SERVICE_STATE = 'OFF'
OR s.SERVICE_STATE = 'OFF', 'null',
IF(
GTID_SUBTRACT(s.LAST_QUEUED_TRANSACTION,
w.LAST_APPLIED_TRANSACTION) = ''
OR
UNIX_TIMESTAMP(w.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) = 0,
0,
TIMEDIFF(
NOW(6),w.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP
)
)
)) AS max_lag_from_immediate
FROM performance_schema.replication_connection_configuration c
JOIN performance_schema.replication_connection_status s
ON c.channel_name = s.channel_name
LEFT JOIN performance_schema.replication_applier_status_by_coordinator co
ON c.channel_name = co.channel_name
JOIN performance_schema.replication_applier_status_by_worker w
ON c.channel_name = w.channel_name
GROUP BY 1 order by 2 desc, 3 desc;
DROP VIEW IF EXISTS replication_lag_human;
CREATE
ALGORITHM = MERGE
SQL SECURITY INVOKER
VIEW replication_lag_human
AS
SELECT channel_name, format_pico_time(MAX(seconds_behind_source)*1000000) lag_behind_source FROM (
SELECT
CHANNEL_NAME,
MAX(TIMESTAMPDIFF(MICROSECOND, APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP, NOW()))
AS seconds_behind_source
FROM performance_schema.replication_applier_status_by_worker GROUP BY CHANNEL_NAME
UNION
SELECT w.CHANNEL_NAME, MIN(if(GTID_SUBTRACT(LAST_QUEUED_TRANSACTION, LAST_APPLIED_TRANSACTION) = '', 0,
TIMESTAMPDIFF(MICROSECOND, LAST_APPLIED_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP, NOW())))
AS seconds_behind_source
FROM performance_schema.replication_applier_status_by_worker AS w
JOIN performance_schema.replication_connection_status GROUP BY w.CHANNEL_NAME
) a GROUP BY CHANNEL_NAME;
@4evgenyg
Copy link

Thank you very much.
Very helpful code.
I found one edge case, may it help to someone with weird setup (like mine) so commenting there.
If you have replication channel that defined from non GTID enabled source then in performance_schema.replication_connection_status the LAST_QUEUED_TRANSACTION will be ANONYMOUS and therefore the code would fail on GTID_SUBTRACT.

The solution is easy though, add where clause
to replication_lag_human, replication_lag, replication_status, x$replication_status_full :
LAST_QUEUED_TRANSACTION <> 'ANONYMOUS'
to replication_status_full
LAST_QUEUED_TRANSACTION <> 'ANONYMOUS' AND s.CHANNEL_NAME like 'group_replication%';

Best Regards
Evgeni

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment