Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
9.1 Multi-Replica Capability
If KisFlow needs to be used concurrently by multiple Goroutines while executing flows, it may require creating multiple flows with the same configuration to match multiple concurrent computation flows. Therefore, Flow needs the ability to create replicas. This chapter will implement this capability.
9.1.1 Adding Fork Interface to Flow
First, add a new interface Fork()
to the Flow abstraction layer with the following prototype:
kis-flow/kis/flow.go
type Flow interface { // Run schedules the Flow, sequentially dispatching and executing the Functions within the Flow Run(ctx context.Context) error // Link connects the Functions within the Flow according to the configuration file Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow submits data to be executed to the Function layer of the Flow CommitRow(row interface{}) error // Input gets the input source data of the currently executing Function in the Flow Input() common.KisRowArr // GetName gets the name of the Flow GetName() string // GetThisFunction gets the currently executing Function GetThisFunction() Function // GetThisFuncConf gets the configuration of the currently executing Function GetThisFuncConf() *config.KisFuncConfig // GetConnector gets the Connector of the currently executing Function GetConnector() (Connector, error) // GetConnConf gets the configuration of the Connector of the currently executing Function GetConnConf() (*config.KisConnConfig, error) // GetConfig gets the configuration of the current Flow GetConfig() *config.KisFlowConfig // GetFuncConfigByName gets the configuration of the Function by name in the current Flow GetFuncConfigByName(funcName string) *config.KisFuncConfig // Next performs the Action for the next Function in the Flow Next(acts ...ActionFunc) error // GetCacheData gets the cached data of the current Flow GetCacheData(key string) interface{} // SetCacheData sets the cached data of the current Flow SetCacheData(key string, value interface{}, Exp time.Duration) // GetMetaData gets the temporary data of the current Flow GetMetaData(key string) interface{} // SetMetaData sets the temporary data of the current Flow SetMetaData(key string, value interface{}) // GetFuncParam gets a key-value pair of the default parameters for the currently executing Function in the Flow GetFuncParam(key string) string // GetFuncParamAll gets all key-value pairs of the default parameters for the currently executing Function in the Flow GetFuncParamAll() config.FParam // +++++++++++++++++++++++++ // Fork gets a deep copy of the Flow Fork(ctx context.Context) Flow }
Fork()
will clone a new KisFlow instance based on an existing KisFlow instance, creating a resource-isolated but identically configured KisFlow instance. The implementation method is as follows:
kis-flow/flow/kis_flow.go
// Fork gets a deep copy of the Flow func (flow *KisFlow) Fork(ctx context.Context) kis.Flow { config := flow.Conf // Create a new Flow using the previous configuration newFlow := NewKisFlow(config) for _, fp := range flow.Conf.Flows { if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok { // The current Function has no Params configured newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil) } else { // The current Function has Params configured newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params) } } log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams) log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs()) return newFlow }
In Fork()
, a new KisFlow instance is created based on the configuration information of the existing flow, and the associated Params and other configuration information are copied along with it. Finally, the newly created Functions are linked to the new Flow using Link()
.
To facilitate debugging, a new interface GetFuncParamsAllFuncs()
has been added to the Flow to print the information of all FuncParams
. The implementation method is as follows:
kis-flow/kis/flow.go
type Flow interface { // ... ... // ... ... // GetFuncParamsAllFuncs retrieves all key-value pairs of the FuncParams for all Functions in the Flow GetFuncParamsAllFuncs() map[string]config.FParam // ... ... }
kis-flow/flow/kis_flow_data.go
// GetFuncParamsAllFuncs retrieves all key-value pairs of the FuncParams for all Functions in the Flow func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam { flow.fplock.RLock() defer flow.fplock.RUnlock() return flow.funcParams }
9.2 Unit Testing
Next, we will test the Fork capability with the following unit test code:
kis-flow/test/kis_fork_test.go
func TestForkFlow(t *testing.T) { ctx := context.Background() // 0. Register Function callback businesses kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. Register ConnectorInit and Connector callback businesses kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. Load configuration files and build Flow if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. Get Flow flow1 := kis.Pool().GetFlow("flowName1") flow1Clone1 := flow1.Fork(ctx) // 3. Submit original data _ = flow1Clone1.CommitRow("This is Data1 from Test") _ = flow1Clone1.CommitRow("This is Data2 from Test") _ = flow1Clone1.CommitRow("This is Data3 from Test") // 4. Execute flow1 if err := flow1Clone1.Run(ctx); err != nil { panic(err) } }
First, we create an instance of the flow flowName1
and then get flowClone1
using Fork()
. After that, we execute the scheduling process of flowClone1
.
Navigate to the kis-flow/test/
directory and execute:
go test -test.v -test.paniconexit0 -test.run TestForkFlow
The result is as follows:
=== RUN TestForkFlow Add KisPool FuncName=funcName1 Add KisPool FuncName=funcName2 Add KisPool FuncName=funcName3 Add KisPool CaaSInit CName=ConnName1 Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode=Save ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]} Add FlowRouter FlowName=flowName1 Add FlowRouter FlowName=flowName2 Add FlowRouter FlowName=flowName3 Add FlowRouter FlowName=flowName4 ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]} Add FlowRouter FlowName=flowName5 ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]} context.Background =====>Flow Fork, oldFlow.funcParams = map[func-6b00f430fe494302a384c2ae09eb019c:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-bf9df5fc16684200b78f32985d073012:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] func-c0f1ae9850174f81b994a2e98fb34109:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]] =====>Flow Fork, newFlow.funcParams = map[func-6b00f430fe494302a384c2ae09eb019c:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-bf9df5fc16684200b78f32985d073012:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] func-c0f1ae9850174f81b994a2e98fb34109:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]] ===>FaaS Demo1, row.Data = This is Data1 from Test ===>FaaS Demo1, row.Data = This is Data2 from Test ===>FaaS Demo1, row.Data = This is Data3 from Test ===>FaaS Demo2, row.Data = This is Data1 from Test ===>FaaS Demo2, row.Data = This is Data2 from Test ===>FaaS Demo2, row.Data = This is Data3 from Test ===>FaaS Demo3, row.Data = This is Data1 from Test ===>FaaS Demo3, row.Data = This is Data2 from Test ===>FaaS Demo3, row.Data = This is Data3 from Test --- PASS: TestForkFlow (0.01s) PASS ok command-line-arguments (cached)
Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
To be continued.
8.1 Flow Cache - Data Stream Caching
KisFlow also provides shared caching in stream computing, using a simple local cache for developers to use as needed. For third-party local cache technology dependencies, refer to: https://github.com/patrickmn/go-cache.
8.1.1 go-cache
(1) Installation
go get github.com/patrickmn/go-cache
(2) Usage
import ( "fmt" "github.com/patrickmn/go-cache" "time" ) func main() { // Create a cache with a default expiration time of 5 minutes, and which // purges expired items every 10 minutes c := cache.New(5*time.Minute, 10*time.Minute) // Set the value of the key "foo" to "bar", with the default expiration time c.Set("foo", "bar", cache.DefaultExpiration) // Set the value of the key "baz" to 42, with no expiration time // (the item won't be removed until it is re-set, or removed using // c.Delete("baz") c.Set("baz", 42, cache.NoExpiration) // Get the string associated with the key "foo" from the cache foo, found := c.Get("foo") if found { fmt.Println(foo) } // Since Go is statically typed, and cache values can be anything, type // assertion is needed when values are being passed to functions that don't // take arbitrary types, (i.e. interface{}). The simplest way to do this for // values which will only be used once--e.g. for passing to another // function--is: foo, found := c.Get("foo") if found { MyFunction(foo.(string)) } // This gets tedious if the value is used several times in the same function. // You might do either of the following instead: if x, found := c.Get("foo"); found { foo := x.(string) // ... } // or var foo string if x, found := c.Get("foo"); found { foo = x.(string) } // ... // foo can then be passed around freely as a string // Want performance? Store pointers! c.Set("foo", &MyStruct, cache.DefaultExpiration) if x, found := c.Get("foo"); found { foo := x.(*MyStruct) // ... } }
For detailed reference: https://github.com/patrickmn/go-cache
8.1.2 KisFlow Integration with go-cache
(1) Flow Provides Abstract Interface
Flow provides interfaces for cache operations as follows:
kis-flow/kis/flow.go
type Flow interface { // Run schedules the Flow, sequentially scheduling and executing Functions within the Flow Run(ctx context.Context) error // Link connects Functions within the Flow according to the configuration file Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow submits Flow data to the Function layer about to be executed CommitRow(row interface{}) error // Input gets the input source data for the currently executing Function in the Flow Input() common.KisRowArr // GetName gets the name of the Flow GetName() string // GetThisFunction gets the currently executing Function GetThisFunction() Function // GetThisFuncConf gets the configuration of the currently executing Function GetThisFuncConf() *config.KisFuncConfig // GetConnector gets the Connector of the currently executing Function GetConnector() (Connector, error) // GetConnConf gets the configuration of the Connector for the currently executing Function GetConnConf() (*config.KisConnConfig, error) // GetConfig gets the configuration of the current Flow GetConfig() *config.KisFlowConfig // GetFuncConfigByName gets the configuration of the Function by its name GetFuncConfigByName(funcName string) *config.KisFuncConfig // Next advances the currently executing Function to the next Function with specified Action Next(acts ...ActionFunc) error // ++++++++++++++++++++++++++++++++++++++++ // GetCacheData gets the cache data of the current Flow GetCacheData(key string) interface{} // SetCacheData sets the cache data of the current Flow SetCacheData(key string, value interface{}, Exp time.Duration) }
SetCacheData()
sets the local cache, with Exp as the expiration time. If Exp is 0, it is permanent.
GetCacheData()
reads the local cache.
(2) Providing Constants
Provide some constants related to cache expiration time.
kis-flow/common/const.go
// cache const ( // DeFaultFlowCacheCleanUp is the default cache cleanup interval for Flow objects in KisFlow, in minutes DeFaultFlowCacheCleanUp = 5 // in minutes // DefaultExpiration is the default GoCache time, permanently saved DefaultExpiration time.Duration = 0 )
(3) Adding and Initializing Members in KisFlow
kis-flow/flow/kis_flow.go
// KisFlow represents the context environment for stream computing type KisFlow struct { // ... ... // ... ... // Local cache for the flow cache *cache.Cache // Temporary cache context environment for Flow } // NewKisFlow creates a new KisFlow func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { flow := new(KisFlow) // ... ... // ... ... // Initialize local cache flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute) return flow }
(4) Implementing the Interface
Finally, implement the two interfaces for cache read and write operations as follows:
kis-flow/flow/kis_flow_data.go
func (flow *KisFlow) GetCacheData(key string) interface{} { if data, found := flow.cache.Get(key); found { return data } return nil } func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) { if Exp == common.DefaultExpiration { flow.cache.Set(key, value, cache.DefaultExpiration) } else { flow.cache.Set(key, value, Exp) } }
8.2 MetaData Temporary Cache Parameters
MetaData is defined as a map[string]interface{}
structure available at each level of Flow, Function, and Connector to store temporary data. The lifespan of this data is consistent with the lifespan of each instance.
8.2.1 Adding MetaData to Flow
First, add the metaData map[string]interface{}
member and corresponding read-write lock to KisFlow.
kis-flow/flow/kis_flow.go
// KisFlow represents the context environment throughout the entire stream computing type KisFlow struct { // ... ... // ... ... // +++++++++++++++++++++++++++++++++++++++++++ // metaData for the flow metaData map[string]interface{} // Custom temporary data for Flow mLock sync.RWMutex // Read-write lock to manage metaData }
Also, initialize the metaData
member in the KisFlow constructor as follows:
kis-flow/flow/kis_flow.go
// NewKisFlow creates a KisFlow func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { flow := new(KisFlow) // ... ... // ... ... // ++++++++++++++++++++++++++++++++++++++ // Initialize temporary data flow.metaData = make(map[string]interface{}) return flow }
Next, add the read and write interfaces for MetaData to the Flow as follows:
kis-flow/kis/flow.go
type Flow interface { // Run schedules the Flow, sequentially scheduling and executing Functions within the Flow Run(ctx context.Context) error // Link connects Functions within the Flow according to the configuration file Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow submits Flow data to the Function layer about to be executed CommitRow(row interface{}) error // Input gets the input source data for the currently executing Function in the Flow Input() common.KisRowArr // GetName gets the name of the Flow GetName() string // GetThisFunction gets the currently executing Function GetThisFunction() Function // GetThisFuncConf gets the configuration of the currently executing Function GetThisFuncConf() *config.KisFuncConfig // GetConnector gets the Connector of the currently executing Function GetConnector() (Connector, error) // GetConnConf gets the configuration of the Connector for the currently executing Function GetConnConf() (*config.KisConnConfig, error) // GetConfig gets the configuration of the current Flow GetConfig() *config.KisFlowConfig // GetFuncConfigByName gets the configuration of the Function by its name GetFuncConfigByName(funcName string) *config.KisFuncConfig // Next advances the currently executing Function to the next Function with specified Action Next(acts ...ActionFunc) error // GetCacheData gets the cache data of the current Flow GetCacheData(key string) interface{} // SetCacheData sets the cache data of the current Flow SetCacheData(key string, value interface{}, Exp time.Duration) // ++++++++++++++++++++++++++++ // GetMetaData gets the temporary data of the current Flow GetMetaData(key string) interface{} // SetMetaData sets the temporary data of the current Flow SetMetaData(key string, value interface{}) }
Define the GetMetaData()
and SetMetaData()
interfaces for reading and writing respectively. Finally, implement these interfaces as follows:
kis-flow/flow/kis_flow_data.go
// GetMetaData retrieves the temporary data of the current Flow object func (flow *KisFlow) GetMetaData(key string) interface{} { flow.mLock.RLock() defer flow.mLock.RUnlock() data, ok := flow.metaData[key] if !ok { return nil } return data } // SetMetaData sets the temporary data of the current Flow object func (flow *KisFlow) SetMetaData(key string, value interface{}) { flow.mLock.Lock() defer flow.mLock.Unlock() flow.metaData[key] = value }
8.2.2 Adding MetaData to Function
First, add the metaData
member to BaseFunction
as follows:
kis-flow/function/kis_base_function.go
type BaseFunction struct { // Id, KisFunction instance ID, used to distinguish different instance objects within KisFlow Id string Config *config.KisFuncConfig // flow flow kis.Flow // Context environment KisFlow // connector connector kis.Connector // ++++++++++++++++++++++++ // Custom temporary data for Function metaData map[string]interface{} // Read-write lock to manage metaData mLock sync.RWMutex // link N kis.Function // Next stream computing Function P kis.Function // Previous stream computing Function }
In the Function constructor, each specific Function needs a constructor to initialize the metaData
member. The changes are as follows:
kis-flow/function/kis_base_function.go
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function { var f kis.Function // Factory produces generalized objects // ++++++++++++++ switch common.KisMode(config.FMode) { case common.V: f = NewKisFunctionV() // +++ case common.S: f = NewKisFunctionS() // +++ case common.L: f = NewKisFunctionL() // +++ case common.C: f = NewKisFunctionC() // +++ case common.E: f = NewKisFunctionE() // +++ default: // LOG ERROR return nil } // Generate random unique instance ID f.CreateId() // Set basic information properties if err := f.SetConfig(config); err != nil { panic(err) } // Set Flow if err := f.SetFlow(flow); err != nil { panic(err) } return f }
Each constructor is as follows:
kis-flow/function/kis_function_c.go
func NewKisFunctionC() kis.Function { f := new(KisFunctionC) // Initialize metaData f.metaData = make(map[string]interface{}) return f }
kis-flow/function/kis_function_v.go
func NewKisFunctionV() kis.Function { f := new(KisFunctionV) // Initialize metaData f.metaData = make(map[string]interface{}) return f }
kis-flow/function/kis_function_e.go
func NewKisFunctionE() kis.Function { f := new(KisFunctionE) // Initialize metaData f.metaData = make(map[string]interface{}) return f }
kis-flow/function/kis_function_s.go
func NewKisFunctionS() kis.Function { f := new(KisFunctionS) // Initialize metaData f.metaData = make(map[string]interface{}) return f }
kis-flow/function/kis_function_l.go
func NewKisFunctionL() kis.Function { f := new(KisFunctionL) // Initialize metaData f.metaData = make(map[string]interface{}) return f }
Next, add interfaces to access the metaData member in the Function abstraction layer as follows:
type Function interface { // Call executes the stream computing logic Call(ctx context.Context, flow Flow) error // SetConfig configures the strategy for the current Function instance SetConfig(s *config.KisFuncConfig) error // GetConfig retrieves the configuration strategy of the current Function instance GetConfig() *config.KisFuncConfig // SetFlow sets the Flow instance that the current Function instance depends on SetFlow(f Flow) error // GetFlow retrieves the Flow instance that the current Function instance depends on GetFlow() Flow // AddConnector adds a Connector to the current Function instance AddConnector(conn Connector) error // GetConnector retrieves the Connector associated with the current Function instance GetConnector() Connector // CreateId generates a random instance KisID for the current Function instance CreateId() // GetId retrieves the FID of the current Function GetId() string // GetPrevId retrieves the FID of the previous Function node of the current Function GetPrevId() string // GetNextId retrieves the FID of the next Function node of the current Function GetNextId() string // Next returns the next layer computing stream Function, or nil if it is the last layer Next() Function // Prev returns the previous layer computing stream Function, or nil if it is the last layer Prev() Function // SetN sets the next Function instance SetN(f Function) // SetP sets the previous Function instance SetP(f Function) // ++++++++++++++++++++++++++++++++++ // GetMetaData retrieves the temporary data of the current Function GetMetaData(key string) interface{} // SetMetaData sets the temporary data of the current Function SetMetaData(key string, value interface{}) }
Implement the above two interfaces in the BaseFunction.
kis-flow/function/kis_base_function.go
// GetMetaData retrieves the temporary data of the current Function func (base *BaseFunction) GetMetaData(key string) interface{} { base.mLock.RLock() defer base.mLock.RUnlock() data, ok := base.metaData[key] if !ok { return nil } return data } // SetMetaData sets the temporary data of the current Function func (base *BaseFunction) SetMetaData(key string, value interface{}) { base.mLock.Lock() defer base.mLock.Unlock() base.metaData[key] = value }
8.2.3 Adding MetaData to Connector
First, add the metaData
member to KisConnector
as follows:
kis-flow/conn/kis_connector.go
type KisConnector struct { // Connector ID CId string // Connector Name CName string // Connector Config Conf *config.KisConnConfig // Connector Init onceInit sync.Once // ++++++++++++++ // Custom temporary data for KisConnector metaData map[string]interface{} // Read-write lock to manage metaData mLock sync.RWMutex } // NewKisConnector creates a KisConnector based on the configuration strategy func NewKisConnector(config *config.KisConnConfig) *KisConnector { conn := new(KisConnector) conn.CId = id.KisID(common.KisIdTypeConnector) conn.CName = config.CName conn.Conf = config // +++++++++++++++++++++++++++++++++++ conn.metaData = make(map[string]interface{}) return conn }
Initialize metaData
in the constructor.
Next, add interfaces to access and set MetaData in the Connector abstraction layer as follows:
kis-flow/kis/connector.go
type Connector interface { // Init initializes the links of the storage engine associated with the Connector Init() error // Call invokes the read and write operations of the external storage logic of the Connector Call(ctx context.Context, flow Flow, args interface{}) error // GetId retrieves the ID of the Connector GetId() string // GetName retrieves the name of the Connector GetName() string // GetConfig retrieves the configuration information of the Connector GetConfig() *config.KisConnConfig // GetMetaData retrieves the temporary data of the current Connector // +++++++++++++++++++++++++++++++ GetMetaData(key string) interface{} // SetMetaData sets the temporary data of the current Connector SetMetaData(key string, value interface{}) }
Finally, implement the above two interfaces in KisConnector
as follows:
kis-flow/conn/kis_connector.go
// GetMetaData retrieves the temporary data of the current Connector func (conn *KisConnector) GetMetaData(key string) interface{} { conn.mLock.RLock() defer conn.mLock.RUnlock() data, ok := conn.metaData[key] if !ok { return nil } return data } // SetMetaData sets the temporary data of the current Connector func (conn *KisConnector) SetMetaData(key string, value interface{}) { conn.mLock.Lock() defer conn.mLock.Unlock() conn.metaData[key] = value }
8.3 Configuration File Parameters
KisFlow allows developers to define default parameters (Params) for configuring Flow, Function, Connector, etc., in the configuration file. Here are some examples:
Function:
kistype: func fname: funcName1 fmode: Verify source: name: Official Account Douyin Mall Order Data must: - order_id - user_id option: default_params: default1: funcName1_param1 default2: funcName1_param2
Flow:
kistype: flow status: 1 flow_name: flowName1 flows: - fname: funcName1 params: myKey1: flowValue1-1 myKey2: flowValue1-2 - fname: funcName2 params: myKey1: flowValue2-1 myKey2: flowValue2-2 - fname: funcName3 params: myKey1: flowValue3-1 myKey2: flowValue3-2
Connector:
kistype: conn cname: ConnName1 addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990' type: redis key: redis-key params: args1: value1 args2: value2 load: null save: - funcName2
Developers can provide Params for each defined module. Params provided in Flow will also be added to the Functions.
In the previous steps, we already read these parameters into each module's memory, but we did not expose an interface for developers.
8.3.1 Adding Param Retrieval Interface to Flow
First, we provide an interface for Flow to query Params:
kis-flow/kis/flow.go
type Flow interface { // ... ... // ... ... // GetFuncParam retrieves a key-value pair of the default parameters for the currently executing Function in the Flow GetFuncParam(key string) string // GetFuncParamAll retrieves all key-value pairs of the default parameters for the currently executing Function in the Flow GetFuncParamAll() config.FParam }
Implementation:
kis-flow/flow/kis_flow_data.go
// GetFuncParam retrieves a key-value pair of the default parameters for the currently executing Function in the Flow func (flow *KisFlow) GetFuncParam(key string) string { flow.fplock.RLock() defer flow.fplock.RUnlock() if param, ok := flow.funcParams[flow.ThisFunctionId]; ok { if value, vok := param[key]; vok { return value } } return "" } // GetFuncParamAll retrieves all key-value pairs of the default parameters for the currently executing Function in the Flow func (flow *KisFlow) GetFuncParamAll() config.FParam { flow.fplock.RLock() defer flow.fplock.RUnlock() param, ok := flow.funcParams[flow.ThisFunctionId] if !ok { return nil } return param }
GetFuncParam()
and GetFuncParamAll()
retrieve a single key or all parameters respectively, but both fetch the Params for the currently executing Function.
8.3.2 Unit Testing
We add some parameters to each Function in flowName1
.
kis-flow/test/load_conf/flow-FlowName1.yml
kistype: flow status: 1 flow_name: flowName1 flows: - fname: funcName1 params: myKey1: flowValue1-1 myKey2: flowValue1-2 - fname: funcName2 params: myKey1: flowValue2-1 myKey2: flowValue2-2 - fname: funcName3 params: myKey1: flowValue3-1 myKey2: flowValue3-2
Then configure some default custom parameters for each associated Function:
kis-flow/test/load_conf/func/func-FuncName1.yml
kistype: func fname: funcName1 fmode: Verify source: name: Official Account Douyin Mall Order Data must: - order_id - user_id option: default_params: default1: funcName1_param1 default2: funcName1_param2
kis-flow/test/load_conf/func/func-FuncName2.yml
kistype: func fname: funcName2 fmode: Save source: name: User Order Error Rate must: - order_id - user_id option: cname: ConnName1 default_params: default1: funcName2_param1 default2: funcName2_param2
kis-flow/test/load_conf/func/func-FuncName3.yml
kistype: func fname: funcName3 fmode: Calculate source: name: User Order Error Rate must: - order_id - user_id option: default_params: default1: funcName3_param1 default2: funcName3_param2
We also configure some Param parameters for the Connector associated with FuncName2
:
kis-flow/test/load_conf/conn/conn-ConnName1.yml
kistype: conn cname: ConnName1 addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990' type: redis key: redis-key params: args1: value1 args2: value2 load: null save: - funcName2
To verify that our configuration parameters can be correctly retrieved during the execution of Functions, we modified each Function and Connector business function to print their Params:
kis-flow/test/faas/faas_demo1.go
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName1Handler ----") // ++++++++++++++++ fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for index, row := range flow.Input() { // Print data str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) // Calculate result data resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) // Commit result data _ = flow.CommitRow(resultStr) } return nil }
kis-flow/test/faas/faas_demo2.go
func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName2Handler ----") // ++++++++++++++++ fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for index, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) conn, err := flow.GetConnector() if err != nil { log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error()) return err } if conn.Call(ctx, flow, row) != nil { log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error()) return err } // Calculate result data resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) // Commit result data _ = flow.CommitRow(resultStr) } return nil }
kis-flow/test/faas/faas_demo3.go
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName3Handler ----") // ++++++++++++++++ fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) } return nil }
kis-flow/test/caas/caas_demo1.go
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error { fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n", flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode) // +++++++++++ fmt.Printf("Params = %+v\n", conn.GetConfig().Params) fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args) return nil }
Finally, we write the unit test cases:
kis-flow/test/kis_params_test.go
package test import ( "context" "kis-flow/common" "kis-flow/file" "kis-flow/kis" "kis-flow/test/caas" "kis-flow/test/faas" "testing" ) func TestParams(t *testing.T) { ctx := context.Background() // 0. Register Function callback businesses kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. Register ConnectorInit and Connector callback businesses kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. Load configuration files and build Flow if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. Get Flow flow1 := kis.Pool().GetFlow("flowName1") // 3. Submit original data _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 4. Execute flow1 if err := flow1.Run(ctx); err != nil { panic(err) } }
Navigate to the kis-flow/test/
directory and execute:
go test -test.v -test.paniconexit0 -test.run TestParams
=== RUN TestParams .... .... ---> Call funcName1Handler ---- Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] ... ... ---> Call funcName2Handler ---- Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] ... ... ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save Params = map[args1:value1 args2:value2] ... ... ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save Params = map[args1:value1 args2:value2] ... ... ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save Params = map[args1:value1 args2:value2] ... ... ---> Call funcName3Handler ---- Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] ... ... --- PASS: TestParams (0.01s) PASS ok kis-flow/test 0.433s
As we can see, we can now correctly retrieve the Params configuration parameters at each level.
8.4 [V0.7] Source Code
https://github.com/aceld/kis-flow/releases/tag/v0.7
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Top comments (0)