Traces #
Flink exposes a tracing system that allows gathering and exposing traces to external systems.
Reporting traces #
You can access the tracing system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object through which you can report a new single-span trace.
Reporting single Span #
A Span represents something that happened in Flink at a certain point in time and that will be reported to a TraceReporter. To report a Span, use the MetricGroup#addSpan(SpanBuilder) method.
Currently we don’t support traces with multiple spans. Each Span is self-contained and represents things like a checkpoint or recovery.
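Because each Span is self-contained, the reporting code has to supply the start and end timestamps itself. The following is a minimal sketch of one way to capture them around an action. It is not part of Flink: `TimedActionSketch` and `SpanCallback` are hypothetical names, and the use of `System.currentTimeMillis()` as the clock is an assumption.

```java
/** Illustrative sketch only; these types are hypothetical and not part of Flink. */
final class TimedActionSketch {

    /** Hypothetical stand-in for the code that builds and reports a span. */
    interface SpanCallback {
        void report(String name, long startTsMillis, long endTsMillis);
    }

    /**
     * Runs the action and hands the measured start and end timestamps to the callback.
     * The callback is invoked even if the action throws, so failed actions are reported too.
     */
    static void runAndReport(String name, Runnable action, SpanCallback callback) {
        long startTs = System.currentTimeMillis(); // assumption: wall-clock milliseconds
        try {
            action.run();
        } finally {
            long endTs = System.currentTimeMillis();
            callback.report(name, startTs, endTs);
        }
    }
}
```

In a Flink job, the body of the callback would be the place to call `metricGroup.addSpan(...)` with a builder that receives these two timestamps.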
```java
import org.apache.flink.traces.Span;

public class MyClass {
    void doSomething() {
        // (...)
        // metricGroup, startTs and endTs are assumed to be in scope here.
        metricGroup.addSpan(
                Span.builder(MyClass.class, "SomeAction")
                        .setStartTsMillis(startTs) // Optional
                        .setEndTsMillis(endTs) // Optional
                        .setAttribute("foo", "bar"));
    }
}
```
Currently reporting Spans from Python is not supported.
Reporter #
For information on how to set up Flink’s trace reporters please take a look at the trace reporters documentation.
System traces #
Flink reports the traces listed below.
The tables below generally feature 4 columns:
- The “Scope” column describes the scope under which the trace is reported.
- The “Name” column describes the name of the reported trace.
- The “Attributes” column lists the names of all attributes that are reported with the given trace.
- The “Description” column explains what a given attribute reports.
Checkpointing and initialization #
Flink reports one single-span trace covering each whole checkpoint or job initialization event, once that event reaches a terminal state: COMPLETED or FAILED.
| Scope | Name | Attributes | Description |
|---|---|---|---|
| org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | startTs | Timestamp when the checkpoint started. |
| | | endTs | Timestamp when the checkpoint finished. |
| | | checkpointId | Id of the checkpoint. |
| | | checkpointedSize | Size in bytes of the state checkpointed during this checkpoint. Might be smaller than fullSize if incremental checkpoints are used. |
| | | fullSize | Full size in bytes of the state referenced by this checkpoint. Might be larger than checkpointedSize if incremental checkpoints are used. |
| | | checkpointStatus | The final state of this checkpoint: FAILED or COMPLETED. |
| | JobInitialization | startTs | Timestamp when the job initialization started. |
| | | endTs | Timestamp when the job initialization finished. |
| | | checkpointId (optional) | Id of the checkpoint that the job recovered from (if any). |
| | | fullSize | Full size in bytes of the state referenced by the checkpoint that was used during recovery (if any). |
| | | (Max/Sum)MailboxStartDurationMs | The duration, aggregated (max and sum) across all subtasks, between a subtask being created and all classes and objects of that subtask being initialized. |
| | | (Max/Sum)ReadOutputDataDurationMs | The duration, aggregated (max and sum) across all subtasks, of reading the unaligned checkpoint's output buffers. |
| | | (Max/Sum)InitializeStateDurationMs | The duration, aggregated (max and sum) across all subtasks, of initializing a state backend (including the time to download state files). |
| | | (Max/Sum)GateRestoreDurationMs | The duration, aggregated (max and sum) across all subtasks, of reading the unaligned checkpoint's input buffers. |
| | | (Max/Sum)DownloadStateDurationMs (optional - currently only supported by RocksDB Incremental) | The duration, aggregated (max and sum) across all subtasks, of downloading state files from the DFS. |
| | | (Max/Sum)RestoredStateSizeBytes.[location] | The size, aggregated (max and sum) across all subtasks, of restored state by location. Possible locations are defined in Enum StateObjectSizeStatsCollector as LOCAL_MEMORY, LOCAL_DISK, REMOTE, UNKNOWN. |
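To make the "(Max/Sum)" notation concrete, here is a minimal sketch of how a per-subtask measurement can be collapsed into a `Max…` and a `Sum…` attribute. It only illustrates the naming and the arithmetic and is not Flink's internal implementation: the class `SubtaskAggregationSketch`, its method, and the sample values are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; not Flink's internal aggregation code. */
final class SubtaskAggregationSketch {

    /**
     * Collapses one measurement taken on every subtask into two attributes,
     * named "Max" + metricName and "Sum" + metricName.
     * Values are assumed to be non-negative (durations or sizes),
     * so an empty input yields 0 for both attributes.
     */
    static Map<String, Long> aggregate(String metricName, List<Long> perSubtaskValues) {
        long max = 0L;
        long sum = 0L;
        for (long value : perSubtaskValues) {
            max = Math.max(max, value);
            sum += value;
        }
        Map<String, Long> attributes = new LinkedHashMap<>();
        attributes.put("Max" + metricName, max);
        attributes.put("Sum" + metricName, sum);
        return attributes;
    }
}
```

For three subtasks that took 120, 450 and 300 ms to initialize state, `aggregate("InitializeStateDurationMs", List.of(120L, 450L, 300L))` yields `MaxInitializeStateDurationMs = 450` and `SumInitializeStateDurationMs = 870`. Dividing the sum by the number of subtasks and comparing the result with the max is one rough way to spot a single slow subtask.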