Skip to content

Commit 76a6d08

Browse files
authored
Add support for disk scraper (#17)
* Add support of disk scraper * Tidying up * Add tests and sync to latest changes * Complete the tests for disk metrics * Refactored code * Move metricsToAdd as a global variable * Refactoring the code further * Updated go mod * Update variable name
1 parent 08863a4 commit 76a6d08

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed

remappers/hostmetrics/disk.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package hostmetrics
19+
20+
import (
21+
"errors"
22+
"fmt"
23+
24+
remappers "github.com/elastic/opentelemetry-lib/remappers/internal"
25+
"go.opentelemetry.io/collector/pdata/pcommon"
26+
"go.opentelemetry.io/collector/pdata/pmetric"
27+
)
28+
29+
var metricsToAdd = map[string]string{
30+
"system.disk.io": "system.diskio.%s.bytes",
31+
"system.disk.operations": "system.diskio.%s.count",
32+
"system.disk.pending_operations": "system.diskio.io.%sops",
33+
"system.disk.operation_time": "system.diskio.%s.time",
34+
"system.disk.io_time": "system.diskio.io.%stime",
35+
}
36+
37+
// remapDiskMetrics remaps disk-related metrics from the source to the output metric slice.
38+
func remapDiskMetrics(src, out pmetric.MetricSlice, _ pcommon.Resource, dataset string) error {
39+
var errs []error
40+
for i := 0; i < src.Len(); i++ {
41+
var err error
42+
metric := src.At(i)
43+
switch metric.Name() {
44+
case "system.disk.io", "system.disk.operations", "system.disk.pending_operations":
45+
err = addDiskMetric(metric, out, dataset, 1)
46+
case "system.disk.operation_time", "system.disk.io_time":
47+
err = addDiskMetric(metric, out, dataset, 1000)
48+
}
49+
if err != nil {
50+
errs = append(errs, err)
51+
}
52+
}
53+
return errors.Join(errs...)
54+
}
55+
56+
func addDiskMetric(metric pmetric.Metric, out pmetric.MetricSlice, dataset string, multiplier int64) error {
57+
metricDiskES, ok := metricsToAdd[metric.Name()]
58+
if !ok {
59+
return fmt.Errorf("unexpected metric name: %s", metric.Name())
60+
}
61+
62+
dps := metric.Sum().DataPoints()
63+
for i := 0; i < dps.Len(); i++ {
64+
dp := dps.At(i)
65+
if device, ok := dp.Attributes().Get("device"); ok {
66+
direction, _ := dp.Attributes().Get("direction")
67+
remappedMetric := remappers.Metric{
68+
DataType: pmetric.MetricTypeSum,
69+
Name: fmt.Sprintf(metricDiskES, direction.Str()),
70+
Timestamp: dp.Timestamp(),
71+
}
72+
switch dp.ValueType() {
73+
case pmetric.NumberDataPointValueTypeInt:
74+
v := dp.IntValue() * multiplier
75+
remappedMetric.IntValue = &v
76+
case pmetric.NumberDataPointValueTypeDouble:
77+
v := dp.DoubleValue() * float64(multiplier)
78+
remappedMetric.DoubleValue = &v
79+
}
80+
remappers.AddMetrics(out, dataset, func(dp pmetric.NumberDataPoint) {
81+
dp.Attributes().PutStr("system.diskio.name", device.Str())
82+
}, remappedMetric)
83+
}
84+
}
85+
return nil
86+
}

remappers/hostmetrics/hostmetrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var remapFuncs = map[string]remapFunc{
4949
"process": remapProcessMetrics,
5050
"processes": remapProcessesMetrics,
5151
"network": remapNetworkMetrics,
52+
"disk": remapDiskMetrics,
5253
}
5354

5455
// Remapper maps the OTel hostmetrics to Elastic system metrics. These remapped

remappers/hostmetrics/hostmetrics_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ var (
4141
ProcPath string = "/bin/run"
4242
ProcName string = "runner"
4343
Device string = "en0"
44+
Disk string = "nvme0n1p128"
4445
)
4546

4647
func TestRemap(t *testing.T) {
@@ -66,6 +67,8 @@ func doTestRemap(t *testing.T, id string, remapOpts ...Option) {
6667
m["process.name"] = ProcName
6768
case "network":
6869
m["system.network.name"] = Device
70+
case "disk":
71+
m["system.diskio.name"] = Disk
6972
}
7073
return m
7174
}
@@ -267,6 +270,30 @@ func doTestRemap(t *testing.T, id string, remapOpts ...Option) {
267270
{Type: Sum, Name: "system.network.out.errors", DP: testDP{Ts: now, Int: ptr(int64(2)), Attrs: outAttr("network")}},
268271
},
269272
},
273+
{
274+
name: "disk",
275+
scraper: "disk",
276+
input: []testMetric{
277+
{Type: Sum, Name: "system.disk.io", DP: testDP{Ts: now, Int: ptr(int64(1888256)), Attrs: map[string]any{"device": Disk, "direction": "read"}}},
278+
{Type: Sum, Name: "system.disk.io", DP: testDP{Ts: now, Int: ptr(int64(512)), Attrs: map[string]any{"device": Disk, "direction": "write"}}},
279+
{Type: Sum, Name: "system.disk.operations", DP: testDP{Ts: now, Int: ptr(int64(15390)), Attrs: map[string]any{"device": Disk, "direction": "read"}}},
280+
{Type: Sum, Name: "system.disk.operations", DP: testDP{Ts: now, Int: ptr(int64(371687)), Attrs: map[string]any{"device": Disk, "direction": "write"}}},
281+
{Type: Sum, Name: "system.disk.operation_time", DP: testDP{Ts: now, Dbl: ptr(11.182), Attrs: map[string]any{"device": Disk, "direction": "read"}}},
282+
{Type: Sum, Name: "system.disk.operation_time", DP: testDP{Ts: now, Dbl: ptr(617.289), Attrs: map[string]any{"device": Disk, "direction": "write"}}},
283+
{Type: Sum, Name: "system.disk.io_time", DP: testDP{Ts: now, Dbl: ptr(520.3), Attrs: map[string]any{"device": Disk}}},
284+
{Type: Sum, Name: "system.disk.pending_operations", DP: testDP{Ts: now, Int: ptr(int64(102)), Attrs: map[string]any{"device": Disk}}},
285+
},
286+
expected: []testMetric{
287+
{Type: Sum, Name: "system.diskio.read.bytes", DP: testDP{Ts: now, Int: ptr(int64(1888256)), Attrs: outAttr("disk")}},
288+
{Type: Sum, Name: "system.diskio.write.bytes", DP: testDP{Ts: now, Int: ptr(int64(512)), Attrs: outAttr("disk")}},
289+
{Type: Sum, Name: "system.diskio.read.count", DP: testDP{Ts: now, Int: ptr(int64(15390)), Attrs: outAttr("disk")}},
290+
{Type: Sum, Name: "system.diskio.write.count", DP: testDP{Ts: now, Int: ptr(int64(371687)), Attrs: outAttr("disk")}},
291+
{Type: Sum, Name: "system.diskio.read.time", DP: testDP{Ts: now, Dbl: ptr(11182.0), Attrs: outAttr("disk")}},
292+
{Type: Sum, Name: "system.diskio.write.time", DP: testDP{Ts: now, Dbl: ptr(617289.0), Attrs: outAttr("disk")}},
293+
{Type: Sum, Name: "system.diskio.io.time", DP: testDP{Ts: now, Dbl: ptr(520300.0), Attrs: outAttr("disk")}},
294+
{Type: Sum, Name: "system.diskio.io.ops", DP: testDP{Ts: now, Int: ptr(int64(102)), Attrs: outAttr("disk")}},
295+
},
296+
},
270297
} {
271298
t.Run(fmt.Sprintf("%s/%s", tc.name, id), func(t *testing.T) {
272299
sm := pmetric.NewScopeMetrics()

0 commit comments

Comments
 (0)