Skip to content

Commit 7525c2a

Browse files
committed
feat: Add wal_status to replication_slot
Signed-off-by: MarcWort <113890636+MarcWort@users.noreply.github.com>
1 parent b81d26c commit 7525c2a

File tree

2 files changed

+31
-10
lines changed

2 files changed

+31
-10
lines changed

collector/pg_replication_slot.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ var (
7272
"number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost",
7373
[]string{"slot_name", "slot_type"}, nil,
7474
)
75+
pgReplicationSlotWalStatus = prometheus.NewDesc(
76+
prometheus.BuildFQName(
77+
namespace,
78+
replicationSlotSubsystem,
79+
"wal_status",
80+
),
81+
"availability of WAL files claimed by this slot",
82+
[]string{"slot_name", "slot_type", "wal_status"}, nil,
83+
)
7584

7685
pgReplicationSlotQuery = `SELECT
7786
slot_name,
@@ -83,7 +92,8 @@ var (
8392
END AS current_wal_lsn,
8493
COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn,
8594
active,
86-
safe_wal_size
95+
safe_wal_size,
96+
wal_status
8797
FROM pg_replication_slots;`
8898
)
8999

@@ -103,7 +113,8 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
103113
var flushLSN sql.NullFloat64
104114
var isActive sql.NullBool
105115
var safeWalSize sql.NullInt64
106-
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize); err != nil {
116+
var walStatus sql.NullString
117+
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil {
107118
return err
108119
}
109120

@@ -149,6 +160,13 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
149160
prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel,
150161
)
151162
}
163+
164+
if walStatus.Valid {
165+
ch <- prometheus.MustNewConstMetric(
166+
pgReplicationSlotWalStatus,
167+
prometheus.GaugeValue, 1, slotNameLabel, slotTypeLabel, walStatus.String,
168+
)
169+
}
152170
}
153171
return rows.Err()
154172
}

collector/pg_replication_slot_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
3131

3232
inst := &instance{db: db}
3333

34-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
34+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
3535
rows := sqlmock.NewRows(columns).
36-
AddRow("test_slot", "physical", 5, 3, true, 323906992)
36+
AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved")
3737
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
3838

3939
ch := make(chan prometheus.Metric)
@@ -51,6 +51,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
5151
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE},
5252
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE},
5353
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE},
54+
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "reserved"}, value: 1, metricType: dto.MetricType_GAUGE},
5455
}
5556

5657
convey.Convey("Metrics comparison", t, func() {
@@ -73,9 +74,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
7374

7475
inst := &instance{db: db}
7576

76-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
77+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
7778
rows := sqlmock.NewRows(columns).
78-
AddRow("test_slot", "physical", 6, 12, false, -4000)
79+
AddRow("test_slot", "physical", 6, 12, false, -4000, "extended")
7980
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
8081

8182
ch := make(chan prometheus.Metric)
@@ -92,6 +93,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
9293
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE},
9394
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
9495
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE},
96+
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "extended"}, value: 1, metricType: dto.MetricType_GAUGE},
9597
}
9698

9799
convey.Convey("Metrics comparison", t, func() {
@@ -115,9 +117,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
115117

116118
inst := &instance{db: db}
117119

118-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
120+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
119121
rows := sqlmock.NewRows(columns).
120-
AddRow("test_slot", "physical", 6, 12, nil, nil)
122+
AddRow("test_slot", "physical", 6, 12, nil, nil, "lost")
121123
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
122124

123125
ch := make(chan prometheus.Metric)
@@ -133,6 +135,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
133135
expected := []MetricResult{
134136
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE},
135137
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
138+
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "lost"}, value: 1, metricType: dto.MetricType_GAUGE},
136139
}
137140

138141
convey.Convey("Metrics comparison", t, func() {
@@ -155,9 +158,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
155158

156159
inst := &instance{db: db}
157160

158-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
161+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
159162
rows := sqlmock.NewRows(columns).
160-
AddRow(nil, nil, nil, nil, true, nil)
163+
AddRow(nil, nil, nil, nil, true, nil, nil)
161164
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
162165

163166
ch := make(chan prometheus.Metric)

0 commit comments

Comments
 (0)