Skip to content

Commit 79bb459

Browse files
authored
feat(active-active): Index workflows by cluster attributes (#7421)
**Detailed Description** Index workflows by their cluster attributes **Impact Analysis** - **Backward Compatibility**: Yes - **Forward Compatibility**: Yes. **Testing Plan** - **Unit Tests**: Yes - **Persistence Tests**: Yes - **Integration Tests**: Yes - **Compatibility Tests**: N/A **Rollout Plan** - What is the rollout plan? - Does the order of deployment matter? No. - Is it safe to rollback? Does the order of rollback matter? It's safe to rollback. - Is there a kill switch to mitigate the impact immediately? No. To get all workflows with some cluster attribute, one can run the following query: ``` 'ClusterAttributeScope="region" and ClusterAttributeName="us-east"' ```
1 parent eafcce5 commit 79bb459

35 files changed

+731
-478
lines changed

common/definition/indexedKeys.go

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,26 @@ import "github.com/uber/cadence/common/types"
2424

2525
// valid indexed fields on ES
2626
const (
27-
DomainID = "DomainID"
28-
WorkflowID = "WorkflowID"
29-
RunID = "RunID"
30-
WorkflowType = "WorkflowType"
31-
StartTime = "StartTime"
32-
ExecutionTime = "ExecutionTime"
33-
CloseTime = "CloseTime"
34-
CloseStatus = "CloseStatus"
35-
HistoryLength = "HistoryLength"
36-
Encoding = "Encoding"
37-
KafkaKey = "KafkaKey"
38-
BinaryChecksums = "BinaryChecksums"
39-
TaskList = "TaskList"
40-
IsCron = "IsCron"
41-
NumClusters = "NumClusters"
42-
UpdateTime = "UpdateTime"
43-
CustomDomain = "CustomDomain" // to support batch workflow
44-
Operator = "Operator" // to support batch workflow
27+
DomainID = "DomainID"
28+
WorkflowID = "WorkflowID"
29+
RunID = "RunID"
30+
WorkflowType = "WorkflowType"
31+
StartTime = "StartTime"
32+
ExecutionTime = "ExecutionTime"
33+
CloseTime = "CloseTime"
34+
CloseStatus = "CloseStatus"
35+
HistoryLength = "HistoryLength"
36+
Encoding = "Encoding"
37+
KafkaKey = "KafkaKey"
38+
BinaryChecksums = "BinaryChecksums"
39+
TaskList = "TaskList"
40+
ClusterAttributeScope = "ClusterAttributeScope"
41+
ClusterAttributeName = "ClusterAttributeName"
42+
IsCron = "IsCron"
43+
NumClusters = "NumClusters"
44+
UpdateTime = "UpdateTime"
45+
CustomDomain = "CustomDomain" // to support batch workflow
46+
Operator = "Operator" // to support batch workflow
4547

4648
CustomStringField = "CustomStringField"
4749
CustomKeywordField = "CustomKeywordField"
@@ -90,19 +92,21 @@ func GetDefaultIndexedKeys() map[string]interface{} {
9092

9193
// systemIndexedKeys is Cadence created visibility keys
9294
var systemIndexedKeys = map[string]interface{}{
93-
DomainID: types.IndexedValueTypeKeyword,
94-
WorkflowID: types.IndexedValueTypeKeyword,
95-
RunID: types.IndexedValueTypeKeyword,
96-
WorkflowType: types.IndexedValueTypeKeyword,
97-
StartTime: types.IndexedValueTypeInt,
98-
ExecutionTime: types.IndexedValueTypeInt,
99-
CloseTime: types.IndexedValueTypeInt,
100-
CloseStatus: types.IndexedValueTypeInt,
101-
HistoryLength: types.IndexedValueTypeInt,
102-
TaskList: types.IndexedValueTypeKeyword,
103-
IsCron: types.IndexedValueTypeBool,
104-
NumClusters: types.IndexedValueTypeInt,
105-
UpdateTime: types.IndexedValueTypeInt,
95+
DomainID: types.IndexedValueTypeKeyword,
96+
WorkflowID: types.IndexedValueTypeKeyword,
97+
RunID: types.IndexedValueTypeKeyword,
98+
WorkflowType: types.IndexedValueTypeKeyword,
99+
StartTime: types.IndexedValueTypeInt,
100+
ExecutionTime: types.IndexedValueTypeInt,
101+
CloseTime: types.IndexedValueTypeInt,
102+
CloseStatus: types.IndexedValueTypeInt,
103+
HistoryLength: types.IndexedValueTypeInt,
104+
TaskList: types.IndexedValueTypeKeyword,
105+
IsCron: types.IndexedValueTypeBool,
106+
NumClusters: types.IndexedValueTypeInt,
107+
UpdateTime: types.IndexedValueTypeInt,
108+
ClusterAttributeScope: types.IndexedValueTypeKeyword,
109+
ClusterAttributeName: types.IndexedValueTypeKeyword,
106110
}
107111

108112
// IsSystemIndexedKey return true is key is system added

common/elasticsearch/client.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -283,18 +283,20 @@ func (c *ESClient) convertSearchResultToVisibilityRecord(hit *client.SearchHit)
283283
}
284284

285285
record := &p.InternalVisibilityWorkflowExecutionInfo{
286-
DomainID: source.DomainID,
287-
WorkflowType: source.WorkflowType,
288-
WorkflowID: source.WorkflowID,
289-
RunID: source.RunID,
290-
TypeName: source.WorkflowType,
291-
StartTime: time.Unix(0, source.StartTime),
292-
ExecutionTime: time.Unix(0, source.ExecutionTime),
293-
Memo: p.NewDataBlob(source.Memo, constants.EncodingType(source.Encoding)),
294-
TaskList: source.TaskList,
295-
IsCron: source.IsCron,
296-
NumClusters: source.NumClusters,
297-
SearchAttributes: source.Attr,
286+
DomainID: source.DomainID,
287+
WorkflowType: source.WorkflowType,
288+
WorkflowID: source.WorkflowID,
289+
RunID: source.RunID,
290+
TypeName: source.WorkflowType,
291+
StartTime: time.Unix(0, source.StartTime),
292+
ExecutionTime: time.Unix(0, source.ExecutionTime),
293+
Memo: p.NewDataBlob(source.Memo, constants.EncodingType(source.Encoding)),
294+
TaskList: source.TaskList,
295+
IsCron: source.IsCron,
296+
NumClusters: source.NumClusters,
297+
ClusterAttributeScope: source.ClusterAttributeScope,
298+
ClusterAttributeName: source.ClusterAttributeName,
299+
SearchAttributes: source.Attr,
298300
}
299301
if source.UpdateTime != 0 {
300302
record.UpdateTime = time.Unix(0, source.UpdateTime)

common/elasticsearch/defs.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,25 @@ import "github.com/uber/cadence/.gen/go/indexer"
2424

2525
// All legal fields allowed in elastic search index
2626
const (
27-
DomainID = "DomainID"
28-
WorkflowID = "WorkflowID"
29-
RunID = "RunID"
30-
WorkflowType = "WorkflowType"
31-
StartTime = "StartTime"
32-
ExecutionTime = "ExecutionTime"
33-
CloseTime = "CloseTime"
34-
CloseStatus = "CloseStatus"
35-
HistoryLength = "HistoryLength"
36-
Memo = "Memo"
37-
Encoding = "Encoding"
38-
TaskList = "TaskList"
39-
IsCron = "IsCron"
40-
NumClusters = "NumClusters"
41-
VisibilityOperation = "VisibilityOperation"
42-
UpdateTime = "UpdateTime"
43-
ShardID = "ShardID"
27+
DomainID = "DomainID"
28+
WorkflowID = "WorkflowID"
29+
RunID = "RunID"
30+
WorkflowType = "WorkflowType"
31+
StartTime = "StartTime"
32+
ExecutionTime = "ExecutionTime"
33+
CloseTime = "CloseTime"
34+
CloseStatus = "CloseStatus"
35+
HistoryLength = "HistoryLength"
36+
Memo = "Memo"
37+
Encoding = "Encoding"
38+
TaskList = "TaskList"
39+
IsCron = "IsCron"
40+
NumClusters = "NumClusters"
41+
ClusterAttributeScope = "ClusterAttributeScope"
42+
ClusterAttributeName = "ClusterAttributeName"
43+
VisibilityOperation = "VisibilityOperation"
44+
UpdateTime = "UpdateTime"
45+
ShardID = "ShardID"
4446
)
4547

4648
// Supported field types

common/elasticsearch/interfaces.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -165,22 +165,24 @@ type (
165165

166166
// VisibilityRecord is a struct of doc for deserialization
167167
VisibilityRecord struct {
168-
WorkflowID string
169-
RunID string
170-
WorkflowType string
171-
DomainID string
172-
StartTime int64
173-
ExecutionTime int64
174-
CloseTime int64
175-
CloseStatus workflow.WorkflowExecutionCloseStatus
176-
HistoryLength int64
177-
Memo []byte
178-
Encoding string
179-
TaskList string
180-
IsCron bool
181-
NumClusters int16
182-
UpdateTime int64
183-
Attr map[string]interface{}
168+
WorkflowID string
169+
RunID string
170+
WorkflowType string
171+
DomainID string
172+
StartTime int64
173+
ExecutionTime int64
174+
CloseTime int64
175+
CloseStatus workflow.WorkflowExecutionCloseStatus
176+
HistoryLength int64
177+
Memo []byte
178+
Encoding string
179+
TaskList string
180+
IsCron bool
181+
NumClusters int16
182+
ClusterAttributeScope string
183+
ClusterAttributeName string
184+
UpdateTime int64
185+
Attr map[string]interface{}
184186
}
185187

186188
SearchHits struct {

common/persistence/data_store_interfaces.go

Lines changed: 73 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -647,23 +647,25 @@ type (
647647

648648
// InternalVisibilityWorkflowExecutionInfo is visibility info for internal response
649649
InternalVisibilityWorkflowExecutionInfo struct {
650-
DomainID string
651-
WorkflowType string
652-
WorkflowID string
653-
RunID string
654-
TypeName string
655-
StartTime time.Time
656-
ExecutionTime time.Time
657-
CloseTime time.Time
658-
Status *types.WorkflowExecutionCloseStatus
659-
HistoryLength int64
660-
Memo *DataBlob
661-
TaskList string
662-
IsCron bool
663-
NumClusters int16
664-
UpdateTime time.Time
665-
SearchAttributes map[string]interface{}
666-
ShardID int16
650+
DomainID string
651+
WorkflowType string
652+
WorkflowID string
653+
RunID string
654+
TypeName string
655+
StartTime time.Time
656+
ExecutionTime time.Time
657+
CloseTime time.Time
658+
Status *types.WorkflowExecutionCloseStatus
659+
HistoryLength int64
660+
Memo *DataBlob
661+
TaskList string
662+
IsCron bool
663+
NumClusters int16
664+
ClusterAttributeScope string
665+
ClusterAttributeName string
666+
UpdateTime time.Time
667+
SearchAttributes map[string]interface{}
668+
ShardID int16
667669
}
668670

669671
// InternalListWorkflowExecutionsResponse is response from ListWorkflowExecutions
@@ -706,43 +708,47 @@ type (
706708

707709
// InternalRecordWorkflowExecutionStartedRequest request to RecordWorkflowExecutionStarted
708710
InternalRecordWorkflowExecutionStartedRequest struct {
709-
DomainUUID string
710-
WorkflowID string
711-
RunID string
712-
WorkflowTypeName string
713-
StartTimestamp time.Time
714-
ExecutionTimestamp time.Time
715-
WorkflowTimeout time.Duration
716-
TaskID int64
717-
Memo *DataBlob
718-
TaskList string
719-
IsCron bool
720-
NumClusters int16
721-
UpdateTimestamp time.Time
722-
SearchAttributes map[string][]byte
723-
ShardID int16
711+
DomainUUID string
712+
WorkflowID string
713+
RunID string
714+
WorkflowTypeName string
715+
StartTimestamp time.Time
716+
ExecutionTimestamp time.Time
717+
WorkflowTimeout time.Duration
718+
TaskID int64
719+
Memo *DataBlob
720+
TaskList string
721+
IsCron bool
722+
NumClusters int16
723+
ClusterAttributeScope string
724+
ClusterAttributeName string
725+
UpdateTimestamp time.Time
726+
SearchAttributes map[string][]byte
727+
ShardID int16
724728
}
725729

726730
// InternalRecordWorkflowExecutionClosedRequest is request to RecordWorkflowExecutionClosed
727731
InternalRecordWorkflowExecutionClosedRequest struct {
728-
DomainUUID string
729-
WorkflowID string
730-
RunID string
731-
WorkflowTypeName string
732-
StartTimestamp time.Time
733-
ExecutionTimestamp time.Time
734-
TaskID int64
735-
Memo *DataBlob
736-
TaskList string
737-
SearchAttributes map[string][]byte
738-
CloseTimestamp time.Time
739-
Status types.WorkflowExecutionCloseStatus
740-
HistoryLength int64
741-
RetentionPeriod time.Duration
742-
IsCron bool
743-
NumClusters int16
744-
UpdateTimestamp time.Time
745-
ShardID int16
732+
DomainUUID string
733+
WorkflowID string
734+
RunID string
735+
WorkflowTypeName string
736+
StartTimestamp time.Time
737+
ExecutionTimestamp time.Time
738+
TaskID int64
739+
Memo *DataBlob
740+
TaskList string
741+
SearchAttributes map[string][]byte
742+
CloseTimestamp time.Time
743+
Status types.WorkflowExecutionCloseStatus
744+
HistoryLength int64
745+
RetentionPeriod time.Duration
746+
IsCron bool
747+
NumClusters int16
748+
ClusterAttributeScope string
749+
ClusterAttributeName string
750+
UpdateTimestamp time.Time
751+
ShardID int16
746752
}
747753

748754
// InternalRecordWorkflowExecutionUninitializedRequest is used to add a record of a newly uninitialized execution
@@ -757,21 +763,23 @@ type (
757763

758764
// InternalUpsertWorkflowExecutionRequest is request to UpsertWorkflowExecution
759765
InternalUpsertWorkflowExecutionRequest struct {
760-
DomainUUID string
761-
WorkflowID string
762-
RunID string
763-
WorkflowTypeName string
764-
StartTimestamp time.Time
765-
ExecutionTimestamp time.Time
766-
WorkflowTimeout time.Duration
767-
TaskID int64
768-
Memo *DataBlob
769-
TaskList string
770-
IsCron bool
771-
NumClusters int16
772-
UpdateTimestamp time.Time
773-
SearchAttributes map[string][]byte
774-
ShardID int64
766+
DomainUUID string
767+
WorkflowID string
768+
RunID string
769+
WorkflowTypeName string
770+
StartTimestamp time.Time
771+
ExecutionTimestamp time.Time
772+
WorkflowTimeout time.Duration
773+
TaskID int64
774+
Memo *DataBlob
775+
TaskList string
776+
IsCron bool
777+
NumClusters int16
778+
ClusterAttributeScope string
779+
ClusterAttributeName string
780+
UpdateTimestamp time.Time
781+
SearchAttributes map[string][]byte
782+
ShardID int64
775783
}
776784

777785
// InternalListWorkflowExecutionsRequest is used to list executions in a domain

0 commit comments

Comments
 (0)