Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion remappers/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
package common

const (
// DatastreamDatasetLabel defines a dataset label for attributes.
// DatastreamDatasetLabel defines the datastream dataset label key.
DatastreamDatasetLabel = "data_stream.dataset"

// EventDatasetLabel defines the event dataset label key.
EventDatasetLabel = "event.dataset"

// OTelRemappedLabel is used to identify remapped metrics.
OTelRemappedLabel = "otel_remapped"
)
4 changes: 2 additions & 2 deletions remappers/hostmetrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func newConfig(opts ...Option) (cfg config) {
return cfg
}

// WithSystemIntegrationDataset sets the dataset of the remapped metrics as
// as per the system integration. Example: system.cpu, system.memory, etc.
// WithSystemIntegrationDataset sets the datastream dataset of the remapped
// metrics as as per the system integration. Example: system.cpu, system.memory, etc.
func WithSystemIntegrationDataset(b bool) Option {
return func(c config) config {
c.SystemIntegrationDataset = b
Expand Down
6 changes: 3 additions & 3 deletions remappers/hostmetrics/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func remapCPUMetrics(
src, out pmetric.MetricSlice,
_ pcommon.Resource,
dataset string,
mutator func(pmetric.NumberDataPoint),
) error {
var timestamp pcommon.Timestamp
var numCores int64
Expand Down Expand Up @@ -88,7 +88,7 @@ func remapCPUMetrics(
}

// Add all metrics that are independent of cpu logical count.
remappedmetric.Add(out, dataset, remappedmetric.EmptyMutator,
remappedmetric.Add(out, mutator,
remappedmetric.Metric{
DataType: pmetric.MetricTypeGauge,
Name: "system.cpu.total.pct",
Expand Down Expand Up @@ -159,7 +159,7 @@ func remapCPUMetrics(
irqNorm := irqPercent / float64(numCores)
softirqNorm := softirqPercent / float64(numCores)

remappedmetric.Add(out, dataset, remappedmetric.EmptyMutator,
remappedmetric.Add(out, mutator,
remappedmetric.Metric{
DataType: pmetric.MetricTypeSum,
Name: "system.cpu.cores",
Expand Down
26 changes: 19 additions & 7 deletions remappers/hostmetrics/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,20 @@ var metricsToAdd = map[string]string{
}

// remapDiskMetrics remaps disk-related metrics from the source to the output metric slice.
func remapDiskMetrics(src, out pmetric.MetricSlice, _ pcommon.Resource, dataset string) error {
func remapDiskMetrics(
src, out pmetric.MetricSlice,
_ pcommon.Resource,
mutator func(pmetric.NumberDataPoint),
) error {
var errs []error
for i := 0; i < src.Len(); i++ {
var err error
metric := src.At(i)
switch metric.Name() {
case "system.disk.io", "system.disk.operations", "system.disk.pending_operations":
err = addDiskMetric(metric, out, dataset, 1)
err = addDiskMetric(metric, out, mutator, 1)
case "system.disk.operation_time", "system.disk.io_time":
err = addDiskMetric(metric, out, dataset, 1000)
err = addDiskMetric(metric, out, mutator, 1000)
}
if err != nil {
errs = append(errs, err)
Expand All @@ -54,7 +58,12 @@ func remapDiskMetrics(src, out pmetric.MetricSlice, _ pcommon.Resource, dataset
return errors.Join(errs...)
}

func addDiskMetric(metric pmetric.Metric, out pmetric.MetricSlice, dataset string, multiplier int64) error {
func addDiskMetric(
metric pmetric.Metric,
out pmetric.MetricSlice,
mutator func(pmetric.NumberDataPoint),
multiplier int64,
) error {
metricDiskES, ok := metricsToAdd[metric.Name()]
if !ok {
return fmt.Errorf("unexpected metric name: %s", metric.Name())
Expand All @@ -78,9 +87,12 @@ func addDiskMetric(metric pmetric.Metric, out pmetric.MetricSlice, dataset strin
v := dp.DoubleValue() * float64(multiplier)
newM.DoubleValue = &v
}
remappedmetric.Add(out, dataset, func(dp pmetric.NumberDataPoint) {
dp.Attributes().PutStr("system.diskio.name", device.Str())
}, newM)
remappedmetric.Add(out, remappedmetric.ChainedMutator(
mutator,
func(dp pmetric.NumberDataPoint) {
dp.Attributes().PutStr("system.diskio.name", device.Str())
},
), newM)
}
}
return nil
Expand Down
27 changes: 15 additions & 12 deletions remappers/hostmetrics/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ type number interface {
~int64 | ~float64
}

func remapFilesystemMetrics(src, out pmetric.MetricSlice,
func remapFilesystemMetrics(
src, out pmetric.MetricSlice,
_ pcommon.Resource,
dataset string,
mutator func(pmetric.NumberDataPoint),
) error {
var timestamp pcommon.Timestamp
deviceMetricsMap := make(map[deviceKey]*deviceMetrics)
Expand All @@ -69,18 +70,18 @@ func remapFilesystemMetrics(src, out pmetric.MetricSlice,
if metric.Name() == "system.filesystem.usage" {
dmetrics.totalUsage += value
dmetrics.usedBytes += value
addFileSystemMetrics(out, timestamp, dataset, "system.filesystem.used.bytes", key.device, key.mpoint, key.fstype, value)
addFileSystemMetrics(out, timestamp, mutator, "system.filesystem.used.bytes", key.device, key.mpoint, key.fstype, value)
} else {
dmetrics.totalInodeUsage += value
}
case "free":
if metric.Name() == "system.filesystem.usage" {
dmetrics.totalUsage += value
addFileSystemMetrics(out, timestamp, dataset, "system.filesystem.free", key.device, key.mpoint, key.fstype, value)
addFileSystemMetrics(out, timestamp, dataset, "system.filesystem.available", key.device, key.mpoint, key.fstype, value)
addFileSystemMetrics(out, timestamp, mutator, "system.filesystem.free", key.device, key.mpoint, key.fstype, value)
addFileSystemMetrics(out, timestamp, mutator, "system.filesystem.available", key.device, key.mpoint, key.fstype, value)
} else {
dmetrics.totalInodeUsage += value
addFileSystemMetrics(out, timestamp, dataset, "system.filesystem.free_files", key.device, key.mpoint, key.fstype, value)
addFileSystemMetrics(out, timestamp, mutator, "system.filesystem.free_files", key.device, key.mpoint, key.fstype, value)
}
}
}
Expand All @@ -92,12 +93,12 @@ func remapFilesystemMetrics(src, out pmetric.MetricSlice,
for key, dmetrics := range deviceMetricsMap {
device, mpoint, fstype := key.device, key.mpoint, key.fstype
if dmetrics.totalUsage > 0 {
addFileSystemMetrics(out, timestamp, dataset, "system.filesystem.total", device, mpoint, fstype, dmetrics.totalUsage)
addFileSystemMetrics(out, timestamp, mutator, "system.filesystem.total", device, mpoint, fstype, dmetrics.totalUsage)
usedPercentage := float64(dmetrics.usedBytes) / float64(dmetrics.totalUsage)
addFileSystemMetrics(out, timestamp, dataset, "system.filesystem.used.pct", device, mpoint, fstype, usedPercentage)
addFileSystemMetrics(out, timestamp, mutator, "system.filesystem.used.pct", device, mpoint, fstype, usedPercentage)
}
if dmetrics.totalInodeUsage > 0 {
addFileSystemMetrics(out, timestamp, dataset, "system.filesystem.files", device, mpoint, fstype, dmetrics.totalInodeUsage)
addFileSystemMetrics(out, timestamp, mutator, "system.filesystem.files", device, mpoint, fstype, dmetrics.totalInodeUsage)
}

}
Expand All @@ -106,7 +107,8 @@ func remapFilesystemMetrics(src, out pmetric.MetricSlice,

func addFileSystemMetrics[T number](out pmetric.MetricSlice,
timestamp pcommon.Timestamp,
dataset, name, device, mpoint, fstype string,
mutator func(pmetric.NumberDataPoint),
name, device, mpoint, fstype string,
value T,
) {
var intValue *int64
Expand All @@ -117,12 +119,13 @@ func addFileSystemMetrics[T number](out pmetric.MetricSlice,
doubleValue = &d
}

remappedmetric.Add(out, dataset,
remappedmetric.Add(out, remappedmetric.ChainedMutator(
mutator,
func(dp pmetric.NumberDataPoint) {
dp.Attributes().PutStr("system.filesystem.device_name", device)
dp.Attributes().PutStr("system.filesystem.mount_point", mpoint)
dp.Attributes().PutStr("system.filesystem.type", fstype)
},
}),
remappedmetric.Metric{
DataType: pmetric.MetricTypeSum,
Name: name,
Expand Down
22 changes: 12 additions & 10 deletions remappers/hostmetrics/hostmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path"
"strings"

"github.com/elastic/opentelemetry-lib/remappers/common"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
Expand All @@ -40,7 +41,7 @@ var scraperToElasticDataset = map[string]string{
"process": "system.process",
}

type remapFunc func(metrics pmetric.MetricSlice, out pmetric.MetricSlice, resource pcommon.Resource, dataset string) error
type remapFunc func(pmetric.MetricSlice, pmetric.MetricSlice, pcommon.Resource, func(pmetric.NumberDataPoint)) error

var remapFuncs = map[string]remapFunc{
"cpu": remapCPUMetrics,
Expand Down Expand Up @@ -87,22 +88,23 @@ func (r *Remapper) Remap(
scope := src.Scope()
scraper := path.Base(scope.Name())

var dataset string // an empty dataset defers setting dataset to the caller
if r.cfg.SystemIntegrationDataset {
var ok bool
dataset, ok = scraperToElasticDataset[scraper]
if !ok {
r.logger.Warn("no dataset defined for scraper", zap.String("scraper", scraper))
return
dataset, ok := scraperToElasticDataset[scraper]
if !ok {
r.logger.Warn("no dataset defined for scraper", zap.String("scraper", scraper))
return
}
datasetMutator := func(m pmetric.NumberDataPoint) {
m.Attributes().PutStr(common.EventDatasetLabel, dataset)
if r.cfg.SystemIntegrationDataset {
m.Attributes().PutStr(common.DatastreamDatasetLabel, dataset)
}
}

remapFunc, ok := remapFuncs[scraper]
if !ok {
return
}

err := remapFunc(src.Metrics(), out, resource, dataset)
err := remapFunc(src.Metrics(), out, resource, datasetMutator)
if err != nil {
r.logger.Warn(
"failed to remap OTel hostmetrics",
Expand Down
8 changes: 6 additions & 2 deletions remappers/hostmetrics/hostmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ func doTestRemap(t *testing.T, id string, remapOpts ...Option) {

systemIntegration := newConfig(remapOpts...).SystemIntegrationDataset
outAttr := func(scraper string) map[string]any {
m := map[string]any{"otel_remapped": true}
dataset := scraperToElasticDataset[scraper]
m := map[string]any{
common.OTelRemappedLabel: true,
common.EventDatasetLabel: dataset,
}
if systemIntegration {
m[common.DatastreamDatasetLabel] = scraperToElasticDataset[scraper]
m[common.DatastreamDatasetLabel] = dataset
}

switch scraper {
Expand Down
4 changes: 2 additions & 2 deletions remappers/hostmetrics/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func remapLoadMetrics(
src, out pmetric.MetricSlice,
_ pcommon.Resource,
dataset string,
mutator func(pmetric.NumberDataPoint),
) error {
var timestamp pcommon.Timestamp
var l1, l5, l15 float64
Expand Down Expand Up @@ -57,7 +57,7 @@ func remapLoadMetrics(
}
}

remappedmetric.Add(out, dataset, remappedmetric.EmptyMutator,
remappedmetric.Add(out, mutator,
remappedmetric.Metric{
DataType: pmetric.MetricTypeGauge,
Name: "system.load.1",
Expand Down
4 changes: 2 additions & 2 deletions remappers/hostmetrics/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
func remapMemoryMetrics(
src, out pmetric.MetricSlice,
_ pcommon.Resource,
dataset string,
mutator func(pmetric.NumberDataPoint),
) error {
var timestamp pcommon.Timestamp
var total, free, cached, usedBytes, actualFree, actualUsedBytes int64
Expand Down Expand Up @@ -96,7 +96,7 @@ func remapMemoryMetrics(
usedBytes += total
actualFree = total - actualUsedBytes

remappedmetric.Add(out, dataset, remappedmetric.EmptyMutator,
remappedmetric.Add(out, mutator,
remappedmetric.Metric{
DataType: pmetric.MetricTypeSum,
Name: "system.memory.total",
Expand Down
14 changes: 9 additions & 5 deletions remappers/hostmetrics/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
func remapNetworkMetrics(
src, out pmetric.MetricSlice,
_ pcommon.Resource,
dataset string,
mutator func(pmetric.NumberDataPoint),
) error {
for i := 0; i < src.Len(); i++ {
metric := src.At(i)
Expand All @@ -51,9 +51,9 @@ func remapNetworkMetrics(
}
switch direction.Str() {
case "receive":
addDeviceMetric(out, timestamp, dataset, name, device.Str(), "in", value)
addDeviceMetric(out, timestamp, mutator, name, device.Str(), "in", value)
case "transmit":
addDeviceMetric(out, timestamp, dataset, name, device.Str(), "out", value)
addDeviceMetric(out, timestamp, mutator, name, device.Str(), "out", value)
}
}
}
Expand All @@ -64,7 +64,8 @@ func remapNetworkMetrics(
func addDeviceMetric(
out pmetric.MetricSlice,
timestamp pcommon.Timestamp,
dataset, name, device, direction string,
mutator func(pmetric.NumberDataPoint),
name, device, direction string,
value int64,
) {
metricsToAdd := map[string]string{
Expand All @@ -79,10 +80,13 @@ func addDeviceMetric(
return
}

remappedmetric.Add(out, dataset,
finalMutator := remappedmetric.ChainedMutator(
mutator,
func(dp pmetric.NumberDataPoint) {
dp.Attributes().PutStr("system.network.name", device)
},
)
remappedmetric.Add(out, finalMutator,
remappedmetric.Metric{
DataType: pmetric.MetricTypeSum,
Name: fmt.Sprintf(metricNetworkES, direction),
Expand Down
8 changes: 6 additions & 2 deletions remappers/hostmetrics/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
func remapProcessMetrics(
src, out pmetric.MetricSlice,
resource pcommon.Resource,
dataset string,
mutator func(pmetric.NumberDataPoint),
) error {
var timestamp, startTimestamp pcommon.Timestamp
var threads, memUsage, memVirtual, fdOpen, ioReadBytes, ioWriteBytes, ioReadOperations, ioWriteOperations int64
Expand Down Expand Up @@ -160,7 +160,11 @@ func remapProcessMetrics(
processRuntime := timestamp.AsTime().UnixMilli() - startTimeMillis
cpuPct := cpuTimeValue / float64(processRuntime)

remappedmetric.Add(out, dataset, addProcessResources(resource, startTime.UTC()),
finalMutator := remappedmetric.ChainedMutator(
mutator,
addProcessResources(resource, startTime.UTC()),
)
remappedmetric.Add(out, finalMutator,
// The timestamp metrics get converted from Int to Timestamp in Kibana
// since these are mapped to timestamp datatype
remappedmetric.Metric{
Expand Down
7 changes: 5 additions & 2 deletions remappers/hostmetrics/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
func remapProcessesMetrics(
src, out pmetric.MetricSlice,
_ pcommon.Resource,
dataset string,
mutator func(pmetric.NumberDataPoint),
) error {
var timestamp pcommon.Timestamp
var idleProcesses, sleepingProcesses, stoppedProcesses, zombieProcesses, runningProcesses, totalProcesses int64
Expand Down Expand Up @@ -67,7 +67,8 @@ func remapProcessesMetrics(

}

remappedmetric.Add(out, dataset,
finalMutator := remappedmetric.ChainedMutator(
mutator,
func(dp pmetric.NumberDataPoint) {
// Processes tab in the Kibana curated UI requires the event.dataset
// to work. This is a hard dependency.
Expand All @@ -76,6 +77,8 @@ func remapProcessesMetrics(
// remap functions.
dp.Attributes().PutStr("event.dataset", "system.process.summary")
},
)
remappedmetric.Add(out, finalMutator,
remappedmetric.Metric{
DataType: pmetric.MetricTypeSum,
Name: "system.process.summary.idle",
Expand Down
Loading