Skip to content

Commit aee4ffd

Browse files
feat: scrape timeout
Signed-off-by: Johannes Würbach <johannes.wuerbach@googlemail.com>
1 parent 57719ba commit aee4ffd

File tree

8 files changed

+72
-46
lines changed

8 files changed

+72
-46
lines changed

cmd/postgres_exporter/datasource.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package main
1515

1616
import (
17+
"context"
1718
"fmt"
1819
"io/ioutil"
1920
"net/url"
@@ -25,7 +26,7 @@ import (
2526
"github.com/prometheus/client_golang/prometheus"
2627
)
2728

28-
func (e *Exporter) discoverDatabaseDSNs() []string {
29+
func (e *Exporter) discoverDatabaseDSNs(ctx context.Context) []string {
2930
// connstring syntax is complex (and not sure if even regular).
3031
// we don't need to parse it, so just superficially validate that it starts
3132
// with a valid-ish keyword pair
@@ -50,7 +51,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
5051
continue
5152
}
5253

53-
server, err := e.servers.GetServer(dsn)
54+
server, err := e.servers.GetServer(ctx, dsn)
5455
if err != nil {
5556
level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
5657
continue
@@ -60,7 +61,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
6061
// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
6162
server.master = true
6263

63-
databaseNames, err := queryDatabases(server)
64+
databaseNames, err := queryDatabases(ctx, server)
6465
if err != nil {
6566
level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
6667
continue
@@ -96,8 +97,8 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
9697
return result
9798
}
9899

99-
func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
100-
server, err := e.servers.GetServer(dsn)
100+
func (e *Exporter) scrapeDSN(ctx context.Context, ch chan<- prometheus.Metric, dsn string) error {
101+
server, err := e.servers.GetServer(ctx, dsn)
101102

102103
if err != nil {
103104
return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())}
@@ -109,11 +110,11 @@ func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
109110
}
110111

111112
// Check if map versions need to be updated
112-
if err := e.checkMapVersions(ch, server); err != nil {
113+
if err := e.checkMapVersions(ctx, ch, server); err != nil {
113114
level.Warn(logger).Log("msg", "Proceeding with outdated query maps, as the Postgres version could not be determined", "err", err)
114115
}
115116

116-
return server.Scrape(ch, e.disableSettingsMetrics)
117+
return server.Scrape(ctx, ch, e.disableSettingsMetrics)
117118
}
118119

119120
// try to get the DataSource

cmd/postgres_exporter/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ var (
4242
excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String()
4343
includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String()
4444
metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String()
45+
scrapeTimeout = kingpin.Flag("scrape-timeout", "Maximum duration of a scrape").Default("60s").Envar("PG_EXPORTER_SCRAPE_TIMEOUT").Duration()
4546
logger = log.NewNopLogger()
4647
)
4748

@@ -105,7 +106,7 @@ func main() {
105106
IncludeDatabases(*includeDatabases),
106107
}
107108

108-
exporter := NewExporter(dsn, opts...)
109+
exporter := NewExporter(dsn, scrapeTimeout, opts...)
109110
defer func() {
110111
exporter.servers.Close()
111112
}()

cmd/postgres_exporter/namespace.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package main
1515

1616
import (
17+
"context"
1718
"database/sql"
1819
"errors"
1920
"fmt"
@@ -27,7 +28,7 @@ import (
2728

2829
// Query within a namespace mapping and emit metrics. Returns fatal errors if
2930
// the scrape fails, and a slice of errors if they were non-fatal.
30-
func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
31+
func queryNamespaceMapping(ctx context.Context, server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
3132
// Check for a query override for this namespace
3233
query, found := server.queryOverrides[namespace]
3334

@@ -45,9 +46,9 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
4546
if !found {
4647
// I've no idea how to avoid this properly at the moment, but this is
4748
// an admin tool so you're not injecting SQL right?
48-
rows, err = server.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
49+
rows, err = server.db.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
4950
} else {
50-
rows, err = server.db.Query(query)
51+
rows, err = server.db.QueryContext(ctx, query)
5152
}
5253
if err != nil {
5354
return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
@@ -183,7 +184,7 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
183184

184185
// Iterate through all the namespace mappings in the exporter and run their
185186
// queries.
186-
func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error {
187+
func queryNamespaceMappings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) map[string]error {
187188
// Return a map of namespace -> errors
188189
namespaceErrors := make(map[string]error)
189190

@@ -225,7 +226,7 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
225226
var nonFatalErrors []error
226227
var err error
227228
if scrapeMetric {
228-
metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping)
229+
metrics, nonFatalErrors, err = queryNamespaceMapping(ctx, server, namespace, mapping)
229230
} else {
230231
metrics = cachedMetric.metrics
231232
}

cmd/postgres_exporter/pg_setting.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package main
1515

1616
import (
17+
"context"
1718
"fmt"
1819
"math"
1920
"strconv"
@@ -24,7 +25,7 @@ import (
2425
)
2526

2627
// Query the pg_settings view containing runtime variables
27-
func querySettings(ch chan<- prometheus.Metric, server *Server) error {
28+
func querySettings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) error {
2829
level.Debug(logger).Log("msg", "Querying pg_setting view", "server", server)
2930

3031
// pg_settings docs: https://www.postgresql.org/docs/current/static/view-pg-settings.html
@@ -33,7 +34,7 @@ func querySettings(ch chan<- prometheus.Metric, server *Server) error {
3334
// types in normaliseUnit() below
3435
query := "SELECT name, setting, COALESCE(unit, ''), short_desc, vartype FROM pg_settings WHERE vartype IN ('bool', 'integer', 'real');"
3536

36-
rows, err := server.db.Query(query)
37+
rows, err := server.db.QueryContext(ctx, query)
3738
if err != nil {
3839
return fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
3940
}

cmd/postgres_exporter/postgres_exporter.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package main
1515

1616
import (
17+
"context"
1718
"crypto/sha256"
1819
"database/sql"
1920
"errors"
@@ -468,6 +469,7 @@ type Exporter struct {
468469
psqlUp prometheus.Gauge
469470
userQueriesError *prometheus.GaugeVec
470471
totalScrapes prometheus.Counter
472+
scrapeTimeout *time.Duration
471473

472474
// servers are used to allow re-using the DB connection between scrapes.
473475
// servers contains metrics map and query overrides.
@@ -555,7 +557,7 @@ func parseConstLabels(s string) prometheus.Labels {
555557
}
556558

557559
// NewExporter returns a new PostgreSQL exporter for the provided DSN.
558-
func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {
560+
func NewExporter(dsn []string, scrapeTimeout *time.Duration, opts ...ExporterOpt) *Exporter {
559561
e := &Exporter{
560562
dsn: dsn,
561563
builtinMetricMaps: builtinMetricMaps,
@@ -567,6 +569,7 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {
567569

568570
e.setupInternalMetrics()
569571
e.servers = NewServers(ServerWithLabels(e.constantLabels))
572+
e.scrapeTimeout = scrapeTimeout
570573

571574
return e
572575
}
@@ -614,7 +617,16 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
614617

615618
// Collect implements prometheus.Collector.
616619
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
617-
e.scrape(ch)
620+
var ctx context.Context
621+
var cancel context.CancelFunc
622+
if e.scrapeTimeout != nil {
623+
ctx, cancel = context.WithTimeout(context.Background(), *e.scrapeTimeout)
624+
} else {
625+
ctx, cancel = context.WithCancel(context.Background())
626+
}
627+
defer cancel()
628+
629+
e.scrape(ctx, ch)
618630

619631
ch <- e.duration
620632
ch <- e.totalScrapes
@@ -630,9 +642,9 @@ func newDesc(subsystem, name, help string, labels prometheus.Labels) *prometheus
630642
)
631643
}
632644

633-
func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, error) {
645+
func checkPostgresVersion(ctx context.Context, db *sql.DB, server string) (semver.Version, string, error) {
634646
level.Debug(logger).Log("msg", "Querying PostgreSQL version", "server", server)
635-
versionRow := db.QueryRow("SELECT version();")
647+
versionRow := db.QueryRowContext(ctx, "SELECT version();")
636648
var versionString string
637649
err := versionRow.Scan(&versionString)
638650
if err != nil {
@@ -647,8 +659,8 @@ func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, er
647659
}
648660

649661
// Check and update the exporters query maps if the version has changed.
650-
func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server) error {
651-
semanticVersion, versionString, err := checkPostgresVersion(server.db, server.String())
662+
func (e *Exporter) checkMapVersions(ctx context.Context, ch chan<- prometheus.Metric, server *Server) error {
663+
semanticVersion, versionString, err := checkPostgresVersion(ctx, server.db, server.String())
652664
if err != nil {
653665
return fmt.Errorf("Error fetching version string on %q: %v", server, err)
654666
}
@@ -709,7 +721,7 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server)
709721
return nil
710722
}
711723

712-
func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
724+
func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) {
713725
defer func(begun time.Time) {
714726
e.duration.Set(time.Since(begun).Seconds())
715727
}(time.Now())
@@ -718,14 +730,14 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
718730

719731
dsns := e.dsn
720732
if e.autoDiscoverDatabases {
721-
dsns = e.discoverDatabaseDSNs()
733+
dsns = e.discoverDatabaseDSNs(ctx)
722734
}
723735

724736
var errorsCount int
725737
var connectionErrorsCount int
726738

727739
for _, dsn := range dsns {
728-
if err := e.scrapeDSN(ch, dsn); err != nil {
740+
if err := e.scrapeDSN(ctx, ch, dsn); err != nil {
729741
errorsCount++
730742

731743
level.Error(logger).Log("err", err)

cmd/postgres_exporter/postgres_exporter_integration_test.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
package main
2020

2121
import (
22+
"context"
2223
"fmt"
2324
"os"
2425
"strings"
2526
"testing"
27+
"time"
2628

2729
_ "github.com/lib/pq"
2830
"github.com/prometheus/client_golang/prometheus"
@@ -38,11 +40,13 @@ type IntegrationSuite struct {
3840

3941
var _ = Suite(&IntegrationSuite{})
4042

43+
var testScrapeDuration = 10 * time.Second
44+
4145
func (s *IntegrationSuite) SetUpSuite(c *C) {
4246
dsn := os.Getenv("DATA_SOURCE_NAME")
4347
c.Assert(dsn, Not(Equals), "")
4448

45-
exporter := NewExporter(strings.Split(dsn, ","))
49+
exporter := NewExporter(strings.Split(dsn, ","), &testScrapeDuration)
4650
c.Assert(exporter, NotNil)
4751
// Assign the exporter to the suite
4852
s.e = exporter
@@ -59,24 +63,26 @@ func (s *IntegrationSuite) TestAllNamespacesReturnResults(c *C) {
5963
}
6064
}()
6165

66+
ctx := context.Background()
67+
6268
for _, dsn := range s.e.dsn {
6369
// Open a database connection
6470
server, err := NewServer(dsn)
6571
c.Assert(server, NotNil)
6672
c.Assert(err, IsNil)
6773

6874
// Do a version update
69-
err = s.e.checkMapVersions(ch, server)
75+
err = s.e.checkMapVersions(ctx, ch, server)
7076
c.Assert(err, IsNil)
7177

72-
err = querySettings(ch, server)
78+
err = querySettings(ctx, ch, server)
7379
if !c.Check(err, Equals, nil) {
7480
fmt.Println("## ERRORS FOUND")
7581
fmt.Println(err)
7682
}
7783

7884
// This should never happen in our test cases.
79-
errMap := queryNamespaceMappings(ch, server)
85+
errMap := queryNamespaceMappings(ctx, ch, server)
8086
if !c.Check(len(errMap), Equals, 0) {
8187
fmt.Println("## NAMESPACE ERRORS FOUND")
8288
for namespace, err := range errMap {
@@ -99,14 +105,15 @@ func (s *IntegrationSuite) TestInvalidDsnDoesntCrash(c *C) {
99105
}()
100106

101107
// Send a bad DSN
102-
exporter := NewExporter([]string{"invalid dsn"})
108+
ctx := context.Background()
109+
exporter := NewExporter([]string{"invalid dsn"}, &testScrapeDuration)
103110
c.Assert(exporter, NotNil)
104-
exporter.scrape(ch)
111+
exporter.scrape(ctx, ch)
105112

106113
// Send a DSN to a non-listening port.
107-
exporter = NewExporter([]string{"postgresql://nothing:nothing@127.0.0.1:1/nothing"})
114+
exporter = NewExporter([]string{"postgresql://nothing:nothing@127.0.0.1:1/nothing"}, &testScrapeDuration)
108115
c.Assert(exporter, NotNil)
109-
exporter.scrape(ch)
116+
exporter.scrape(ctx, ch)
110117
}
111118

112119
// TestUnknownMetricParsingDoesntCrash deliberately deletes all the column maps out
@@ -122,7 +129,7 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) {
122129
dsn := os.Getenv("DATA_SOURCE_NAME")
123130
c.Assert(dsn, Not(Equals), "")
124131

125-
exporter := NewExporter(strings.Split(dsn, ","))
132+
exporter := NewExporter(strings.Split(dsn, ","), &testScrapeDuration)
126133
c.Assert(exporter, NotNil)
127134

128135
// Convert the default maps into a list of empty maps.
@@ -137,7 +144,7 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) {
137144
exporter.builtinMetricMaps = emptyMaps
138145

139146
// scrape the exporter and make sure it works
140-
exporter.scrape(ch)
147+
exporter.scrape(context.Background(), ch)
141148
}
142149

143150
// TestExtendQueriesDoesntCrash tests that specifying extend.query-path doesn't
@@ -154,24 +161,25 @@ func (s *IntegrationSuite) TestExtendQueriesDoesntCrash(c *C) {
154161
c.Assert(dsn, Not(Equals), "")
155162

156163
exporter := NewExporter(
157-
strings.Split(dsn, ","),
164+
strings.Split(dsn, ","), &testScrapeDuration,
158165
WithUserQueriesPath("../user_queries_test.yaml"),
159166
)
160167
c.Assert(exporter, NotNil)
161168

162169
// scrape the exporter and make sure it works
163-
exporter.scrape(ch)
170+
exporter.scrape(context.Background(), ch)
164171
}
165172

166173
func (s *IntegrationSuite) TestAutoDiscoverDatabases(c *C) {
167174
dsn := os.Getenv("DATA_SOURCE_NAME")
168175

169176
exporter := NewExporter(
170177
strings.Split(dsn, ","),
178+
&testScrapeDuration,
171179
)
172180
c.Assert(exporter, NotNil)
173181

174-
dsns := exporter.discoverDatabaseDSNs()
182+
dsns := exporter.discoverDatabaseDSNs(context.Background())
175183

176184
c.Assert(len(dsns), Equals, 2)
177185
}

cmd/postgres_exporter/queries.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package main
1515

1616
import (
17+
"context"
1718
"errors"
1819
"fmt"
1920

@@ -282,8 +283,8 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error
282283
return nil
283284
}
284285

285-
func queryDatabases(server *Server) ([]string, error) {
286-
rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
286+
func queryDatabases(ctx context.Context, server *Server) ([]string, error) {
287+
rows, err := server.db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
287288
if err != nil {
288289
return nil, fmt.Errorf("Error retrieving databases: %v", err)
289290
}

0 commit comments

Comments
 (0)