DEV Community

Aceld
Aceld

Posted on • Edited on

Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications


Download KisFlow Source

$go get github.com/aceld/kis-flow 
Enter fullscreen mode Exit fullscreen mode

KisFlow Developer Documentation

Source Code Example

https://github.com/aceld/kis-flow-usage/tree/main/6-flow_in_goroutines

If you need the same Flow to run concurrently in multiple Goroutines, you can use the flow.Fork() function to clone a Flow instance with isolated memory but the same configuration. Each Flow instance can then be executed in different Goroutines to compute their respective data streams.

Goroutines

package main import ( "context" "fmt" "github.com/aceld/kis-flow/file" "github.com/aceld/kis-flow/kis" "sync" ) func main() { ctx := context.Background() // Get a WaitGroup var wg sync.WaitGroup // Load Configuration from file if err := file.ConfigImportYaml("conf/"); err != nil { panic(err) } // Get the flow flow1 := kis.Pool().GetFlow("CalStuAvgScore") if flow1 == nil { panic("flow1 is nil") } // Fork the flow flowClone1 := flow1.Fork(ctx) // Add to WaitGroup wg.Add(2) // Run Flow1 go func() { defer wg.Done() // Submit a string _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`) // Submit a string _ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`) // Run the flow if err := flow1.Run(ctx); err != nil { fmt.Println("err: ", err) } }() // Run FlowClone1 go func() { defer wg.Done() // Submit a string _ = flowClone1.CommitRow(`{"stu_id":201, "score_1":100, "score_2":90, "score_3":80}`) // Submit a string _ = flowClone1.CommitRow(`{"stu_id":2001, "score_1":100, "score_2":70, "score_3":60}`) if err := flowClone1.Run(ctx); err != nil { fmt.Println("err: ", err) } }() // Wait for Goroutines to finish wg.Wait() fmt.Println("All flows completed.") return } func init() { // Register functions kis.Pool().FaaS("VerifyStu", VerifyStu) kis.Pool().FaaS("AvgStuScore", AvgStuScore) kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore) } 
Enter fullscreen mode Exit fullscreen mode

In this code snippet, we start two Goroutines to run Flow1 and its clone (FlowClone1) concurrently to calculate the final average scores for students 101, 1001, 201, and 2001.


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications

Top comments (0)