Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Before diving into this chapter, let's introduce how to start the Prometheus Metrics service. For those unfamiliar with Prometheus, it's advisable to look up additional information. In simple terms, Prometheus is a system monitoring and metrics tool.
As KisFlow is a stream computing framework, metrics such as function scheduling time, total data volume, and algorithm speed are crucial for developers and project teams. These metrics can be recorded using Prometheus Metrics through KisFlow.
Next, we will configure the framework globally, allowing developers to enable Prometheus metrics collection if needed.
## 10.1 Prometheus Metrics Service

### 10.1.1 Prometheus Client SDK

First, add the necessary dependency to the `kis-flow/go.mod` file:

> kis-flow/go.mod
```go
module kis-flow

go 1.18

require (
	github.com/google/uuid v1.5.0
	github.com/patrickmn/go-cache v2.1.0+incompatible
	github.com/prometheus/client_golang v1.14.0 //++++++++
	gopkg.in/yaml.v3 v3.0.1
)
```

We use the official Prometheus Golang client SDK. More details can be found in its README: https://github.com/prometheus/client_golang
Next, let's write a simple Prometheus service that allows external access to KisFlow service metrics. Create a new directory `kis-flow/metrics/` for the KisFlow metrics code.

> kis-flow/metrics/kis_metrics.go
```go
package metrics

import (
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"kis-flow/common"
	"kis-flow/log"
	"net/http"
)

// RunMetricsService starts the Prometheus monitoring service
func RunMetricsService(serverAddr string) error {

	// Register the Prometheus monitoring route path
	http.Handle(common.METRICS_ROUTE, promhttp.Handler())

	// Start the HTTP server
	err := http.ListenAndServe(serverAddr, nil) // Multiple processes cannot listen on the same port
	if err != nil {
		log.Logger().ErrorF("RunMetricsService err = %s\n", err)
	}

	return err
}
```

Define `METRICS_ROUTE` as the monitoring service HTTP route path in `kis-flow/common/const.go`:

> kis-flow/common/const.go
```go
// ... ...

// metrics
const (
	METRICS_ROUTE string = "/metrics"
)

// ... ...
```
Let's briefly explain the above code. `RunMetricsService()` starts the Prometheus monitoring HTTP service. The purpose of this service is to provide metrics for the current KisFlow process through HTTP requests. While we haven't collected specific metrics yet, Prometheus provides default metrics such as the current Go version, garbage collection (GC) time, memory allocation, etc.

- `serverAddr` parameter: the address for the Prometheus monitoring service, usually a local address with a port number such as "0.0.0.0:20004".
- `http.Handle(common.METRICS_ROUTE, promhttp.Handler())`: with the address above, this line sets "0.0.0.0:20004/metrics" as the metrics entry point.
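One detail worth knowing about the route registration: `http.Handle` registers on Go's default mux, and the standard library panics if the same pattern is registered twice in one process. The following is a minimal stdlib-only sketch of the same route wiring on a dedicated `http.ServeMux`. The names `newMetricsMux` and `probe` and the stand-in handler are hypothetical and not part of KisFlow; in KisFlow the handler would be `promhttp.Handler()`.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newMetricsMux wires a handler onto its own mux instead of the default mux.
// (Hypothetical helper for illustration only.)
func newMetricsMux(route string, h http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle(route, h)
	return mux
}

// probe sends an in-memory GET request for path and returns the status code.
func probe(route, path string) int {
	// Stand-in for promhttp.Handler(): writes one fake sample line.
	standIn := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "demo_metric 1")
	})
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, path, nil)
	newMetricsMux(route, standIn).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(probe("/metrics", "/metrics")) // registered route
	fmt.Println(probe("/metrics", "/other"))   // unregistered path
}
```

A dedicated mux could then be passed as the second argument of `http.ListenAndServe` in place of `nil`.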
After writing the above code, remember to pull the Prometheus Golang client SDK dependency (https://github.com/prometheus/client_golang):

```bash
$ go mod tidy
```

After pulling, the `go.mod` dependencies will look something like this (versions may differ):

> kis-flow/go.mod
```go
module kis-flow

go 1.18

require (
	github.com/google/uuid v1.5.0
	github.com/patrickmn/go-cache v2.1.0+incompatible
	github.com/prometheus/client_golang v1.14.0
	gopkg.in/yaml.v3 v3.0.1
)

require (
	github.com/beorn7/perks v1.0.1 // indirect
	github.com/cespare/xxhash/v2 v2.1.2 // indirect
	github.com/golang/protobuf v1.5.2 // indirect
	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
	github.com/prometheus/client_model v0.3.0 // indirect
	github.com/prometheus/common v0.37.0 // indirect
	github.com/prometheus/procfs v0.8.0 // indirect
	golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
	google.golang.org/protobuf v1.28.1 // indirect
)
```
### 10.1.2 Unit Testing for Prometheus Server Service Startup

Next, let's perform a simple test to verify that the service can start. Create a file named `prometheus_server_test.go` in the `kis-flow/test/` directory:

> kis-flow/test/prometheus_server_test.go
```go
package test

import (
	"kis-flow/metrics"
	"testing"
)

func TestPrometheusServer(t *testing.T) {
	err := metrics.RunMetricsService("0.0.0.0:20004")
	if err != nil {
		panic(err)
	}
}
```
Here, the monitoring address is "0.0.0.0:20004". Next, start this unit test case by opening terminal A and navigating to the `kis-flow/test/` directory:

```bash
$ cd kis-flow/test/
$ go test -test.v -test.paniconexit0 -test.run TestPrometheusServer
=== RUN   TestPrometheusServer
```

Then, open another terminal B and enter the following command to simulate an HTTP client request:

```bash
$ curl http://0.0.0.0:20004/metrics
```
After that, we should see the monitoring metrics result in terminal B as follows:
```
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0
go_gc_duration_seconds{quantile="0.25"} 0
go_gc_duration_seconds{quantile="0.5"} 0
go_gc_duration_seconds{quantile="0.75"} 0
go_gc_duration_seconds{quantile="1"} 0
go_gc_duration_seconds_sum 0
go_gc_duration_seconds_count 0
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 8
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.18.8"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 3.2364e+06
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 3.2364e+06
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 1.446507e+06
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 0
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 3.561224e+06
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 3.2364e+06
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 4.636672e+06
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 3.260416e+06
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 21294
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 4.636672e+06
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 7.897088e+06
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
go_memstats_last_gc_time_seconds 0
# HELP go_memstats_lookups_total Total number of pointer lookups.
# TYPE go_memstats_lookups_total counter
go_memstats_lookups_total 0
# HELP go_memstats_mallocs_total Total number of mallocs.
# TYPE go_memstats_mallocs_total counter
go_memstats_mallocs_total 21294
# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
# TYPE go_memstats_mcache_inuse_bytes gauge
go_memstats_mcache_inuse_bytes 9600
# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system.
# TYPE go_memstats_mcache_sys_bytes gauge
go_memstats_mcache_sys_bytes 15600
# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
# TYPE go_memstats_mspan_inuse_bytes gauge
go_memstats_mspan_inuse_bytes 46376
# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
# TYPE go_memstats_mspan_sys_bytes gauge
go_memstats_mspan_sys_bytes 48960
# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
# TYPE go_memstats_next_gc_bytes gauge
go_memstats_next_gc_bytes 4.194304e+06
# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
# TYPE go_memstats_other_sys_bytes gauge
go_memstats_other_sys_bytes 1.171301e+06
# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
# TYPE go_memstats_stack_inuse_bytes gauge
go_memstats_stack_inuse_bytes 491520
# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
# TYPE go_memstats_stack_sys_bytes gauge
go_memstats_stack_sys_bytes 491520
# HELP go_memstats_sys_bytes Number of bytes obtained from system.
# TYPE go_memstats_sys_bytes gauge
go_memstats_sys_bytes 1.46322e+07
# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 7
# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served.
# TYPE promhttp_metric_handler_requests_in_flight gauge
promhttp_metric_handler_requests_in_flight 1
# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
# TYPE promhttp_metric_handler_requests_total counter
promhttp_metric_handler_requests_total{code="200"} 1
promhttp_metric_handler_requests_total{code="500"} 0
promhttp_metric_handler_requests_total{code="503"} 0
```
We have already provided configurations for Function, Flow, and Connector in KisFlow, distinguished by `kistype`. Next, we will implement a global configuration with `kistype` set to `global`. In this configuration, we will add settings to enable or disable Prometheus metrics collection.

Let's proceed to add global configuration properties to KisFlow.
## 10.2 KisFlow Global Configuration

### 10.2.1 Loading Global Configuration Files

The global configuration in YAML format is as follows:

```yaml
# kistype Global for the global configuration of KisFlow
kistype: global
# Whether to enable Prometheus monitoring
prometheus_enable: true
# Whether KisFlow needs to start a separate port listener
prometheus_listen: true
# The address for Prometheus to listen for metrics
prometheus_serve: 0.0.0.0:20004
```
### 10.2.2 Struct Definition

Next, based on the configuration protocol above, we'll define the global configuration struct for KisFlow and provide a default instance. Create a file named `kis_global_config.go` in the `kis-flow/config/` directory and define the configuration there.

> kis-flow/config/kis_global_config.go
```go
package config

type KisGlobalConfig struct {
	// kistype Global for the global configuration of KisFlow
	KisType string `yaml:"kistype"`
	// Whether to enable Prometheus monitoring
	EnableProm bool `yaml:"prometheus_enable"`
	// Whether KisFlow needs to start a separate port listener
	PrometheusListen bool `yaml:"prometheus_listen"`
	// The address for Prometheus to listen for metrics
	PrometheusServe string `yaml:"prometheus_serve"`
}

// GlobalConfig is the default global configuration, all are turned off
var GlobalConfig = new(KisGlobalConfig)
```
Here, we provide a global `GlobalConfig` object, which is a public variable, making it convenient for other modules to share the global configuration.

### 10.2.3 Configuration File Parsing

Next, we'll parse the global configuration and import it. Add the following function in `kis-flow/file/config_import.go`:

> kis-flow/file/config_import.go
```go
// kisTypeGlobalConfigure parses the Global configuration file in YAML format
func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
	// Global configuration
	if err := yaml.Unmarshal(confData, config.GlobalConfig); err != nil {
		return fmt.Errorf("%s has wrong format kisType = %s", fileName, kisType)
	}

	// TODO Initialize Prometheus metrics

	// TODO Start Prometheus metrics service

	return nil
}
```

This function loads the global YAML configuration file. After loading, it determines whether to initialize Prometheus metrics monitoring, which we will add later.

Where is `kisTypeGlobalConfigure()` called? It is invoked during the loading and scanning of local configuration files, similar to other configuration files:

> kis-flow/file/config_import.go
```go
// parseConfigWalkYaml parses all configuration files in YAML format and loads the configuration information into allConfig
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
	// ... ...

	err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
		// ... ...

		// Check if kistype exists
		if kisType, ok := confMap["kistype"]; !ok {
			return fmt.Errorf("yaml file %s has no field [kistype]!", filePath)
		} else {
			switch kisType {
			case common.KisIdTypeFlow:
				return kisTypeFlowConfigure(all, confData, filePath, kisType)

			case common.KisIdTypeFunction:
				return kisTypeFuncConfigure(all, confData, filePath, kisType)

			case common.KisIdTypeConnector:
				return kisTypeConnConfigure(all, confData, filePath, kisType)

			// +++++++++++++++++++++++++++++++++
			case common.KisIdTypeGlobal:
				return kisTypeGlobalConfigure(confData, filePath, kisType)
			// +++++++++++++++++++++++++++++++++

			default:
				return fmt.Errorf("%s sets wrong kistype %s", filePath, kisType)
			}
		}
	})

	if err != nil {
		return nil, err
	}

	return all, nil
}
```
Here, we add a case for kistype `KisIdTypeGlobal` that calls `kisTypeGlobalConfigure()`.

Next, we will create the Metrics module. In this section, we will start by tracking a simple metric: the total amount of data processed by KisFlow (based on the number of source data rows processed).

## 10.3 Metrics - DataTotal Metric

### 10.3.1 KisMetrics

First, build out the KisMetrics module in `kis-flow/metrics/kis_metrics.go`, the file created in section 10.1.1:

> kis-flow/metrics/kis_metrics.go
```go
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"kis-flow/common"
	"kis-flow/log"
	"net/http"
)

// kisMetrics defines the Prometheus metrics for KisFlow
type kisMetrics struct {
	// Total data count
	DataTotal prometheus.Counter
}

var Metrics *kisMetrics

// RunMetricsService starts the Prometheus monitoring service
func RunMetricsService(serverAddr string) error {

	// Register Prometheus monitoring route
	http.Handle(common.METRICS_ROUTE, promhttp.Handler())

	// Start HTTP server
	err := http.ListenAndServe(serverAddr, nil) // Multiple processes cannot listen on the same port
	if err != nil {
		log.Logger().ErrorF("RunMetricsService err = %s\n", err)
	}

	return err
}

// InitMetrics initializes the metrics
func InitMetrics() {
	Metrics = new(kisMetrics)

	// Initialize the DataTotal counter
	Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
		Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
	})

	// Register the metrics
	prometheus.MustRegister(Metrics.DataTotal)
}
```
- `kisMetrics struct`: This struct defines the metrics that KisFlow needs to track. Currently, it only includes one metric, `DataTotal`, which is of type `prometheus.Counter` (for more information on the `prometheus.Counter` type, please refer to the Prometheus documentation).
- `Metrics *kisMetrics`: This is the global metrics tracking object for KisFlow, publicly accessible to other modules.
- `RunMetricsService(serverAddr string)`: This function starts the Prometheus service listener, which was already unit tested in section 10.1.2.
- `InitMetrics()`: This function initializes the global object and sets up the metrics. It calls `prometheus.MustRegister` to register the metrics with Prometheus, which is a necessary step in Prometheus metrics programming.
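Because `prometheus.MustRegister` panics when registration fails (for example, when the same collector is registered a second time), it can help to make initialization idempotent. The sketch below shows one possible guard using `sync.Once` from the standard library. It is an illustration under that assumption, not KisFlow's code: `initMetricsStandIn` and `initMetricsOnce` are hypothetical names, and the stand-in only counts calls instead of registering real metrics.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	initOnce  sync.Once
	initCalls int // how many times the stand-in initializer actually ran
)

// initMetricsStandIn stands in for InitMetrics(); it only counts invocations.
func initMetricsStandIn() { initCalls++ }

// initMetricsOnce runs the initializer at most once per process and
// returns the number of times it has actually run so far.
func initMetricsOnce() int {
	initOnce.Do(initMetricsStandIn)
	return initCalls
}

func main() {
	initMetricsOnce()
	fmt.Println(initMetricsOnce()) // second call does not re-run the initializer
}
```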
Two constants hold the metric's display name and description. They are defined as follows:

> kis-flow/common/const.go
```go
// metrics
const (
	METRICS_ROUTE string = "/metrics"

	COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
	COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow total data amount of all Flows"
)
```
### 10.3.2 DataTotal Metric Tracking

To track the total data processed by KisFlow, we need to add the metrics tracking code in the `commitSrcData()` method. This method submits the current Flow's source data, that is, the first time the original source data is submitted to the current Flow. The updated code is as follows:

> kis-flow/flow/kis_flow_data.go
```go
func (flow *KisFlow) commitSrcData(ctx context.Context) error {

	// Create a batch of data
	dataCnt := len(flow.buffer)
	batch := make(common.KisRowArr, 0, dataCnt)

	for _, row := range flow.buffer {
		batch = append(batch, row)
	}

	// Clear previous data
	flow.clearData(flow.data)

	// Record the original data for the first submission
	// Since this is the first submission, PrevFunctionId is FirstVirtual because there is no previous Function
	flow.data[common.FunctionIdFirstVirtual] = batch

	// Clear the buffer
	flow.buffer = flow.buffer[0:0]

	// +++++++++++++++++++++++++++++++
	// Track the total data count on the first submission
	if config.GlobalConfig.EnableProm == true {
		// Increment the DataTotal metric by the data count
		metrics.Metrics.DataTotal.Add(float64(dataCnt))
	}
	// ++++++++++++++++++++++++++++++

	log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

	return nil
}
```
First, it checks the global configuration to determine whether metrics tracking is enabled. If `true`, it increments the total data count metric with the following code:

```go
metrics.Metrics.DataTotal.Add(float64(dataCnt))
```

Here, `dataCnt` is the number of data items being added to the total count.
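The enable check and the `Add` call travel together at every tracking point, so they could be folded into a small helper. The following stdlib-only sketch illustrates the idea. The `adder` interface, `fakeCounter`, `addIfEnabled`, and `demoTotal` are hypothetical names introduced here, not KisFlow APIs; `prometheus.Counter` does expose an `Add(float64)` method, so a real counter would satisfy this interface.

```go
package main

import "fmt"

// adder is the subset of a counter's API that this sketch needs.
type adder interface {
	Add(float64)
}

// fakeCounter is a stand-in counter that just sums what it is given.
type fakeCounter struct{ total float64 }

func (c *fakeCounter) Add(v float64) { c.total += v }

// addIfEnabled records n items only when metrics are switched on.
func addIfEnabled(enabled bool, c adder, n int) {
	if enabled {
		c.Add(float64(n))
	}
}

// demoTotal returns the total after one enabled and one disabled call.
func demoTotal() float64 {
	c := &fakeCounter{}
	addIfEnabled(true, c, 3)  // counted
	addIfEnabled(false, c, 5) // ignored because metrics are disabled
	return c.total
}

func main() {
	fmt.Println(demoTotal())
}
```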
### 10.3.3 Starting the Metrics Service

After importing the configuration, we need to start the metrics service. The configuration import code is updated as follows:

> kis-flow/file/config_import.go
```go
// kisTypeGlobalConfigure parses the Global configuration file in YAML format
func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
	// Global configuration
	if err := yaml.Unmarshal(confData, config.GlobalConfig); err != nil {
		return fmt.Errorf("%s has wrong format kisType = %s", fileName, kisType)
	}

	// ++++++++++++++++++++
	// Start the Metrics service
	metrics.RunMetrics()

	return nil
}
```
The `RunMetrics()` function is implemented as follows:

> kis-flow/metrics/kis_metrics.go
```go
// RunMetrics starts the Prometheus metrics service
func RunMetrics() {
	// Initialize Prometheus metrics
	InitMetrics()

	if config.GlobalConfig.EnableProm == true && config.GlobalConfig.PrometheusListen == true {
		// Start the Prometheus metrics service
		go RunMetricsService(config.GlobalConfig.PrometheusServe)
	}
}
```

With this setup, after the global configuration is imported, `RunMetrics()` initializes the metrics and then checks whether both Prometheus monitoring and the separate listener are enabled. If they are, a new goroutine is started to launch the Prometheus server, listening on the IP and port specified in the configuration file.
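Since the server is launched with `go`, the error returned by `RunMetricsService` is not seen by the caller and only shows up in the log. One possible variation, sketched below with the standard library only, binds the port synchronously with `net.Listen` so that a bind failure is returned to the caller, and then serves in the background. The names `startMetricsServer` and `startAndProbe` and the stand-in handler are hypothetical; this is not how KisFlow itself starts the service.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
)

// startMetricsServer binds addr synchronously, then serves in the background.
// It returns the bound address, or an error if the port cannot be bound.
func startMetricsServer(addr string, h http.Handler) (string, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return "", err // e.g. the port is already in use
	}
	go func() { _ = http.Serve(ln, h) }()
	return ln.Addr().String(), nil
}

// startAndProbe starts a stand-in server on a free loopback port and
// fetches the metrics route once, returning the HTTP status code.
func startAndProbe() (int, error) {
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "demo_metric 1") // stand-in for promhttp.Handler()
	})
	addr, err := startMetricsServer("127.0.0.1:0", mux)
	if err != nil {
		return 0, err
	}
	resp, err := http.Get("http://" + addr + "/metrics")
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	code, err := startAndProbe()
	fmt.Println(code, err)
}
```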
Next, we will create a unit test for the DataTotal metric to validate our implementation.
## 10.4 KisMetrics Unit Testing

### 10.4.1 Create Global Configuration File

Create a global configuration file `kis-flow.yml` under `kis-flow/test/load_conf/` with the following content:

> kis-flow/test/load_conf/kis-flow.yml
```yaml
# kistype Global for kisflow global configuration
kistype: global
# Enable prometheus monitoring
prometheus_enable: true
# Enable separate kisflow port listening
prometheus_listen: true
# Prometheus endpoint listening address
prometheus_serve: 0.0.0.0:20004
```
### 10.4.2 Create Unit Test

Next, create the test case file `kis_metrics_test.go` in `kis-flow/test/` as follows:

> kis-flow/test/kis_metrics_test.go
```go
package test

import (
	"context"
	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
	"time"
)

func TestMetricsDataTotal(t *testing.T) {
	ctx := context.Background()

	// 0. Register Function callback business logic
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. Register ConnectorInit and Connector callback business logic
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. Load configuration file and build Flow
	if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}

	// 2. Get Flow
	flow1 := kis.Pool().GetFlow("flowName1")

	n := 0

	for n < 10 {
		// 3. Submit raw data
		_ = flow1.CommitRow("This is Data1 from Test")

		// 4. Execute flow1
		if err := flow1.Run(ctx); err != nil {
			panic(err)
		}

		time.Sleep(1 * time.Second)
		n++
	}

	select {}
}
```
This case works much like a normal KisFlow startup, except that it includes a loop that submits one piece of data and starts a stream computation every second, looping 10 times. Afterward, we can check the total data amount through the Prometheus monitoring service. The `select {}` statement prevents the main goroutine from exiting, ensuring that the Prometheus monitoring goroutine continues running.

Run the unit test by navigating to `kis-flow/test/` and executing:

```bash
go test -test.v -test.paniconexit0 -test.run TestMetricsDataTotal
```
You will see a lot of log output. Wait for 10 seconds, then open another terminal and enter the following command:

```bash
$ curl http://0.0.0.0:20004/metrics
```

The result will be:

```
# ... ...
# HELP kisflow_data_total KisFlow total data amount of all Flows
# TYPE kisflow_data_total counter
kisflow_data_total 10
# ... ...
```

Here, you'll find that the `kisflow_data_total` metric appears with a result of 10, matching the 10 rows submitted by the loop and indicating that our metrics are correctly tracked. Next, we can add more complex metrics for KisFlow.
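Instead of reading the `curl` output by eye, the value could also be checked programmatically. The sketch below, using only the standard library, scans text in the Prometheus exposition format for an unlabeled sample and returns its value. `findMetric` and `sampleTotal` are hypothetical helper names, and the sample text is a hand-written fragment shaped like the output above, not captured output.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// findMetric scans Prometheus text-format output for an unlabeled sample
// called name and returns its value and whether it was found.
func findMetric(body, name string) (float64, bool) {
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and HELP/TYPE comments
		}
		fields := strings.Fields(line)
		if len(fields) >= 2 && fields[0] == name {
			v, err := strconv.ParseFloat(fields[1], 64)
			return v, err == nil
		}
	}
	return 0, false
}

// sampleTotal parses a fixed fragment shaped like the curl output.
func sampleTotal() float64 {
	sample := "# HELP kisflow_data_total KisFlow total data amount of all Flows\n" +
		"# TYPE kisflow_data_total counter\n" +
		"kisflow_data_total 10\n"
	v, _ := findMetric(sample, "kisflow_data_total")
	return v
}

func main() {
	fmt.Println(sampleTotal())
}
```

In a test, the body returned by an HTTP GET on the metrics address could be passed to such a helper and compared with the number of committed rows.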
## 10.5 Additional Metrics

### 10.5.1 Metric: Flow Data Total

#### (1) Define Metric

First, define the metric type as follows:

> kis-flow/metrics/kis_metrics.go
```go
// kisMetrics Prometheus monitoring metrics for kisFlow
type kisMetrics struct {
	// Total data amount
	DataTotal prometheus.Counter
	// Data total per Flow
	FlowDataTotal *prometheus.GaugeVec
}
```

`FlowDataTotal` is a `prometheus.GaugeVec`, whose label distinguishes which Flow generated the data.

#### (2) Initialize and Register Metrics

> kis-flow/metrics/kis_metrics.go
```go
func InitMetrics() {
	Metrics = new(kisMetrics)

	// Initialize DataTotal Counter
	Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
		Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
	})

	// +++++++++++
	// Initialize FlowDataTotal GaugeVec
	Metrics.FlowDataTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
			Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
		},
		// Label names
		[]string{common.LABEL_FLOW_NAME},
	)

	// Register Metrics
	prometheus.MustRegister(Metrics.DataTotal)
	prometheus.MustRegister(Metrics.FlowDataTotal) // +++
}
```

Related constant definitions:

> kis-flow/common/const.go
```go
// metrics
const (
	METRICS_ROUTE string = "/metrics"

	// ++++++++
	LABEL_FLOW_NAME     string = "flow_name"
	LABEL_FLOW_ID       string = "flow_id"
	LABEL_FUNCTION_NAME string = "func_name"
	LABEL_FUNCTION_MODE string = "func_mode"

	COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
	COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow total data amount of all Flows"

	// +++++++
	GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
	GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow data total amount per Flow"
)
```
#### (3) Add Metric Tracking

We should track the total data amount when submitting raw data.

> kis-flow/flow/kis_flow_data.go
```go
func (flow *KisFlow) commitSrcData(ctx context.Context) error {

	// Create batch data
	dataCnt := len(flow.buffer)
	batch := make(common.KisRowArr, 0, dataCnt)

	for _, row := range flow.buffer {
		batch = append(batch, row)
	}

	// Clear previous data
	flow.clearData(flow.data)

	// First submission, record flow raw data
	flow.data[common.FunctionIdFirstVirtual] = batch

	// Clear buffer
	flow.buffer = flow.buffer[0:0]

	// First submission, track data total
	if config.GlobalConfig.EnableProm == true {
		// Track data total Metrics.DataTotal
		metrics.Metrics.DataTotal.Add(float64(dataCnt))

		// ++++++++
		// Track current Flow data total
		metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
	}

	log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

	return nil
}
```
So the tracking point is in the same position as before, but we add a `flow.Name` label when accumulating the data.

### 10.5.2 Metric: Flow Scheduling Count

#### (1) Metric Definition

First, define the metric type as follows:

> kis-flow/metrics/kis_metrics.go
```go
// kisMetrics represents the Prometheus monitoring metrics for kisFlow
type kisMetrics struct {
	// Total data count
	DataTotal prometheus.Counter
	// Total data processed by each Flow
	FlowDataTotal *prometheus.GaugeVec
	// Total Flow scheduling count
	FlowScheduleCntsTotal *prometheus.GaugeVec //++++
}
```

`FlowScheduleCntsTotal` is a `prometheus.GaugeVec`, whose label distinguishes which Flow was scheduled.

#### (2) Metric Initialization and Registration

> kis-flow/metrics/kis_metrics.go
```go
func InitMetrics() {
	Metrics = new(kisMetrics)

	// Initialize DataTotal as a Counter
	Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
		Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
	})

	// Initialize FlowDataTotal as a GaugeVec
	Metrics.FlowDataTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
			Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
		},
		// Label name
		[]string{common.LABEL_FLOW_NAME},
	)

	// +++++++++++++
	// Initialize FlowScheduleCntsTotal as a GaugeVec
	Metrics.FlowScheduleCntsTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
			Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
		},
		// Label name
		[]string{common.LABEL_FLOW_NAME},
	)

	// Register metrics
	prometheus.MustRegister(Metrics.DataTotal)
	prometheus.MustRegister(Metrics.FlowDataTotal)
	// +++++
	prometheus.MustRegister(Metrics.FlowScheduleCntsTotal)
}
```
Define the relevant constants:

> kis-flow/common/const.go
```go
// metrics
const (
	METRICS_ROUTE string = "/metrics"

	LABEL_FLOW_NAME     string = "flow_name"
	LABEL_FLOW_ID       string = "flow_id"
	LABEL_FUNCTION_NAME string = "func_name"
	LABEL_FUNCTION_MODE string = "func_mode"

	COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
	COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow total data count"

	GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
	GANGE_FLOW_DATA_TOTAL_HELP string = "Total data count for each FlowID in KisFlow"

	// +++++++
	GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
	GANGE_FLOW_SCHE_CNTS_HELP string = "Scheduling count for each FlowID in KisFlow"
)
```
#### (3) Metric Data Collection

To collect the scheduling count for each Flow, we should collect data in the main entry point `flow.Run()`, as follows:

> kis-flow/flow/kis_flow.go
```go
// Run starts the stream computation of KisFlow and executes the flow from the initial Function
func (flow *KisFlow) Run(ctx context.Context) error {

	var fn kis.Function

	fn = flow.FlowHead
	flow.abort = false

	if flow.Conf.Status == int(common.FlowDisable) {
		// Flow is disabled in configuration
		return nil
	}

	// Since no Function has been executed yet, PrevFunctionId is set to FirstVirtual because there is no previous Function
	flow.PrevFunctionId = common.FunctionIdFirstVirtual

	// Commit the original stream data
	if err := flow.commitSrcData(ctx); err != nil {
		return err
	}

	// +++++++++++ Metrics
	if config.GlobalConfig.EnableProm == true {
		// Collect scheduling count for Flow
		metrics.Metrics.FlowScheduleCntsTotal.WithLabelValues(flow.Name).Inc()
	}
	// ++++++++++++++++++++

	// Chain-style stream invocation
	for fn != nil && flow.abort == false {

		// Record the current Function being executed in the Flow
		fid := fn.GetId()
		flow.ThisFunction = fn
		flow.ThisFunctionId = fid

		// Obtain the source data for the current Function to process
		if inputData, err := flow.getCurData(); err != nil {
			log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
			return err
		} else {
			flow.inPut = inputData
		}

		if err := fn.Call(ctx, flow); err != nil {
			// Error
			return err
		} else {
			// Success
			fn, err = flow.dealAction(ctx, fn)
			if err != nil {
				return err
			}
		}
	}

	return nil
}
```

Here, the metric is collected once per `flow.Run()` call, before the Function chain starts and therefore before any `fn.Call()`. The count is incremented for each Flow execution, grouped by `flow.Name`.
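To make the grouping concrete: conceptually, a labeled metric keeps one independent value per label value, so `WithLabelValues(flow.Name).Inc()` only touches the series of that Flow. The sketch below imitates this with a mutex-protected map from the standard library. It is a simplified stand-in for illustration, not the Prometheus implementation; `labeledCounter`, `newLabeledCounter`, and `demoCounts` are hypothetical names, while the flow names are the ones used in this chapter's test.

```go
package main

import (
	"fmt"
	"sync"
)

// labeledCounter is a stdlib stand-in for a metric vector with one label.
type labeledCounter struct {
	mu     sync.Mutex
	values map[string]float64
}

func newLabeledCounter() *labeledCounter {
	return &labeledCounter{values: make(map[string]float64)}
}

// Inc adds one to the series identified by the label value.
func (c *labeledCounter) Inc(label string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[label]++
}

// Get returns the current value for a label (0 if never incremented).
func (c *labeledCounter) Get(label string) float64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.values[label]
}

// demoCounts simulates two runs of flowName1 and one run of flowName2.
func demoCounts() (float64, float64) {
	sched := newLabeledCounter()
	sched.Inc("flowName1")
	sched.Inc("flowName1")
	sched.Inc("flowName2")
	return sched.Get("flowName1"), sched.Get("flowName2")
}

func main() {
	a, b := demoCounts()
	fmt.Println(a, b)
}
```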
### 10.5.3 Metric: Function Scheduling Count

#### (1) Metric Definition

First, define the metric type as follows:

> kis-flow/metrics/kis_metrics.go
```go
// kisMetrics represents the Prometheus monitoring metrics for kisFlow
type kisMetrics struct {
	// Total data count
	DataTotal prometheus.Counter
	// Total data processed by each Flow
	FlowDataTotal *prometheus.GaugeVec
	// Total Flow scheduling count
	FlowScheduleCntsTotal *prometheus.GaugeVec
	// Total Function scheduling count
	FuncScheduleCntsTotal *prometheus.GaugeVec //++++
}
```

`FuncScheduleCntsTotal` is a `prometheus.GaugeVec`, whose labels distinguish which Function was scheduled.

#### (2) Metric Initialization and Registration

> kis-flow/metrics/kis_metrics.go
```go
func InitMetrics() {
	Metrics = new(kisMetrics)

	// Initialize DataTotal as a Counter
	Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
		Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
	})

	// Initialize FlowDataTotal as a GaugeVec
	Metrics.FlowDataTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
			Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
		},
		// Label name
		[]string{common.LABEL_FLOW_NAME},
	)

	// Initialize FlowScheduleCntsTotal as a GaugeVec
	Metrics.FlowScheduleCntsTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
			Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
		},
		// Label name
		[]string{common.LABEL_FLOW_NAME},
	)

	// ++++++++++
	// Initialize FuncScheduleCntsTotal as a GaugeVec
	Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GANGE_FUNC_SCHE_CNTS_NAME,
			Help: common.GANGE_FUNC_SCHE_CNTS_HELP,
		},
		// Label names
		[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
	)

	// Register metrics
	prometheus.MustRegister(Metrics.DataTotal)
	prometheus.MustRegister(Metrics.FlowDataTotal)
	prometheus.MustRegister(Metrics.FlowScheduleCntsTotal)
	// +++++++
	prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
}
```

Define the relevant constants:

> kis-flow/common/const.go
```go
// metrics
const (
	METRICS_ROUTE string = "/metrics"

	LABEL_FLOW_NAME     string = "flow_name"
	LABEL_FLOW_ID       string = "flow_id"
	LABEL_FUNCTION_NAME string = "func_name"
	LABEL_FUNCTION_MODE string = "func_mode"

	COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
	COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow total data count"

	GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
	GANGE_FLOW_DATA_TOTAL_HELP string = "Total data count for each FlowID in KisFlow"

	GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
	GANGE_FLOW_SCHE_CNTS_HELP string = "Scheduling count for each FlowID in KisFlow"

	// +++++++++
	GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
	GANGE_FUNC_SCHE_CNTS_HELP string = "Scheduling count for each Function in KisFlow"
)
```
(3) Metric Data Collection
To collect the scheduling count for each Function, we should collect data in the main entry point flow.Run()
, as follows:
kis-flow/flow/kis_flow.go
```go
// Run starts the stream computation of KisFlow and executes the flow from the initial Function
func (flow *KisFlow) Run(ctx context.Context) error {

	var fn kis.Function

	fn = flow.FlowHead
	flow.abort = false

	if flow.Conf.Status == int(common.FlowDisable) {
		// Flow is disabled in configuration
		return nil
	}

	// Since no Function has been executed yet, PrevFunctionId is set to FirstVirtual because there is no previous Function
	flow.PrevFunctionId = common.FunctionIdFirstVirtual

	// Commit the original stream data
	if err := flow.commitSrcData(ctx); err != nil {
		return err
	}

	if config.GlobalConfig.EnableProm == true {
		// Collect scheduling count for Flow
		metrics.Metrics.FlowScheduleCntsTotal.WithLabelValues(flow.Name).Inc()
	}

	// Chain-style stream invocation
	for fn != nil && flow.abort == false {

		// Record the current Function being executed in the Flow
		fid := fn.GetId()
		flow.ThisFunction = fn
		flow.ThisFunctionId = fid

		// ++++++++++++
		fName := fn.GetConfig().FName
		fMode := fn.GetConfig().FMode

		// +++++++++++++++++++++++++++
		if config.GlobalConfig.EnableProm == true {
			// Collect scheduling count for Function
			metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
		}
		// ++++++++++++++++++++++++++++

		// Obtain the source data for the current Function to process
		if inputData, err := flow.getCurData(); err != nil {
			log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
			return err
		} else {
			flow.inPut = inputData
		}

		if err := fn.Call(ctx, flow); err != nil {
			// Error
			return err
		} else {
			// Success
			fn, err = flow.dealAction(ctx, fn)
			if err != nil {
				return err
			}
		}
	}

	return nil
}
```
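In the `Run()` listing above, `WithLabelValues(fName, fMode)` selects one time series per label combination, and `Inc()` adds 1 to it. The following is only a toy model of that grouping, written with the standard library so it runs without the Prometheus client; `labelKey` and `labeledCounter` are hypothetical names, and the sketch is not safe for concurrent use, unlike the real client.

```go
package main

import "fmt"

// labelKey is a hypothetical stand-in for one (func_name, func_mode) label combination.
type labelKey struct {
	name string
	mode string
}

// labeledCounter is a toy model of a labeled metric: one value per label combination.
type labeledCounter struct {
	values map[labelKey]float64
}

func newLabeledCounter() *labeledCounter {
	return &labeledCounter{values: make(map[labelKey]float64)}
}

// inc adds 1 to the series identified by (name, mode), creating it on first use.
func (c *labeledCounter) inc(name, mode string) {
	c.values[labelKey{name, mode}]++
}

// get returns the current value of the series identified by (name, mode).
func (c *labeledCounter) get(name, mode string) float64 {
	return c.values[labelKey{name, mode}]
}

func main() {
	c := newLabeledCounter()

	// Simulate three runs of a Flow that schedules two Functions per run.
	for i := 0; i < 3; i++ {
		c.inc("funcName1", "Verify")
		c.inc("funcName2", "Save")
	}

	fmt.Println(c.get("funcName1", "Verify")) // 3
	fmt.Println(c.get("funcName2", "Save"))   // 3
}
```

Each distinct pair of label values becomes its own series, which is why the `/metrics` output lists one line per Function name and mode.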
Here, the metric is collected before the `fn.Call()` method is invoked. Each time a Function is scheduled, the counter is incremented, grouped by `fName` and `fMode`.

### 10.5.4 Metric: Function Execution Time

#### (1) Metric Definition

Define the metric type as follows:

> kis-flow/metrics/kis_metrics.go

```go
// kisMetrics defines the Prometheus metrics for kisFlow
type kisMetrics struct {
	// Total data count
	DataTotal prometheus.Counter
	// Total data processed by each Flow
	FlowDataTotal *prometheus.GaugeVec
	// Flow schedule count
	FlowScheduleCntsTotal *prometheus.GaugeVec
	// Function schedule count
	FuncScheduleCntsTotal *prometheus.GaugeVec
	// Function execution time
	FunctionDuration *prometheus.HistogramVec //++++
}
```
`FunctionDuration` uses the `prometheus.HistogramVec` type. A histogram counts observations into configurable buckets, each defined by an upper bound, so it exposes the distribution of execution times across time ranges rather than a single value.
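To make the bucket idea concrete, here is a minimal sketch of how a histogram assigns observations to buckets and why the exposed bucket counts are cumulative. It is a simplified model using only the standard library, not the Prometheus client's implementation; `toyHistogram` and its methods are hypothetical names, and the three bounds are chosen only for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// toyHistogram is a hypothetical, simplified model of a single histogram series.
type toyHistogram struct {
	bounds []float64 // bucket upper bounds in ascending order (the "le" values)
	counts []uint64  // per-bucket counts; the extra last slot is the +Inf bucket
	sum    float64   // sum of all observed values
	count  uint64    // number of observations
}

func newToyHistogram(bounds []float64) *toyHistogram {
	b := append([]float64(nil), bounds...)
	sort.Float64s(b)
	return &toyHistogram{bounds: b, counts: make([]uint64, len(b)+1)}
}

// observe records v in the first bucket whose upper bound is >= v.
func (h *toyHistogram) observe(v float64) {
	i := sort.SearchFloat64s(h.bounds, v) // len(bounds) when v exceeds every bound
	h.counts[i]++
	h.sum += v
	h.count++
}

// cumulative returns the counts in the form a scrape shows them:
// each entry also includes every observation from the lower buckets.
func (h *toyHistogram) cumulative() []uint64 {
	out := make([]uint64, len(h.counts))
	var running uint64
	for i, c := range h.counts {
		running += c
		out[i] = running
	}
	return out
}

func main() {
	h := newToyHistogram([]float64{1, 5, 10}) // bounds in milliseconds
	for _, ms := range []float64{0.4, 2.5, 3.0, 7.2} {
		h.observe(ms)
	}
	// Order: le="1", le="5", le="10", le="+Inf"
	fmt.Println(h.cumulative()) // [1 3 4 4]
}
```

Because the counts are cumulative, the `le="+Inf"` bucket always equals the total number of observations.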
#### (2) Metric Initialization and Registration

> kis-flow/metrics/kis_metrics.go
```go
func InitMetrics() {
	Metrics = new(kisMetrics)

	// Initialize DataTotal Counter
	Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
		Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
	})

	// Initialize FlowDataTotal GaugeVec
	Metrics.FlowDataTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GAUGE_FLOW_DATA_TOTAL_NAME,
			Help: common.GAUGE_FLOW_DATA_TOTAL_HELP,
		},
		// Label name
		[]string{common.LABEL_FLOW_NAME},
	)

	// Initialize FlowScheduleCntsTotal GaugeVec
	Metrics.FlowScheduleCntsTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GAUGE_FLOW_SCHE_CNTS_NAME,
			Help: common.GAUGE_FLOW_SCHE_CNTS_HELP,
		},
		// Label name
		[]string{common.LABEL_FLOW_NAME},
	)

	// Initialize FuncScheduleCntsTotal GaugeVec
	Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GAUGE_FUNC_SCHE_CNTS_NAME,
			Help: common.GAUGE_FUNC_SCHE_CNTS_HELP,
		},
		// Label names
		[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
	)

	// ++++++++++++++++++++++++++
	// Initialize FunctionDuration HistogramVec
	Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    common.HISTOGRAM_FUNCTION_DURATION_NAME,
		Help:    common.HISTOGRAM_FUNCTION_DURATION_HELP,
		Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, // unit ms, max half a minute
	},
		[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
	)

	// Register Metrics
	prometheus.MustRegister(Metrics.DataTotal)
	prometheus.MustRegister(Metrics.FlowDataTotal)
	prometheus.MustRegister(Metrics.FlowScheduleCntsTotal)
	prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
	// +++++++
	prometheus.MustRegister(Metrics.FunctionDuration)
}
```
Related constant definitions:

> kis-flow/common/const.go

```go
// metrics
const (
	METRICS_ROUTE string = "/metrics"

	LABEL_FLOW_NAME     string = "flow_name"
	LABEL_FLOW_ID       string = "flow_id"
	LABEL_FUNCTION_NAME string = "func_name"
	LABEL_FUNCTION_MODE string = "func_mode"

	COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
	COUNTER_KISFLOW_DATA_TOTAL_HELP string = "Total data processed by KisFlow"

	GAUGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
	GAUGE_FLOW_DATA_TOTAL_HELP string = "Total data processed by each FlowID in KisFlow"

	GAUGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
	GAUGE_FLOW_SCHE_CNTS_HELP string = "Flow schedule counts for each FlowID in KisFlow"

	GAUGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
	GAUGE_FUNC_SCHE_CNTS_HELP string = "Function schedule counts for each Function in KisFlow"

	HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
	HISTOGRAM_FUNCTION_DURATION_HELP string = "Function execution duration"
)
```
#### (3) Metric Instrumentation

To measure the execution time of each Function, we instrument the Flow's main entry point, `flow.Run()`, as follows:

> kis-flow/flow/kis_flow.go
```go
// Run starts the stream processing of KisFlow, executing from the starting Function
func (flow *KisFlow) Run(ctx context.Context) error {

	var fn kis.Function

	fn = flow.FlowHead
	flow.abort = false

	if flow.Conf.Status == int(common.FlowDisable) {
		// Flow is configured to be disabled
		return nil
	}

	// ++++++++++ Metrics +++++++++
	var funcStart time.Time

	// Since no Function has been executed yet, set PrevFunctionId to FirstVirtual as there is no previous Function
	flow.PrevFunctionId = common.FunctionIdFirstVirtual

	// Commit the original stream data
	if err := flow.commitSrcData(ctx); err != nil {
		return err
	}

	if config.GlobalConfig.EnableProm == true {
		// Record Flow schedule count
		metrics.Metrics.FlowScheduleCntsTotal.WithLabelValues(flow.Name).Inc()
	}

	// Chain-style stream invocation
	for fn != nil && flow.abort == false {

		// Record the current Function being executed in the Flow
		fid := fn.GetId()
		flow.ThisFunction = fn
		flow.ThisFunctionId = fid

		fName := fn.GetConfig().FName
		fMode := fn.GetConfig().FMode

		if config.GlobalConfig.EnableProm == true {
			// Record Function schedule count
			metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()

			// +++++++++++++++
			// Record Function execution time start
			funcStart = time.Now()
		}

		// Obtain the source data for the current Function to process
		if inputData, err := flow.getCurData(); err != nil {
			log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
			return err
		} else {
			flow.inPut = inputData
		}

		if err := fn.Call(ctx, flow); err != nil {
			// Error
			return err
		} else {
			// Success
			fn, err = flow.dealAction(ctx, fn)
			if err != nil {
				return err
			}

			// +++++++++++++++
			// Record Function execution duration
			if config.GlobalConfig.EnableProm == true {
				// Function execution duration
				duration := time.Since(funcStart)

				// Record the current Function execution time metric
				metrics.Metrics.FunctionDuration.With(
					prometheus.Labels{
						common.LABEL_FUNCTION_NAME: fName,
						common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000)
			}
			// +++++++++++++++
		}
	}

	return nil
}
```
The instrumentation records the start time before the Function is invoked and computes the elapsed time once the Function has completed successfully. The duration, in milliseconds, is then observed by the `FunctionDuration` `HistogramVec`, which counts it in the matching bucket.

### 10.5.5 Metric: Flow Execution Time

#### (1) Metric Definition

Define the metric type as follows:

> kis-flow/metrics/kis_metrics.go

```go
// kisMetrics defines the Prometheus metrics for kisFlow
type kisMetrics struct {
	// Total data count
	DataTotal prometheus.Counter
	// Total data processed by each Flow
	FlowDataTotal *prometheus.GaugeVec
	// Flow schedule count
	FlowScheduleCntsTotal *prometheus.GaugeVec
	// Function schedule count
	FuncScheduleCntsTotal *prometheus.GaugeVec
	// Function execution time
	FunctionDuration *prometheus.HistogramVec
	// Flow execution time
	FlowDuration *prometheus.HistogramVec // ++++
}
```
`FlowDuration` also uses the `prometheus.HistogramVec` type, so Flow execution times are counted into buckets defined by upper bounds, in the same way as `FunctionDuration`.
#### (2) Metric Initialization and Registration

> kis-flow/metrics/kis_metrics.go
```go
func InitMetrics() {
	Metrics = new(kisMetrics)

	// Initialize DataTotal Counter
	Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
		Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
	})

	// Initialize FlowDataTotal GaugeVec
	Metrics.FlowDataTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GAUGE_FLOW_DATA_TOTAL_NAME,
			Help: common.GAUGE_FLOW_DATA_TOTAL_HELP,
		},
		// Label name
		[]string{common.LABEL_FLOW_NAME},
	)

	// Initialize FlowScheduleCntsTotal GaugeVec
	Metrics.FlowScheduleCntsTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GAUGE_FLOW_SCHE_CNTS_NAME,
			Help: common.GAUGE_FLOW_SCHE_CNTS_HELP,
		},
		// Label name
		[]string{common.LABEL_FLOW_NAME},
	)

	// Initialize FuncScheduleCntsTotal GaugeVec
	Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: common.GAUGE_FUNC_SCHE_CNTS_NAME,
			Help: common.GAUGE_FUNC_SCHE_CNTS_HELP,
		},
		// Label names
		[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
	)

	// Initialize FunctionDuration HistogramVec
	Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    common.HISTOGRAM_FUNCTION_DURATION_NAME,
		Help:    common.HISTOGRAM_FUNCTION_DURATION_HELP,
		Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, // unit ms, max half a minute
	},
		[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
	)

	// ++++++++++++++++++++++++++
	// Initialize FlowDuration HistogramVec
	Metrics.FlowDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    common.HISTOGRAM_FLOW_DURATION_NAME,
		Help:    common.HISTOGRAM_FLOW_DURATION_HELP,
		Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, // unit ms, max half a minute
	},
		[]string{common.LABEL_FLOW_NAME},
	)

	// Register Metrics
	prometheus.MustRegister(Metrics.DataTotal)
	prometheus.MustRegister(Metrics.FlowDataTotal)
	prometheus.MustRegister(Metrics.FlowScheduleCntsTotal)
	prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
	prometheus.MustRegister(Metrics.FunctionDuration)
	// +++++++
	prometheus.MustRegister(Metrics.FlowDuration)
}
```
Related constant definitions:

> kis-flow/common/const.go

```go
// metrics
const (
	METRICS_ROUTE string = "/metrics"

	LABEL_FLOW_NAME     string = "flow_name"
	LABEL_FLOW_ID       string = "flow_id"
	LABEL_FUNCTION_NAME string = "func_name"
	LABEL_FUNCTION_MODE string = "func_mode"

	COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
	COUNTER_KISFLOW_DATA_TOTAL_HELP string = "Total data processed by KisFlow"

	GAUGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
	GAUGE_FLOW_DATA_TOTAL_HELP string = "Total data processed by each FlowID in KisFlow"

	GAUGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
	GAUGE_FLOW_SCHE_CNTS_HELP string = "Flow schedule counts for each FlowID in KisFlow"

	GAUGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
	GAUGE_FUNC_SCHE_CNTS_HELP string = "Function schedule counts for each Function in KisFlow"

	HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
	HISTOGRAM_FUNCTION_DURATION_HELP string = "Function execution duration"

	HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration"
	HISTOGRAM_FLOW_DURATION_HELP string = "Flow execution duration"
)
```
#### (3) Metric Instrumentation

To measure the execution time of each Flow, we instrument the Flow's main entry point, `flow.Run()`, as follows:

> kis-flow/flow/kis_flow.go
```go
// Run starts the stream processing of KisFlow, executing from the starting Function
func (flow *KisFlow) Run(ctx context.Context) error {

	var fn kis.Function

	fn = flow.FlowHead
	flow.abort = false

	if flow.Conf.Status == int(common.FlowDisable) {
		// Flow is configured to be disabled
		return nil
	}

	// ++++++++++ Metrics +++++++++
	var funcStart, flowStart time.Time

	// Since no Function has been executed yet, set PrevFunctionId to FirstVirtual as there is no previous Function
	flow.PrevFunctionId = common.FunctionIdFirstVirtual

	// Commit the original stream data
	if err := flow.commitSrcData(ctx); err != nil {
		return err
	}

	if config.GlobalConfig.EnableProm == true {
		// Record Flow schedule count
		metrics.Metrics.FlowScheduleCntsTotal.WithLabelValues(flow.Name).Inc()
		// Record Flow execution time start
		flowStart = time.Now()
	}

	// Chain-style stream invocation
	for fn != nil && flow.abort == false {

		// Record the current Function being executed in the Flow
		fid := fn.GetId()
		flow.ThisFunction = fn
		flow.ThisFunctionId = fid

		fName := fn.GetConfig().FName
		fMode := fn.GetConfig().FMode

		if config.GlobalConfig.EnableProm == true {
			// Record Function schedule count
			metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
			// Record Function execution time start
			funcStart = time.Now()
		}

		// Obtain the source data for the current Function to process
		if inputData, err := flow.getCurData(); err != nil {
			log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
			return err
		} else {
			flow.inPut = inputData
		}

		if err := fn.Call(ctx, flow); err != nil {
			// Error
			return err
		} else {
			// Success
			fn, err = flow.dealAction(ctx, fn)
			if err != nil {
				return err
			}

			// Record Function execution duration
			if config.GlobalConfig.EnableProm == true {
				// Function execution duration
				duration := time.Since(funcStart)

				// Record the current Function execution time metric
				metrics.Metrics.FunctionDuration.With(
					prometheus.Labels{
						common.LABEL_FUNCTION_NAME: fName,
						common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000)
			}
		}
	}

	// Record Flow execution duration
	if config.GlobalConfig.EnableProm == true {
		// Flow execution duration
		duration := time.Since(flowStart)

		// Record the Flow execution time metric
		metrics.Metrics.FlowDuration.With(
			prometheus.Labels{
				common.LABEL_FLOW_NAME: flow.Name}).Observe(duration.Seconds() * 1000)
	}

	return nil
}
```
The instrumentation records the start time before the first Function is invoked and computes the elapsed time after the Flow has completed. The duration, in milliseconds, is then observed by the `FlowDuration` `HistogramVec`, which counts it in the matching bucket.

## 10.6 KisMetrics Unit Testing (Other Metrics Indicators)

### 10.6.1 Creating Unit Tests

We can reuse the earlier `TestMetricsDataTotal()` method as the unit test case, as shown below:

> kis-flow/test/kis_metrics_test.go

```go
package test

import (
	"context"
	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
	"time"
)

func TestMetricsDataTotal(t *testing.T) {
	ctx := context.Background()

	// 0. Register Function callbacks
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. Register ConnectorInit and Connector callbacks
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHandler1)

	// 1. Load configuration files and build Flow
	if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}

	// 2. Get Flow
	flow1 := kis.Pool().GetFlow("flowName1")

	n := 0

	for n < 10 {
		// 3. Submit raw data
		_ = flow1.CommitRow("This is Data1 from Test")

		// 4. Execute flow1
		if err := flow1.Run(ctx); err != nil {
			panic(err)
		}

		time.Sleep(1 * time.Second)
		n++
	}

	// Block forever so the metrics endpoint stays available for inspection
	select {}
}
```
Run the unit test from the `kis-flow/test/` directory:

```bash
go test -test.v -test.paniconexit0 -test.run TestMetricsDataTotal
```

The test prints a large amount of log output. After about 10 seconds, open another terminal and run:

```bash
curl http://0.0.0.0:20004/metrics
```

You will see the following results:
```
# HELP flow_data_total KisFlow data count for each FlowID
# TYPE flow_data_total gauge
flow_data_total{flow_name="flowName1"} 10
# HELP flow_run_duration Flow execution time
# TYPE flow_run_duration histogram
flow_run_duration_bucket{flow_name="flowName1",le="0.005"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.01"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.03"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.08"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.1"} 0
flow_run_duration_bucket{flow_name="flowName1",le="0.5"} 0
flow_run_duration_bucket{flow_name="flowName1",le="1"} 0
flow_run_duration_bucket{flow_name="flowName1",le="5"} 9
flow_run_duration_bucket{flow_name="flowName1",le="10"} 10
flow_run_duration_bucket{flow_name="flowName1",le="100"} 10
flow_run_duration_bucket{flow_name="flowName1",le="1000"} 10
flow_run_duration_bucket{flow_name="flowName1",le="5000"} 10
flow_run_duration_bucket{flow_name="flowName1",le="30000"} 10
flow_run_duration_bucket{flow_name="flowName1",le="60000"} 10
flow_run_duration_bucket{flow_name="flowName1",le="+Inf"} 10
flow_run_duration_sum{flow_name="flowName1"} 29.135023
flow_run_duration_count{flow_name="flowName1"} 10
# HELP flow_schedule_cnts Number of times each FlowID is scheduled in KisFlow
# TYPE flow_schedule_cnts gauge
flow_schedule_cnts{flow_name="flowName1"} 10
# HELP func_run_duration Function execution time
# TYPE func_run_duration histogram
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.005"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.01"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.03"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.08"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.1"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="0.5"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="1"} 0
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="5"} 9
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="10"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="100"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="1000"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="5000"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="30000"} 10
func_run_duration_bucket{func_mode="Calculate",func_name="funcName3",le="+Inf"} 10
func_run_duration_sum{func_mode="Calculate",func_name="funcName3"} 20.925857
func_run_duration_count{func_mode="Calculate",func_name="funcName3"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.005"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.01"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.03"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.08"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.1"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="0.5"} 0
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="1"} 1
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="5"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="10"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="100"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="1000"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="5000"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="30000"} 10
func_run_duration_bucket{func_mode="Save",func_name="funcName2",le="+Inf"} 10
func_run_duration_sum{func_mode="Save",func_name="funcName2"} 27.026124
func_run_duration_count{func_mode="Save",func_name="funcName2"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.005"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.01"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.03"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.08"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.1"} 0
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="0.5"} 5
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="1"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="5"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="10"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="100"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="1000"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="5000"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="30000"} 10
func_run_duration_bucket{func_mode="Verify",func_name="funcName1",le="+Inf"} 10
func_run_duration_sum{func_mode="Verify",func_name="funcName1"} 13.858197
func_run_duration_count{func_mode="Verify",func_name="funcName1"} 10
```
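A histogram's `_sum` and `_count` series can be combined to get the mean observation. As a rough sketch, the program below extracts those two values for `flowName1` from the output above and divides them. `sampleValue` is a hypothetical helper written for this example; it assumes each sample line has the form `series value` with no trailing timestamp, and it is not a full parser for the Prometheus text format.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// sampleValue returns the value of the first sample line in text whose
// metric name and label set (everything before the last space) equals series.
func sampleValue(text, series string) (float64, bool) {
	for _, line := range strings.Split(text, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and HELP/TYPE comments
		}
		i := strings.LastIndex(line, " ")
		if i < 0 || line[:i] != series {
			continue
		}
		v, err := strconv.ParseFloat(line[i+1:], 64)
		if err != nil {
			return 0, false
		}
		return v, true
	}
	return 0, false
}

func main() {
	// Two lines copied from the curl output shown above.
	text := `flow_run_duration_sum{flow_name="flowName1"} 29.135023
flow_run_duration_count{flow_name="flowName1"} 10`

	sum, okSum := sampleValue(text, `flow_run_duration_sum{flow_name="flowName1"}`)
	count, okCount := sampleValue(text, `flow_run_duration_count{flow_name="flowName1"}`)
	if !okSum || !okCount || count == 0 {
		fmt.Println("series not found")
		return
	}

	// 29.135023 ms over 10 runs
	fmt.Printf("mean flow duration: %.4f ms\n", sum/count) // mean flow duration: 2.9135 ms
}
```

The mean of about 2.9 ms agrees with the buckets: 9 of the 10 runs fall under `le="5"` and all 10 fall under `le="10"`.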
This concludes the section on KisMetrics unit testing and the other metrics indicators.
## 10.7 Grafana Dashboard Display for KisFlow Metrics
With Prometheus metrics collected, we can integrate Grafana to display dashboards for KisFlow stream processing programs. Since each developer's project metrics and dashboard requirements may vary, this document does not provide specific Grafana dashboard configuration files. Instead, here is a sample dashboard for a KisFlow project for demonstration purposes, as shown below:
## 10.8 [V0.9] Source Code
https://github.com/aceld/kis-flow/releases/tag/v0.9
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki