| 
 | 1 | +// Copyright 2023 The Prometheus Authors  | 
 | 2 | +// Licensed under the Apache License, Version 2.0 (the "License");  | 
 | 3 | +// you may not use this file except in compliance with the License.  | 
 | 4 | +// You may obtain a copy of the License at  | 
 | 5 | +//  | 
 | 6 | +// http://www.apache.org/licenses/LICENSE-2.0  | 
 | 7 | +//  | 
 | 8 | +// Unless required by applicable law or agreed to in writing, software  | 
 | 9 | +// distributed under the License is distributed on an "AS IS" BASIS,  | 
 | 10 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
 | 11 | +// See the License for the specific language governing permissions and  | 
 | 12 | +// limitations under the License.  | 
 | 13 | + | 
 | 14 | +package collector  | 
 | 15 | + | 
 | 16 | +import (  | 
 | 17 | +"context"  | 
 | 18 | +"database/sql"  | 
 | 19 | + | 
 | 20 | +"github.com/go-kit/log"  | 
 | 21 | +"github.com/prometheus/client_golang/prometheus"  | 
 | 22 | +)  | 
 | 23 | + | 
 | 24 | +func init() {  | 
 | 25 | +registerCollector("replication_slot", defaultEnabled, NewPGReplicationSlotCollector)  | 
 | 26 | +}  | 
 | 27 | + | 
 | 28 | +type PGReplicationSlotCollector struct {  | 
 | 29 | +log log.Logger  | 
 | 30 | +}  | 
 | 31 | + | 
 | 32 | +func NewPGReplicationSlotCollector(logger log.Logger) (Collector, error) {  | 
 | 33 | +return &PGReplicationSlotCollector{log: logger}, nil  | 
 | 34 | +}  | 
 | 35 | + | 
 | 36 | +var pgReplicationSlot = map[string]*prometheus.Desc{  | 
 | 37 | +"current_wal_lsn": prometheus.NewDesc(  | 
 | 38 | +"pg_replication_slot_current_wal_lsn",  | 
 | 39 | +"current wal lsn value",  | 
 | 40 | +[]string{"slot_name"}, nil,  | 
 | 41 | +),  | 
 | 42 | +"confirmed_flush_lsn": prometheus.NewDesc(  | 
 | 43 | +"pg_replication_slot_confirmed_flush_lsn",  | 
 | 44 | +"last lsn confirmed flushed to the replication slot",  | 
 | 45 | +[]string{"slot_name"}, nil,  | 
 | 46 | +),  | 
 | 47 | +"is_active": prometheus.NewDesc(  | 
 | 48 | +"pg_replication_slot_is_active",  | 
 | 49 | +"last lsn confirmed flushed to the replication slot",  | 
 | 50 | +[]string{"slot_name"}, nil,  | 
 | 51 | +),  | 
 | 52 | +}  | 
 | 53 | + | 
 | 54 | +func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error {  | 
 | 55 | +rows, err := db.QueryContext(ctx,  | 
 | 56 | +`SELECT  | 
 | 57 | +slot_name,  | 
 | 58 | +pg_current_wal_lsn() - '0/0' AS current_wal_lsn,  | 
 | 59 | +coalesce(confirmed_flush_lsn, '0/0') - '0/0',  | 
 | 60 | +active  | 
 | 61 | +FROM  | 
 | 62 | +pg_replication_slots;`)  | 
 | 63 | +if err != nil {  | 
 | 64 | +return err  | 
 | 65 | +}  | 
 | 66 | +defer rows.Close()  | 
 | 67 | + | 
 | 68 | +for rows.Next() {  | 
 | 69 | +var slot_name string  | 
 | 70 | +var wal_lsn int64  | 
 | 71 | +var flush_lsn int64  | 
 | 72 | +var is_active int64  | 
 | 73 | +if err := rows.Scan(&slot_name, &wal_lsn, &flush_lsn, &is_active); err != nil {  | 
 | 74 | +return err  | 
 | 75 | +}  | 
 | 76 | + | 
 | 77 | +ch <- prometheus.MustNewConstMetric(  | 
 | 78 | +pgReplicationSlot["current_wal_lsn"],  | 
 | 79 | +prometheus.GaugeValue, float64(wal_lsn), slot_name,  | 
 | 80 | +)  | 
 | 81 | +if is_active == 1 {  | 
 | 82 | +ch <- prometheus.MustNewConstMetric(  | 
 | 83 | +pgReplicationSlot["confirmed_flush_lsn"],  | 
 | 84 | +prometheus.GaugeValue, float64(flush_lsn), slot_name,  | 
 | 85 | +)  | 
 | 86 | +}  | 
 | 87 | +ch <- prometheus.MustNewConstMetric(  | 
 | 88 | +pgReplicationSlot["is_active"],  | 
 | 89 | +prometheus.GaugeValue, float64(flush_lsn), slot_name,  | 
 | 90 | +)  | 
 | 91 | +}  | 
 | 92 | +if err := rows.Err(); err != nil {  | 
 | 93 | +return err  | 
 | 94 | +}  | 
 | 95 | +return nil  | 
 | 96 | +}  | 
0 commit comments