Skip to content

Commit c8fcaba

Browse files
Adding k8s remapper for supporting Kibana Inventory "Kubernetes Pod" page (#20)
* adding k8s remapper * adding event.module * Update remappers/k8smetrics/kubelet.go Co-authored-by: Tetiana Kravchenko <tanya.kravchenko.v@gmail.com> * Update remappers/k8smetrics/kubelet.go Co-authored-by: Tetiana Kravchenko <tanya.kravchenko.v@gmail.com> * merging into internal * fixing conflicts * merging with one internal metrics function * merging with one internal metrics function * adding tests * adding tests * fixing tests with correct results * fixing imports * fixing license lint error * adding dataset on top level * fixing tests with latest dataset changes * updating tests by moving common parts into internal folder * updating tests by moving common parts into internal folder * fixing field allighnement of structs linting errors --------- Co-authored-by: Tetiana Kravchenko <tanya.kravchenko.v@gmail.com>
1 parent 76a6d08 commit c8fcaba

File tree

9 files changed

+892
-292
lines changed

9 files changed

+892
-292
lines changed

remappers/hostmetrics/hostmetrics_test.go

Lines changed: 213 additions & 291 deletions
Large diffs are not rendered by default.

remappers/internal/metric.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
package hostmetrics
18+
package internal
1919

2020
import (
2121
"github.com/elastic/opentelemetry-lib/remappers/common"

remappers/internal/testing.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package internal
19+
20+
import (
21+
"testing"
22+
23+
"go.opentelemetry.io/collector/pdata/pcommon"
24+
"go.opentelemetry.io/collector/pdata/pmetric"
25+
)
26+
27+
type TestMetric struct {
28+
DP TestDP
29+
Name string
30+
Type pmetric.MetricType
31+
}
32+
33+
type TestDP struct {
34+
Dbl *float64
35+
Int *int64
36+
Attrs map[string]any
37+
Ts pcommon.Timestamp
38+
}
39+
40+
func MetricSliceToTestMetric(t *testing.T, ms pmetric.MetricSlice) []TestMetric {
41+
testMetrics := make([]TestMetric, ms.Len())
42+
for i := 0; i < ms.Len(); i++ {
43+
m := ms.At(i)
44+
testMetrics[i].Name = m.Name()
45+
testMetrics[i].Type = m.Type()
46+
47+
var dps pmetric.NumberDataPointSlice
48+
switch m.Type() {
49+
case pmetric.MetricTypeGauge:
50+
dps = m.Gauge().DataPoints()
51+
case pmetric.MetricTypeSum:
52+
dps = m.Sum().DataPoints()
53+
}
54+
55+
if dps.Len() != 1 {
56+
t.Fatalf("unexpected metric, test is written assuming each metric with a single datapoint")
57+
}
58+
59+
dp := dps.At(0)
60+
testMetrics[i].DP = TestDP{Ts: dp.Timestamp(), Attrs: dp.Attributes().AsRaw()}
61+
switch dp.ValueType() {
62+
case pmetric.NumberDataPointValueTypeInt:
63+
testMetrics[i].DP.Int = Ptr(dp.IntValue())
64+
case pmetric.NumberDataPointValueTypeDouble:
65+
testMetrics[i].DP.Dbl = Ptr(dp.DoubleValue())
66+
}
67+
}
68+
69+
return testMetrics
70+
}
71+
72+
func TestMetricToMetricSlice(t testing.TB, testMetrics []TestMetric, out pmetric.MetricSlice) {
73+
out.EnsureCapacity(len(testMetrics))
74+
75+
for _, testm := range testMetrics {
76+
m := out.AppendEmpty()
77+
m.SetName(testm.Name)
78+
79+
var dps pmetric.NumberDataPointSlice
80+
switch typ := testm.Type; typ {
81+
case pmetric.MetricTypeGauge:
82+
dps = m.SetEmptyGauge().DataPoints()
83+
case pmetric.MetricTypeSum:
84+
dps = m.SetEmptySum().DataPoints()
85+
default:
86+
t.Fatalf("unhandled metric type %s", typ)
87+
}
88+
89+
dp := dps.AppendEmpty()
90+
dp.SetTimestamp(testm.DP.Ts)
91+
if testm.DP.Int != nil {
92+
dp.SetIntValue(*testm.DP.Int)
93+
} else if testm.DP.Dbl != nil {
94+
dp.SetDoubleValue(*testm.DP.Dbl)
95+
}
96+
if err := dp.Attributes().FromRaw(testm.DP.Attrs); err != nil {
97+
t.Fatalf("failed to copy attributes from test data: %v", err)
98+
}
99+
}
100+
}
101+
102+
func Ptr[T any](v T) *T {
103+
return &v
104+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package kubernetesmetrics
19+
20+
import (
21+
remappers "github.com/elastic/opentelemetry-lib/remappers/internal"
22+
23+
"go.opentelemetry.io/collector/pdata/pcommon"
24+
"go.opentelemetry.io/collector/pdata/pmetric"
25+
)
26+
27+
func addClusterMetrics(
28+
src, out pmetric.MetricSlice,
29+
_ pcommon.Resource,
30+
dataset string,
31+
) error {
32+
var timestamp pcommon.Timestamp
33+
var node_allocatable_memory, node_allocatable_cpu int64
34+
35+
// iterate all metrics in the current scope and generate the additional Elastic kubernetes integration metrics
36+
for i := 0; i < src.Len(); i++ {
37+
metric := src.At(i)
38+
if metric.Name() == "k8s.node.allocatable_cpu" {
39+
dp := metric.Gauge().DataPoints().At(0)
40+
if timestamp == 0 {
41+
timestamp = dp.Timestamp()
42+
}
43+
node_allocatable_cpu = dp.IntValue()
44+
} else if metric.Name() == "k8s.node.allocatable_memory" {
45+
dp := metric.Gauge().DataPoints().At(0)
46+
if timestamp == 0 {
47+
timestamp = dp.Timestamp()
48+
}
49+
node_allocatable_memory = dp.IntValue()
50+
}
51+
}
52+
53+
remappers.AddMetrics(out, dataset, remappers.EmptyMutator,
54+
remappers.Metric{
55+
DataType: pmetric.MetricTypeGauge,
56+
Name: "kubernetes.node.cpu.allocatable.cores",
57+
Timestamp: timestamp,
58+
IntValue: &node_allocatable_cpu,
59+
},
60+
remappers.Metric{
61+
DataType: pmetric.MetricTypeGauge,
62+
Name: "kubernetes.node.memory.allocatable.bytes",
63+
Timestamp: timestamp,
64+
IntValue: &node_allocatable_memory,
65+
},
66+
)
67+
68+
return nil
69+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package kubernetesmetrics
19+
20+
type config struct {
21+
KubernetesIntegrationDataset bool
22+
}
23+
24+
// Option allows configuring the behavior of the kubernetes remapper.
25+
type Option func(config) config
26+
27+
func newConfig(opts ...Option) (cfg config) {
28+
for _, opt := range opts {
29+
cfg = opt(cfg)
30+
}
31+
return cfg
32+
}
33+
34+
// WithKubernetesIntegrationDataset sets the dataset of the remapped metrics as
35+
// as per the kubernetes integration. Example: kubernetes.pod
36+
func WithKubernetesIntegrationDataset(b bool) Option {
37+
return func(c config) config {
38+
c.KubernetesIntegrationDataset = b
39+
return c
40+
}
41+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package kubernetesmetrics
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
)
25+
26+
func TestConfig(t *testing.T) {
27+
for _, tc := range []struct {
28+
name string
29+
opts []Option
30+
expected config
31+
}{
32+
{
33+
name: "default",
34+
opts: nil,
35+
expected: config{
36+
KubernetesIntegrationDataset: false,
37+
},
38+
},
39+
{
40+
name: "k8s_integration_dataset",
41+
opts: []Option{WithKubernetesIntegrationDataset(true)},
42+
expected: config{
43+
KubernetesIntegrationDataset: true,
44+
},
45+
},
46+
} {
47+
t.Run(tc.name, func(t *testing.T) {
48+
assert.Equal(t, tc.expected, newConfig(tc.opts...))
49+
})
50+
}
51+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package kubernetesmetrics
19+
20+
import (
21+
"path"
22+
"strings"
23+
24+
"go.opentelemetry.io/collector/pdata/pcommon"
25+
"go.opentelemetry.io/collector/pdata/pmetric"
26+
"go.uber.org/zap"
27+
)
28+
29+
var scraperToElasticDataset = map[string]string{
30+
"kubeletstatsreceiver": "kubernetes.pod",
31+
"k8sclusterreceiver": "kubernetes.node",
32+
}
33+
34+
type remapFunc func(metrics pmetric.MetricSlice, out pmetric.MetricSlice, resource pcommon.Resource, dataset string) error
35+
36+
// Remapper maps the OTel Kubernetes to Elastic Kubernetes metrics. These remapped
37+
// metrics power the curated Kibana dashboards. Each datapoint translated using
38+
// the remapper has the `event.processor` attribute set to `kubernetes`.
39+
type Remapper struct {
40+
logger *zap.Logger
41+
cfg config
42+
}
43+
44+
// NewRemapper creates a new instance of kubernetes remapper.
45+
func NewRemapper(logger *zap.Logger, opts ...Option) *Remapper {
46+
return &Remapper{
47+
cfg: newConfig(opts...),
48+
logger: logger,
49+
}
50+
}
51+
52+
var remapFuncs = map[string]remapFunc{
53+
"kubeletstatsreceiver": addKubeletMetrics,
54+
"k8sclusterreceiver": addClusterMetrics,
55+
}
56+
57+
// Remap remaps an OTel ScopeMetrics to a list of OTel metrics such that the
58+
// remapped metrics could be trivially converted into Elastic system metrics.
59+
// It accepts the resource attributes to enrich the remapped metrics as per
60+
// Elastic convention. The current remapping logic assumes that each Metric
61+
// in the ScopeMetric will have datapoints for a single timestamp only. The
62+
// remapped metrics are added to the output `MetricSlice`.
63+
func (r *Remapper) Remap(
64+
src pmetric.ScopeMetrics,
65+
out pmetric.MetricSlice,
66+
resource pcommon.Resource,
67+
) {
68+
if !r.Valid(src) {
69+
return
70+
}
71+
72+
scope := src.Scope()
73+
scraper := path.Base(scope.Name())
74+
75+
var dataset string // an empty dataset defers setting dataset to the caller
76+
if r.cfg.KubernetesIntegrationDataset {
77+
var ok bool
78+
dataset, ok = scraperToElasticDataset[scraper]
79+
if !ok {
80+
r.logger.Warn("no dataset defined for scraper", zap.String("scraper", scraper))
81+
return
82+
}
83+
}
84+
85+
remapFunc, ok := remapFuncs[scraper]
86+
if !ok {
87+
return
88+
}
89+
90+
err := remapFunc(src.Metrics(), out, resource, dataset)
91+
if err != nil {
92+
r.logger.Warn(
93+
"failed to remap OTel kubernetes",
94+
zap.String("scope", scope.Name()),
95+
zap.Error(err),
96+
)
97+
}
98+
}
99+
100+
// Valid validates a ScopeMetric against the kubernetes metrics remapper requirements.
101+
// Kubernetes remapper only remaps metrics from kubeletstatsreceiver or k8sclusterreceiver.
102+
func (r *Remapper) Valid(sm pmetric.ScopeMetrics) bool {
103+
return strings.HasPrefix(sm.Scope().Name(), "otelcol/kubeletstatsreceiver") || strings.HasPrefix(sm.Scope().Name(), "otelcol/k8sclusterreceiver")
104+
}

0 commit comments

Comments
 (0)