Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/otel/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
Value::Number(severity_number.into()),
);
let severity = SeverityNumber::try_from(severity_number).unwrap();
let severity_text = severity.as_str_name().to_string();
severity_json.insert(
"severity_text".to_string(),
Value::String(severity.as_str_name().to_string()),
Value::String(
severity_text
.strip_prefix("SEVERITY_NUMBER_")
.unwrap_or(&severity_text)
.to_string(),
),
);
severity_json
}
Expand Down
10 changes: 5 additions & 5 deletions src/otel/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,9 @@ fn flatten_aggregation_temporality(aggregation_temporality: i32) -> Map<String,
Value::Number(aggregation_temporality.into()),
);
let description = match aggregation_temporality {
0 => "AGGREGATION_TEMPORALITY_UNSPECIFIED",
1 => "AGGREGATION_TEMPORALITY_DELTA",
2 => "AGGREGATION_TEMPORALITY_CUMULATIVE",
0 => "UNSPECIFIED",
1 => "DELTA",
2 => "CUMULATIVE",
_ => "",
};
aggregation_temporality_json.insert(
Expand All @@ -637,8 +637,8 @@ fn flatten_data_point_flags(flags: u32) -> Map<String, Value> {
let mut data_point_flags_json = Map::new();
data_point_flags_json.insert("data_point_flags".to_string(), Value::Number(flags.into()));
let description = match flags {
0 => "DATA_POINT_FLAGS_DO_NOT_USE",
1 => "DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK",
0 => "DO_NOT_USE",
1 => "NO_RECORDED_VALUE_MASK",
_ => "",
};
data_point_flags_json.insert(
Expand Down
98 changes: 65 additions & 33 deletions src/otel/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use serde_json::{Map, Value};
use super::otel_utils::convert_epoch_nano_to_timestamp;
use super::otel_utils::insert_attributes;

pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 30] = [
pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 32] = [
"scope_name",
"scope_version",
"scope_schema_url",
Expand All @@ -43,8 +43,10 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 30] = [
"span_kind_description",
"span_start_time_unix_nano",
"span_end_time_unix_nano",
"span_duration_ms",
"event_name",
"event_time_unix_nano",
"event_duration_ms",
"event_dropped_attributes_count",
"link_span_id",
"link_trace_id",
Expand Down Expand Up @@ -169,7 +171,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec<Value> {
/// otel traces has json array of events
/// this function flattens the `Event` object
/// and returns a `Vec` of `Map` of the flattened json
fn flatten_events(events: &[Event]) -> Vec<Map<String, Value>> {
fn flatten_events(events: &[Event], span_start_time_unix_nano: u64) -> Vec<Map<String, Value>> {
events
.iter()
.map(|event| {
Expand All @@ -181,6 +183,20 @@ fn flatten_events(events: &[Event]) -> Vec<Map<String, Value>> {
),
);
event_json.insert("event_name".to_string(), Value::String(event.name.clone()));

// Calculate event duration in milliseconds from span start
let duration_nanos = event
.time_unix_nano
.saturating_sub(span_start_time_unix_nano);
let duration_ms = duration_nanos as f64 / 1_000_000.0; // Convert nanoseconds to milliseconds
event_json.insert(
"event_duration_ms".to_string(),
Value::Number(
serde_json::Number::from_f64(duration_ms)
.unwrap_or_else(|| serde_json::Number::from(0)),
),
);

insert_attributes(&mut event_json, &event.attributes);
event_json.insert(
"event_dropped_attributes_count".to_string(),
Expand Down Expand Up @@ -233,9 +249,9 @@ fn flatten_status(status: &Status) -> Map<String, Value> {
Value::Number(status.code.into()),
);
let description = match status.code {
0 => "STATUS_CODE_UNSET",
1 => "STATUS_CODE_OK",
2 => "STATUS_CODE_ERROR",
0 => "UNSET",
1 => "OK",
2 => "ERROR",
_ => "",
};
status_json.insert(
Expand All @@ -254,10 +270,10 @@ fn flatten_flags(flags: u32) -> Map<String, Value> {
let mut flags_json = Map::new();
flags_json.insert("span_flags".to_string(), Value::Number(flags.into()));
let description = match flags {
0 => "SPAN_FLAGS_DO_NOT_USE",
255 => "SPAN_FLAGS_TRACE_FLAGS_MASK",
256 => "SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK",
512 => "SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK",
0 => "DO_NOT_USE",
255 => "TRACE_FLAGS_MASK",
256 => "CONTEXT_HAS_IS_REMOTE_MASK",
512 => "CONTEXT_IS_REMOTE_MASK",
_ => "",
};
flags_json.insert(
Expand All @@ -276,12 +292,12 @@ fn flatten_kind(kind: i32) -> Map<String, Value> {
let mut kind_json = Map::new();
kind_json.insert("span_kind".to_string(), Value::Number(kind.into()));
let description = match kind {
0 => "SPAN_KIND_UNSPECIFIED",
1 => "SPAN_KIND_INTERNAL",
2 => "SPAN_KIND_SERVER",
3 => "SPAN_KIND_CLIENT",
4 => "SPAN_KIND_PRODUCER",
5 => "SPAN_KIND_CONSUMER",
0 => "UNSPECIFIED",
1 => "INTERNAL",
2 => "SERVER",
3 => "CLIENT",
4 => "PRODUCER",
5 => "CONSUMER",
_ => "",
};
kind_json.insert(
Expand Down Expand Up @@ -332,12 +348,26 @@ fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
span_record.end_time_unix_nano as i64,
)),
);

// Calculate span duration in milliseconds
let duration_nanos = span_record
.end_time_unix_nano
.saturating_sub(span_record.start_time_unix_nano);
let duration_ms = duration_nanos as f64 / 1_000_000.0; // Convert nanoseconds to milliseconds
span_record_json.insert(
"span_duration_ms".to_string(),
Value::Number(
serde_json::Number::from_f64(duration_ms)
.unwrap_or_else(|| serde_json::Number::from(0)),
),
);

insert_attributes(&mut span_record_json, &span_record.attributes);
span_record_json.insert(
"span_dropped_attributes_count".to_string(),
Value::Number(span_record.dropped_attributes_count.into()),
);
let events_json = flatten_events(&span_record.events);
let events_json = flatten_events(&span_record.events, span_record.start_time_unix_nano);
span_records_json.extend(events_json);
span_record_json.insert(
"span_dropped_events_count".to_string(),
Expand Down Expand Up @@ -414,9 +444,9 @@ mod tests {
fn test_flatten_status_code_mapping() {
// Test that status codes are correctly mapped to descriptions
let test_cases = vec![
(0, "STATUS_CODE_UNSET"),
(1, "STATUS_CODE_OK"),
(2, "STATUS_CODE_ERROR"),
(0, "UNSET"),
(1, "OK"),
(2, "ERROR"),
(999, ""), // Unknown status code should return empty string
];

Expand Down Expand Up @@ -451,12 +481,12 @@ mod tests {
fn test_flatten_span_kind_mapping() {
// Test that span kinds are correctly mapped to descriptions
let test_cases = vec![
(0, "SPAN_KIND_UNSPECIFIED"),
(1, "SPAN_KIND_INTERNAL"),
(2, "SPAN_KIND_SERVER"),
(3, "SPAN_KIND_CLIENT"),
(4, "SPAN_KIND_PRODUCER"),
(5, "SPAN_KIND_CONSUMER"),
(0, "UNSPECIFIED"),
(1, "INTERNAL"),
(2, "SERVER"),
(3, "CLIENT"),
(4, "PRODUCER"),
(5, "CONSUMER"),
(999, ""), // Unknown kind should return empty string
];

Expand All @@ -481,10 +511,10 @@ mod tests {
fn test_flatten_flags_mapping() {
// Test that flags are correctly mapped to descriptions
let test_cases = vec![
(0, "SPAN_FLAGS_DO_NOT_USE"),
(255, "SPAN_FLAGS_TRACE_FLAGS_MASK"),
(256, "SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK"),
(512, "SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK"),
(0, "DO_NOT_USE"),
(255, "TRACE_FLAGS_MASK"),
(256, "CONTEXT_HAS_IS_REMOTE_MASK"),
(512, "CONTEXT_IS_REMOTE_MASK"),
(999, ""), // Unknown flag should return empty string
];

Expand Down Expand Up @@ -523,7 +553,7 @@ mod tests {
},
];

let result = flatten_events(&events);
let result = flatten_events(&events, 1640995200000000000);

assert_eq!(result.len(), 2, "Should have two flattened events");

Expand Down Expand Up @@ -656,7 +686,7 @@ mod tests {
);
assert_eq!(
record.get("span_kind_description").unwrap(),
&Value::String("SPAN_KIND_SERVER".to_string()),
&Value::String("SERVER".to_string()),
"All records should contain span kind description"
);
assert!(
Expand Down Expand Up @@ -897,7 +927,7 @@ mod tests {
);
assert_eq!(
record.get("span_kind_description").unwrap(),
&Value::String("SPAN_KIND_CLIENT".to_string()),
&Value::String("CLIENT".to_string()),
"Should contain span kind description"
);
assert_eq!(
Expand All @@ -907,7 +937,7 @@ mod tests {
);
assert_eq!(
record.get("span_status_description").unwrap(),
&Value::String("STATUS_CODE_OK".to_string()),
&Value::String("OK".to_string()),
"Should contain status description"
);
}
Expand All @@ -934,12 +964,14 @@ mod tests {
"event_name",
"event_time_unix_nano",
"event_dropped_attributes_count",
"event_duration_ms",
"link_span_id",
"link_trace_id",
"link_dropped_attributes_count",
"span_dropped_events_count",
"span_dropped_links_count",
"span_dropped_attributes_count",
"span_duration_ms",
"span_trace_state",
"span_flags",
"span_flags_description",
Expand Down
Loading