Skip to content

Commit c4c90df

Browse files
committed
feat: Add safe_wal_size to replication_slot
1 parent cc0fd2e commit c4c90df

File tree

2 files changed

+30
-10
lines changed

2 files changed

+30
-10
lines changed

collector/pg_replication_slot.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ var (
6363
"whether the replication slot is active or not",
6464
[]string{"slot_name", "slot_type"}, nil,
6565
)
66+
pgReplicationSlotSafeWal = prometheus.NewDesc(
67+
prometheus.BuildFQName(
68+
namespace,
69+
replicationSlotSubsystem,
70+
"safe_wal_size",
71+
),
72+
"number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost",
73+
[]string{"slot_name", "slot_type"}, nil,
74+
)
6675

6776
pgReplicationSlotQuery = `SELECT
6877
slot_name,
@@ -73,7 +82,8 @@ var (
7382
pg_current_wal_lsn() - '0/0'
7483
END AS current_wal_lsn,
7584
COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn,
76-
active
85+
active,
86+
safe_wal_size
7787
FROM pg_replication_slots;`
7888
)
7989

@@ -92,7 +102,8 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
92102
var walLSN sql.NullFloat64
93103
var flushLSN sql.NullFloat64
94104
var isActive sql.NullBool
95-
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive); err != nil {
105+
var safeWalSize sql.NullInt64
106+
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize); err != nil {
96107
return err
97108
}
98109

@@ -131,6 +142,13 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
131142
pgReplicationSlotIsActiveDesc,
132143
prometheus.GaugeValue, isActiveValue, slotNameLabel, slotTypeLabel,
133144
)
145+
146+
if safeWalSize.Valid {
147+
ch <- prometheus.MustNewConstMetric(
148+
pgReplicationSlotSafeWal,
149+
prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel,
150+
)
151+
}
134152
}
135153
return rows.Err()
136154
}

collector/pg_replication_slot_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
3131

3232
inst := &instance{db: db}
3333

34-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
34+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
3535
rows := sqlmock.NewRows(columns).
36-
AddRow("test_slot", "physical", 5, 3, true)
36+
AddRow("test_slot", "physical", 5, 3, true, 323906992)
3737
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
3838

3939
ch := make(chan prometheus.Metric)
@@ -50,6 +50,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
5050
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 5, metricType: dto.MetricType_GAUGE},
5151
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE},
5252
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE},
53+
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE},
5354
}
5455

5556
convey.Convey("Metrics comparison", t, func() {
@@ -72,9 +73,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
7273

7374
inst := &instance{db: db}
7475

75-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
76+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
7677
rows := sqlmock.NewRows(columns).
77-
AddRow("test_slot", "physical", 6, 12, false)
78+
AddRow("test_slot", "physical", 6, 12, false, -4000)
7879
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
7980

8081
ch := make(chan prometheus.Metric)
@@ -90,6 +91,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
9091
expected := []MetricResult{
9192
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE},
9293
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
94+
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE},
9395
}
9496

9597
convey.Convey("Metrics comparison", t, func() {
@@ -113,9 +115,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
113115

114116
inst := &instance{db: db}
115117

116-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
118+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
117119
rows := sqlmock.NewRows(columns).
118-
AddRow("test_slot", "physical", 6, 12, nil)
120+
AddRow("test_slot", "physical", 6, 12, nil, nil)
119121
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
120122

121123
ch := make(chan prometheus.Metric)
@@ -153,9 +155,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
153155

154156
inst := &instance{db: db}
155157

156-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
158+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
157159
rows := sqlmock.NewRows(columns).
158-
AddRow(nil, nil, nil, nil, true)
160+
AddRow(nil, nil, nil, nil, true, nil)
159161
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
160162

161163
ch := make(chan prometheus.Metric)

0 commit comments

Comments
 (0)