Skip to content

Commit 97e7ff7

Browse files
authored
PG: Add metrics for pg_stat_replication's sent/write/flush/replay (#21844)
* PG: Add metrics for pg_stat_replication's sent/write/flush/replay pg_stat_replication provides metrics on the last sent/write/flush/replay WAL location by a standby server. sent delay doesn't depend on a feedback message from the standby since it tracks the sent WAL through the connection. This can be used to gauge how fast and how late a standby is when catching up * Make PG checkpoint test less flaky Only run checkpoint and check metrics on the primary. This will remove the possible uncertainty of having the Checkpoint record being correctly propagated to the standby before standby's checkpoint is triggered. * Reduce test_backend_transaction_age flakiness Add the timestamp to the application name to reduce the risk of leftover query and connection being present when metrics are collected.
1 parent 2188654 commit 97e7ff7

File tree

5 files changed

+36
-24
lines changed

5 files changed

+36
-24
lines changed

postgres/changelog.d/21844.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
PG: Add metrics for pg_stat_replication's sent/write/flush/replay

postgres/datadog_checks/postgres/util.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -450,19 +450,27 @@ def trim_leading_set_stmts(sql):
450450
'name': 'replication_stats_metrics',
451451
'query': """
452452
SELECT
453-
pg_stat_replication.application_name,
454-
pg_stat_replication.state,
455-
pg_stat_replication.sync_state,
456-
pg_stat_replication.client_addr,
457-
pg_stat_replication_slot.slot_name,
458-
pg_stat_replication_slot.slot_type,
459-
GREATEST (0, age(pg_stat_replication.backend_xmin)) as backend_xmin_age,
460-
GREATEST (0, EXTRACT(epoch from pg_stat_replication.write_lag)) as write_lag,
461-
GREATEST (0, EXTRACT(epoch from pg_stat_replication.flush_lag)) as flush_lag,
462-
GREATEST (0, EXTRACT(epoch from pg_stat_replication.replay_lag)) AS replay_lag
463-
FROM pg_stat_replication as pg_stat_replication
464-
LEFT JOIN pg_replication_slots as pg_stat_replication_slot
465-
ON pg_stat_replication.pid = pg_stat_replication_slot.active_pid;
453+
rep.application_name,
454+
rep.state,
455+
rep.sync_state,
456+
rep.client_addr,
457+
slot.slot_name,
458+
slot.slot_type,
459+
GREATEST (0, age(rep.backend_xmin)) as backend_xmin_age,
460+
pg_wal_lsn_diff(
461+
CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END, sent_lsn),
462+
pg_wal_lsn_diff(
463+
CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END, write_lsn),
464+
pg_wal_lsn_diff(
465+
CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END, flush_lsn),
466+
pg_wal_lsn_diff(
467+
CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END, replay_lsn),
468+
GREATEST (0, EXTRACT(epoch from rep.write_lag)) as write_lag,
469+
GREATEST (0, EXTRACT(epoch from rep.flush_lag)) as flush_lag,
470+
GREATEST (0, EXTRACT(epoch from rep.replay_lag)) AS replay_lag
471+
FROM pg_stat_replication as rep
472+
LEFT JOIN pg_replication_slots as slot
473+
ON rep.pid = slot.active_pid;
466474
""".strip(),
467475
'columns': [
468476
{'name': 'wal_app_name', 'type': 'tag'},
@@ -472,6 +480,10 @@ def trim_leading_set_stmts(sql):
472480
{'name': 'slot_name', 'type': 'tag_not_null'},
473481
{'name': 'slot_type', 'type': 'tag_not_null'},
474482
{'name': 'replication.backend_xmin_age', 'type': 'gauge'},
483+
{'name': 'replication.sent_lsn_delay', 'type': 'gauge'},
484+
{'name': 'replication.write_lsn_delay', 'type': 'gauge'},
485+
{'name': 'replication.flush_lsn_delay', 'type': 'gauge'},
486+
{'name': 'replication.replay_lsn_delay', 'type': 'gauge'},
475487
{'name': 'replication.wal_write_lag', 'type': 'gauge'},
476488
{'name': 'replication.wal_flush_lag', 'type': 'gauge'},
477489
{'name': 'replication.wal_replay_lag', 'type': 'gauge'},

postgres/metadata.csv

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,13 @@ postgresql.relation.tuples,gauge,,,,"Number of live rows in the table. This is o
137137
postgresql.relation.xmin,gauge,,,,"Transaction ID of the latest relation's modification in pg_class. This metric is tagged with db, schema, table",0,postgres,relation xmin,,
138138
postgresql.relation_size,gauge,,byte,,"The disk space used by the specified table. TOAST data, indexes, free space map and visibility map are not included. This metric is tagged with db, schema, table.",0,postgres,relation size,,
139139
postgresql.replication.backend_xmin_age,gauge,,,,The age of the standby server's xmin horizon (relative to latest stable xid) reported by hot_standby_feedback.,-1,postgres,repl backend xmin,,
140+
postgresql.replication.flush_lsn_delay,gauge,,byte,,The delay in bytes between the current WAL position and the last location flushed by the standby server,-1,postgres,flush delay,,
141+
postgresql.replication.replay_lsn_delay,gauge,,byte,,The delay in bytes between the current WAL position and the last location replayed by the standby server,-1,postgres,replay delay,,
142+
postgresql.replication.sent_lsn_delay,gauge,,byte,,The delay in bytes between the current WAL position and the last location sent by the standby server,-1,postgres,sent delay,,
140143
postgresql.replication.wal_flush_lag,gauge,,second,,Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written and flushed it (but not yet applied it). This can be used to gauge the delay that synchronous_commit level on incurred while committing if this server was configured as a synchronous standby. Only available with postgresql 10 and newer.,-1,postgres,repl flush lag,,
141144
postgresql.replication.wal_replay_lag,gauge,,second,,"Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written, flushed and applied it. This can be used to gauge the delay that synchronous_commit level remote_apply incurred while committing if this server was configured as a synchronous standby. Only available with postgresql 10 and newer.",-1,postgres,repl replay lag,,
142145
postgresql.replication.wal_write_lag,gauge,,second,,Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written it (but not yet flushed it or applied it). This can be used to gauge the delay that synchronous_commit level remote_write incurred while committing if this server was configured as a synchronous standby. Only available with postgresql 10 and newer.,-1,postgres,repl write lag,,
146+
postgresql.replication.write_lsn_delay,gauge,,byte,,The delay in bytes between the current WAL position and the last location written by the standby server,-1,postgres,write delay,,
143147
postgresql.replication_delay,gauge,,second,,The current replication delay in seconds. Only available with postgresql 9.1 and newer,-1,postgres,repl delay,,
144148
postgresql.replication_delay_bytes,gauge,,byte,,The current replication delay in bytes. Only available with postgresql 9.2 and newer,-1,postgres,repl delay bytes,,
145149
postgresql.replication_slot.catalog_xmin_age,gauge,,transaction,,"The age of the oldest transaction affecting the system catalogs that this slot needs the database to retain. VACUUM cannot remove catalog tuples deleted by any later transaction. This metric is tagged with slot_name, slot_type, slot_persistence, slot_state.",-1,postgres,repslot catalog_xmin,,
@@ -236,4 +240,4 @@ postgresql.wal_receiver.last_msg_receipt_age,gauge,,second,,Time since the recep
236240
postgresql.wal_receiver.last_msg_send_age,gauge,,second,,The age of the latest message's send time received from the WAL sender. This metric is tagged with status.,0,postgres,wal receiver send age,,
237241
postgresql.wal_receiver.latest_end_age,gauge,,second,,Time since the reception of the last message from the WAL sender with an WAL location update. This metric is tagged with status.,0,postgres,wal receiver latest end,,
238242
postgresql.wal_receiver.received_timeline,gauge,,,,"Timeline number of last write-ahead log location received and flushed to disk, the initial value of this field being the timeline number of the first log location used when WAL receiver is started. This metric is tagged with status.",0,postgres,wal receiver tli,,
239-
postgresql.wal_size,gauge,,byte,,The sum of all WAL files on disk.,-1,postgres,wal size,,
243+
postgresql.wal_size,gauge,,byte,,The sum of all WAL files on disk.,-1,postgres,wal size,,

postgres/tests/test_pg_integration.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def _increase_txid(cur):
119119
else:
120120
query = 'select txid_current();'
121121
cur.execute(query)
122+
assert cur.fetchone() is not None
122123

123124

124125
def test_initialization_tags(integration_check, pg_instance):
@@ -485,7 +486,7 @@ def test_backend_transaction_age(aggregator, integration_check, pg_instance):
485486

486487
check.run()
487488

488-
app = 'test_backend_transaction_age'
489+
app = f'test_backend_transaction_age_{time.time()}'
489490
conn1 = _get_conn(pg_instance, application_name=app)
490491
cur = conn1.cursor()
491492

@@ -498,8 +499,6 @@ def test_backend_transaction_age(aggregator, integration_check, pg_instance):
498499
cur.execute('BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;')
499500
# Force assignement of a txid and keep the transaction opened
500501
_increase_txid(cur)
501-
# Make sure to fetch the result to make sure we start the timer after the transaction started
502-
cur.fetchall()
503502
start_transaction_time = time.time()
504503

505504
aggregator.reset()

postgres/tests/test_pg_replication.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -221,22 +221,18 @@ def test_conflicts_bufferpin(aggregator, integration_check, pg_instance, pg_repl
221221

222222

223223
@requires_over_10
224-
def test_pg_control_replication(aggregator, integration_check, pg_instance, pg_replica_instance):
225-
check = integration_check(pg_replica_instance)
224+
def test_pg_control_replication(aggregator, integration_check, pg_instance):
225+
check = integration_check(pg_instance)
226226
check.run()
227227

228-
dd_agent_tags = _get_expected_tags(check, pg_replica_instance, role='standby')
228+
dd_agent_tags = _get_expected_tags(check, pg_instance, role='master')
229229
aggregator.assert_metric('postgresql.control.timeline_id', count=1, value=1, tags=dd_agent_tags)
230230

231231
# Also checkpoint on primary to generate changes
232232
master_conn = _get_superconn(pg_instance)
233233
with master_conn.cursor() as cur:
234234
cur.execute("CHECKPOINT;")
235235

236-
postgres_conn = _get_superconn(pg_replica_instance)
237-
with postgres_conn.cursor() as cur:
238-
cur.execute("CHECKPOINT;")
239-
240236
aggregator.reset()
241237
check.run()
242238
# checkpoint should be less than 2s old

0 commit comments

Comments
 (0)