63 changes: 44 additions & 19 deletions pkg/remotewrite/metrics.go
@@ -13,37 +13,62 @@ import (
 
 // metricsStorage is an in-memory gather point for metrics
 type metricsStorage struct {
-    m map[string]metrics.Sample
+    m map[string]*metrics.Metric
 }
 
 func newMetricsStorage() *metricsStorage {
     return &metricsStorage{
-        m: make(map[string]metrics.Sample),
+        m: make(map[string]*metrics.Metric),
     }
 }
 
 // update modifies metricsStorage and returns updated sample
 // so that the stored metric and the returned metric hold the same value
-func (ms *metricsStorage) update(sample metrics.Sample, add func(current, s metrics.Sample) metrics.Sample) metrics.Sample {
-    if current, ok := ms.m[sample.Metric.Name]; ok {
-        if add == nil {
-            current.Metric.Sink.Add(sample)
-        } else {
-            current = add(current, sample)
+func (ms *metricsStorage) update(sample metrics.Sample, add func(*metrics.Metric, metrics.Sample)) *metrics.Metric {
+    m, ok := ms.m[sample.Metric.Name]
+    if !ok {
+        var sink metrics.Sink
+        switch sample.Metric.Type {
+        case metrics.Counter:
+            sink = &metrics.CounterSink{}
+        case metrics.Gauge:
+            sink = &metrics.GaugeSink{}
+        case metrics.Trend:
+            sink = &metrics.TrendSink{}
+        case metrics.Rate:
+            sink = &metrics.RateSink{}
+        default:
+            panic("the Metric Type is not supported")
         }
-        current.Time = sample.Time // to avoid duplicates in timestamps
-        // Sometimes remote write endpoint throws an error about duplicates even if the values
-        // sent were different. By current observations, this is a hard to repeat case and
-        // potentially a bug.
-        // Related: https://github.com/prometheus/prometheus/issues/9210
-
-        ms.m[current.Metric.Name] = current
-        return current
+
+        m = &metrics.Metric{
+            Name:     sample.Metric.Name,
+            Type:     sample.Metric.Type,
+            Contains: sample.Metric.Contains,
+            Sink:     sink,
+        }
+
+        ms.m[m.Name] = m
     }
 
-    sample.Metric.Sink.Add(sample)
-    ms.m[sample.Metric.Name] = sample
-    return sample
+    // TODO: https://github.com/grafana/xk6-output-prometheus-remote/issues/11
+    //
+    // Sometimes remote write endpoint throws an error about duplicates even if the values
+    // sent were different. By current observations, this is a hard to repeat case and
+    // potentially a bug.
+    // Related: https://github.com/prometheus/prometheus/issues/9210
+
+    // TODO: Trend is the unique type that benefits from this logic.
+    // so this logic can be removed just creating
+    // a new implementation in this extension
+    // for TrendSink and its Add method.
+    if add == nil {
+        m.Sink.Add(sample)
+    } else {
+        add(m, sample)
+    }
+
+    return m
 }
 
 // transform k6 sample into TimeSeries for remote-write
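To make the new flow concrete, here is a minimal sketch (not part of the PR) of how a mapper is expected to drive metricsStorage.update after this change: the storage lazily creates one *metrics.Metric with a fresh sink per metric name, folds every incoming sample into that sink, and the caller reads the aggregate back via Sink.Format. The exampleUpdate helper, the metric name, and the sample values are illustrative assumptions; only the go.k6.io/k6/metrics types already used in the diff are relied on.

package remotewrite

import (
    "fmt"
    "time"

    "go.k6.io/k6/metrics"
)

// exampleUpdate is an illustrative helper (hypothetical, not in the PR) showing
// the call pattern for metricsStorage.update introduced by this change.
func exampleUpdate() {
    ms := newMetricsStorage()

    // A counter metric; only the fields that update() reads are populated.
    reqs := &metrics.Metric{
        Name: "http_reqs",
        Type: metrics.Counter,
        Sink: &metrics.CounterSink{},
    }

    // Each call folds the sample into the sink stored for "http_reqs".
    // The stored *metrics.Metric (and its CounterSink) is created lazily on
    // the first call and reused for every later sample with the same name.
    for _, v := range []float64{1, 1} {
        m := ms.update(metrics.Sample{Metric: reqs, Time: time.Now(), Value: v}, nil)
        fmt.Println(m.Sink.Format(0)) // aggregated values, e.g. the running count
    }
}

Note that update copies the metric definition but attaches its own sink, so the caller's reqs.Sink is never written to here; only the copy held inside metricsStorage accumulates state.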
34 changes: 16 additions & 18 deletions pkg/remotewrite/prometheus.go
@@ -13,8 +13,8 @@ import (
 type PrometheusMapping struct{}
 
 func (pm *PrometheusMapping) MapCounter(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
-    sample = ms.update(sample, nil)
-    aggr := sample.Metric.Sink.Format(0)
+    metric := ms.update(sample, nil)
+    aggr := metric.Sink.Format(0)
 
     return []prompb.TimeSeries{
         {
@@ -33,9 +33,6 @@ func (pm *PrometheusMapping) MapCounter(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
 }
 
 func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
-    sample = ms.update(sample, nil)
-    aggr := sample.Metric.Sink.Format(0)
-
     return []prompb.TimeSeries{
         {
             Labels: append(labels, prompb.Label{
@@ -44,7 +41,9 @@ func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
             }),
             Samples: []prompb.Sample{
                 {
-                    Value:     aggr["value"],
+                    // Gauge is just the latest value
+                    // so we can skip the sink using directly the value from the sample.
+                    Value:     sample.Value,
                     Timestamp: timestamp.FromTime(sample.Time),
                 },
             },
@@ -53,8 +52,8 @@ func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
 }
 
 func (pm *PrometheusMapping) MapRate(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
-    sample = ms.update(sample, nil)
-    aggr := sample.Metric.Sink.Format(0)
+    metric := ms.update(sample, nil)
+    aggr := metric.Sink.Format(0)
 
     return []prompb.TimeSeries{
         {
@@ -73,9 +72,13 @@ func (pm *PrometheusMapping) MapRate(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
 }
 
 func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
-    sample = ms.update(sample, trendAdd)
+    metric := ms.update(sample, trendAdd)
+
+    // Prometheus metric system does not support Trend so this mapping will store gauges
+    // to keep track of key values.
+    // TODO: when Prometheus implements support for sparse histograms, re-visit this implementation
 
-    s := sample.Metric.Sink.(*metrics.TrendSink)
+    s := metric.Sink.(*metrics.TrendSink)
     aggr := map[string]float64{
         "min": s.Min,
         "max": s.Max,
@@ -85,10 +88,6 @@ func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
         "p(95)": p(s, 0.95),
     }
 
-    // Prometheus metric system does not support Trend so this mapping will store gauges
-    // to keep track of key values.
-    // TODO: when Prometheus implements support for sparse histograms, re-visit this implementation
-
     return []prompb.TimeSeries{
         {
             Labels: append(labels, prompb.Label{
@@ -169,8 +168,8 @@ func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries {
 // and are a partial copy-paste from k6/metrics.
 // TODO: re-write & refactor this once metrics refactoring progresses in k6.
 
-func trendAdd(current, s metrics.Sample) metrics.Sample {
-    t := current.Metric.Sink.(*metrics.TrendSink)
+func trendAdd(current *metrics.Metric, s metrics.Sample) {
+    t := current.Sink.(*metrics.TrendSink)
 
     // insert into sorted array instead of sorting anew on each addition
     index := sort.Search(len(t.Values), func(i int) bool {
@@ -197,8 +196,7 @@ func trendAdd(current, s metrics.Sample) metrics.Sample {
         t.Med = t.Values[t.Count/2]
     }
 
-    current.Metric.Sink = t
-    return current
+    current.Sink = t
 }
 
 func p(t *metrics.TrendSink, pct float64) float64 {
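The trendAdd/p pair keeps TrendSink.Values sorted by locating the insertion point with sort.Search instead of re-sorting on every addition, so percentiles can be read straight off the slice. The standalone sketch below shows one way to complete that pattern outside the k6 types; insertSorted and percentile are illustrative stand-ins (the body of p is collapsed in this diff), not the extension's actual helpers.

package main

import (
    "fmt"
    "sort"
)

// insertSorted mirrors the idea behind trendAdd: binary-search the insertion
// point and shift the tail, rather than sorting the whole slice on each add.
func insertSorted(values []float64, v float64) []float64 {
    i := sort.Search(len(values), func(j int) bool { return values[j] >= v })
    values = append(values, 0)
    copy(values[i+1:], values[i:])
    values[i] = v
    return values
}

// percentile is a simplified nearest-rank stand-in for p(); it assumes a
// sorted, non-empty slice.
func percentile(sorted []float64, pct float64) float64 {
    return sorted[int(pct*float64(len(sorted)-1))]
}

func main() {
    var values []float64
    for _, v := range []float64{8, 3, 1, 7, 4, 2, 12.3} {
        values = insertSorted(values, v)
    }
    fmt.Println(values)                   // [1 2 3 4 7 8 12.3]
    fmt.Println(percentile(values, 0.95)) // 8, i.e. index 5 of the 7 sorted values
}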
85 changes: 50 additions & 35 deletions pkg/remotewrite/prometheus_test.go
@@ -1,11 +1,10 @@
 package remotewrite
 
 import (
+    "math/rand"
     "testing"
     "time"
 
-    "math/rand"
-
     "github.com/stretchr/testify/assert"
     "go.k6.io/k6/metrics"
 )
@@ -15,81 +14,97 @@ func TestTrendAdd(t *testing.T) {
     t.Parallel()
 
     testCases := []struct {
-        current, s metrics.Sample
+        current  *metrics.Metric
+        s        metrics.Sample
+        expected metrics.TrendSink
     }{
         {
-            current: metrics.Sample{Metric: &metrics.Metric{
+            current: &metrics.Metric{
                 Sink: &metrics.TrendSink{},
-            }},
+            },
             s: metrics.Sample{Value: 2},
+            expected: metrics.TrendSink{
+                Values: []float64{2},
+                Count:  1,
+                Min:    2,
+                Max:    2,
+                Sum:    2,
+                Avg:    2,
+                Med:    2,
+            },
         },
         {
-            current: metrics.Sample{Metric: &metrics.Metric{
+            current: &metrics.Metric{
                 Sink: &metrics.TrendSink{
                     Values: []float64{8, 3, 1, 7, 4, 2},
                     Count:  6,
-                    Min: 1, Max: 8,
-                    Sum: 25, Avg: (8 + 3 + 1 + 7 + 4 + 2) / 6,
-                    Med: (3 + 4) / 2,
+                    Min:    1,
+                    Max:    8,
+                    Sum:    25,
                 },
-            }},
+            },
             s: metrics.Sample{Value: 12.3},
+            expected: metrics.TrendSink{
+                Values: []float64{8, 3, 1, 7, 4, 2, 12.3},
+                Count:  7,
+                Min:    1,
+                Max:    12.3,
+                Sum:    37.3,
+                Avg:    37.3 / 7,
+                Med:    7,
+            },
         },
     }
 
     for _, testCase := range testCases {
         // trendAdd should result in the same values as Sink.Add
 
-        s := trendAdd(testCase.current, testCase.s)
-        sink := s.Metric.Sink.(*metrics.TrendSink)
+        trendAdd(testCase.current, testCase.s)
+        sink := testCase.current.Sink.(*metrics.TrendSink)
 
-        testCase.current.Metric.Sink.Add(testCase.s)
-        expected := testCase.current.Metric.Sink.(*metrics.TrendSink)
-
-        assert.Equal(t, expected.Count, sink.Count)
-        assert.Equal(t, expected.Min, sink.Min)
-        assert.Equal(t, expected.Max, sink.Max)
-        assert.Equal(t, expected.Sum, sink.Sum)
-        assert.Equal(t, expected.Avg, sink.Avg)
-        assert.EqualValues(t, expected.Values, sink.Values)
+        assert.Equal(t, testCase.expected.Count, sink.Count)
+        assert.Equal(t, testCase.expected.Min, sink.Min)
+        assert.Equal(t, testCase.expected.Max, sink.Max)
+        assert.Equal(t, testCase.expected.Sum, sink.Sum)
+        assert.Equal(t, testCase.expected.Avg, sink.Avg)
+        assert.Equal(t, testCase.expected.Med, sink.Med)
+        assert.Equal(t, testCase.expected.Values, sink.Values)
     }
 }
 
 func BenchmarkTrendAdd(b *testing.B) {
-    benchF := []func(b *testing.B, start metrics.Sample){
-        func(b *testing.B, s metrics.Sample) {
+    benchF := []func(b *testing.B, start metrics.Metric){
+        func(b *testing.B, m metrics.Metric) {
             b.ResetTimer()
             rand.Seed(time.Now().Unix())
 
             for i := 0; i < b.N; i++ {
-                s = trendAdd(s, metrics.Sample{Value: rand.Float64() * 1000})
-                sink := s.Metric.Sink.(*metrics.TrendSink)
+                trendAdd(&m, metrics.Sample{Value: rand.Float64() * 1000})
+                sink := m.Sink.(*metrics.TrendSink)
                 p(sink, 0.90)
                 p(sink, 0.95)
             }
         },
-        func(b *testing.B, start metrics.Sample) {
+        func(b *testing.B, start metrics.Metric) {
             b.ResetTimer()
             rand.Seed(time.Now().Unix())
 
             for i := 0; i < b.N; i++ {
-                start.Metric.Sink.Add(metrics.Sample{Value: rand.Float64() * 1000})
-                start.Metric.Sink.Format(0)
+                start.Sink.Add(metrics.Sample{Value: rand.Float64() * 1000})
+                start.Sink.Format(0)
             }
         },
     }
 
-    s := metrics.Sample{
-        Metric: &metrics.Metric{
-            Type: metrics.Trend,
-            Sink: &metrics.TrendSink{},
-        },
+    start := metrics.Metric{
+        Type: metrics.Trend,
+        Sink: &metrics.TrendSink{},
    }
 
     b.Run("trendAdd", func(b *testing.B) {
-        benchF[0](b, s)
+        benchF[0](b, start)
     })
     b.Run("TrendSink.Add", func(b *testing.B) {
-        benchF[1](b, s)
+        benchF[1](b, start)
     })
 }
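Both the table-driven test and the benchmark run with the standard Go tooling, e.g. `go test ./pkg/remotewrite -run TestTrendAdd` and `go test ./pkg/remotewrite -bench BenchmarkTrendAdd -benchmem` from the repository root (path assumed from the file headers above). The benchmark pits the extension's trendAdd plus two percentile reads against the upstream TrendSink.Add plus Format on comparable streams of random samples.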