Skip to content
18 changes: 13 additions & 5 deletions collector/pg_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -79,32 +80,39 @@ func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch
var databases []string

for rows.Next() {
var datname string
var datname sql.NullString
if err := rows.Scan(&datname); err != nil {
return err
}

if !datname.Valid {
continue
}
// Ignore excluded databases
// Filtering is done here instead of in the query to avoid
// a complicated NOT IN query with a variable number of parameters
if sliceContains(c.excludedDatabases, datname) {
if sliceContains(c.excludedDatabases, datname.String) {
continue
}

databases = append(databases, datname)
databases = append(databases, datname.String)
}

// Query the size of the databases
for _, datname := range databases {
var size int64
var size sql.NullFloat64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't see any documentation that says that this field can be NULL>

err = db.QueryRowContext(ctx, pgDatabaseSizeQuery, datname).Scan(&size)
if err != nil {
return err
}

sizeMetric := 0.0
if size.Valid {
sizeMetric = size.Float64
}
ch <- prometheus.MustNewConstMetric(
pgDatabaseSizeDesc,
prometheus.GaugeValue, float64(size), datname,
prometheus.GaugeValue, sizeMetric, datname,
)
}
if err := rows.Err(); err != nil {
Expand Down
40 changes: 40 additions & 0 deletions collector/pg_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,43 @@ func TestPGDatabaseCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

// TODO add a null db test

func TestPGDatabaseCollectorNullMetric(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}

mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}).
AddRow("postgres"))

mock.ExpectQuery(sanitizeQuery(pgDatabaseSizeQuery)).WithArgs("postgres").WillReturnRows(sqlmock.NewRows([]string{"pg_database_size"}).
AddRow(nil))

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGDatabaseCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGDatabaseCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"datname": "postgres"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
9 changes: 7 additions & 2 deletions collector/pg_postmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package collector

import (
"context"
"database/sql"

"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -51,14 +52,18 @@ func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance,
row := db.QueryRowContext(ctx,
pgPostmasterQuery)

var startTimeSeconds float64
var startTimeSeconds sql.NullFloat64
err := row.Scan(&startTimeSeconds)
if err != nil {
return err
}
startTimeSecondsMetric := 0.0
if startTimeSeconds.Valid {
startTimeSecondsMetric = startTimeSeconds.Float64
}
ch <- prometheus.MustNewConstMetric(
pgPostMasterStartTimeSeconds,
prometheus.GaugeValue, startTimeSeconds,
prometheus.GaugeValue, startTimeSecondsMetric,
)
return nil
}
36 changes: 36 additions & 0 deletions collector/pg_postmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,39 @@ func TestPgPostmasterCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgPostmasterCollectorNullTime(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}

mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}).
AddRow(nil))

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGPostmasterCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGPostmasterCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
36 changes: 26 additions & 10 deletions collector/pg_process_idle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const processIdleSubsystem = "process_idle"

func init() {
registerCollector(processIdleSubsystem, defaultEnabled, NewPGProcessIdleCollector)
// Making this default disabled because we have no tests for it
registerCollector(processIdleSubsystem, defaultDisabled, NewPGProcessIdleCollector)
}

type PGProcessIdleCollector struct {
log log.Logger
}

const processIdleSubsystem = "process_idle"

func NewPGProcessIdleCollector(config collectorConfig) (Collector, error) {
return &PGProcessIdleCollector{log: config.logger}, nil
}
Expand All @@ -41,8 +43,8 @@ var pgProcessIdleSeconds = prometheus.NewDesc(
prometheus.Labels{},
)

func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
func (PGProcessIdleCollector) Update(ctx context.Context, inst *instance, ch chan<- prometheus.Metric) error {
db := inst.getDB()
row := db.QueryRowContext(ctx,
`WITH
metrics AS (
Expand Down Expand Up @@ -79,9 +81,9 @@ func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch
FROM metrics JOIN buckets USING (application_name)
GROUP BY 1, 2, 3;`)

var applicationName string
var secondsSum int64
var secondsCount uint64
var applicationName sql.NullString
var secondsSum sql.NullInt64
var secondsCount sql.NullInt64
var seconds []int64
var secondsBucket []uint64

Expand All @@ -97,10 +99,24 @@ func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch
if err != nil {
return err
}

applicationNameLabel := "unknown"
if applicationName.Valid {
applicationNameLabel = applicationName.String
}

var secondsCountMetric uint64
if secondsCount.Valid {
secondsCountMetric = uint64(secondsCount.Int64)
}
secondsSumMetric := 0.0
if secondsSum.Valid {
secondsSumMetric = float64(secondsSum.Int64)
}
ch <- prometheus.MustNewConstHistogram(
pgProcessIdleSeconds,
secondsCount, float64(secondsSum), buckets,
applicationName,
secondsCountMetric, secondsSumMetric, buckets,
applicationNameLabel,
)
return nil
}
35 changes: 24 additions & 11 deletions collector/pg_replication_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -82,32 +83,44 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
defer rows.Close()

for rows.Next() {
var slotName string
var walLSN int64
var flushLSN int64
var isActive bool
var slotName sql.NullString
var walLSN sql.NullFloat64
var flushLSN sql.NullFloat64
var isActive sql.NullBool
if err := rows.Scan(&slotName, &walLSN, &flushLSN, &isActive); err != nil {
return err
}

isActiveValue := 0
if isActive {
isActiveValue = 1
isActiveValue := 0.0
if isActive.Valid && isActive.Bool {
isActiveValue = 1.0
}
slotNameLabel := "unknown"
if slotName.Valid {
slotNameLabel = slotName.String
}

var walLSNMetric float64
if walLSN.Valid {
walLSNMetric = walLSN.Float64
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotCurrentWalDesc,
prometheus.GaugeValue, float64(walLSN), slotName,
prometheus.GaugeValue, walLSNMetric, slotNameLabel,
)
if isActive {
if isActive.Valid && isActive.Bool {
var flushLSNMetric float64
if flushLSN.Valid {
flushLSNMetric = flushLSN.Float64
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotCurrentFlushDesc,
prometheus.GaugeValue, float64(flushLSN), slotName,
prometheus.GaugeValue, flushLSNMetric, slotNameLabel,
)
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotIsActiveDesc,
prometheus.GaugeValue, float64(isActiveValue), slotName,
prometheus.GaugeValue, isActiveValue, slotNameLabel,
)
}
if err := rows.Err(); err != nil {
Expand Down
81 changes: 81 additions & 0 deletions collector/pg_replication_slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,84 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
}

}

func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}

columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", 6, 12, nil)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGReplicationSlotCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"slot_name": "test_slot"}, value: 6, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot"}, value: 0, metricType: dto.MetricType_GAUGE},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}

columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, true)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGReplicationSlotCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "unknown"}, value: 1, metricType: dto.MetricType_GAUGE},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
Loading