Skip to content
66 changes: 35 additions & 31 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@ import "github.com/uber/cadence/common/types"

// valid indexed fields on ES
const (
DomainID = "DomainID"
WorkflowID = "WorkflowID"
RunID = "RunID"
WorkflowType = "WorkflowType"
StartTime = "StartTime"
ExecutionTime = "ExecutionTime"
CloseTime = "CloseTime"
CloseStatus = "CloseStatus"
HistoryLength = "HistoryLength"
Encoding = "Encoding"
KafkaKey = "KafkaKey"
BinaryChecksums = "BinaryChecksums"
TaskList = "TaskList"
IsCron = "IsCron"
NumClusters = "NumClusters"
UpdateTime = "UpdateTime"
CustomDomain = "CustomDomain" // to support batch workflow
Operator = "Operator" // to support batch workflow
DomainID = "DomainID"
WorkflowID = "WorkflowID"
RunID = "RunID"
WorkflowType = "WorkflowType"
StartTime = "StartTime"
ExecutionTime = "ExecutionTime"
CloseTime = "CloseTime"
CloseStatus = "CloseStatus"
HistoryLength = "HistoryLength"
Encoding = "Encoding"
KafkaKey = "KafkaKey"
BinaryChecksums = "BinaryChecksums"
TaskList = "TaskList"
ClusterAttributeScope = "ClusterAttributeScope"
ClusterAttributeName = "ClusterAttributeName"
IsCron = "IsCron"
NumClusters = "NumClusters"
UpdateTime = "UpdateTime"
CustomDomain = "CustomDomain" // to support batch workflow
Operator = "Operator" // to support batch workflow

CustomStringField = "CustomStringField"
CustomKeywordField = "CustomKeywordField"
Expand Down Expand Up @@ -90,19 +92,21 @@ func GetDefaultIndexedKeys() map[string]interface{} {

// systemIndexedKeys is Cadence created visibility keys
var systemIndexedKeys = map[string]interface{}{
DomainID: types.IndexedValueTypeKeyword,
WorkflowID: types.IndexedValueTypeKeyword,
RunID: types.IndexedValueTypeKeyword,
WorkflowType: types.IndexedValueTypeKeyword,
StartTime: types.IndexedValueTypeInt,
ExecutionTime: types.IndexedValueTypeInt,
CloseTime: types.IndexedValueTypeInt,
CloseStatus: types.IndexedValueTypeInt,
HistoryLength: types.IndexedValueTypeInt,
TaskList: types.IndexedValueTypeKeyword,
IsCron: types.IndexedValueTypeBool,
NumClusters: types.IndexedValueTypeInt,
UpdateTime: types.IndexedValueTypeInt,
DomainID: types.IndexedValueTypeKeyword,
WorkflowID: types.IndexedValueTypeKeyword,
RunID: types.IndexedValueTypeKeyword,
WorkflowType: types.IndexedValueTypeKeyword,
StartTime: types.IndexedValueTypeInt,
ExecutionTime: types.IndexedValueTypeInt,
CloseTime: types.IndexedValueTypeInt,
CloseStatus: types.IndexedValueTypeInt,
HistoryLength: types.IndexedValueTypeInt,
TaskList: types.IndexedValueTypeKeyword,
IsCron: types.IndexedValueTypeBool,
NumClusters: types.IndexedValueTypeInt,
UpdateTime: types.IndexedValueTypeInt,
ClusterAttributeScope: types.IndexedValueTypeKeyword,
ClusterAttributeName: types.IndexedValueTypeKeyword,
}

// IsSystemIndexedKey return true is key is system added
Expand Down
26 changes: 14 additions & 12 deletions common/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,18 +283,20 @@ func (c *ESClient) convertSearchResultToVisibilityRecord(hit *client.SearchHit)
}

record := &p.InternalVisibilityWorkflowExecutionInfo{
DomainID: source.DomainID,
WorkflowType: source.WorkflowType,
WorkflowID: source.WorkflowID,
RunID: source.RunID,
TypeName: source.WorkflowType,
StartTime: time.Unix(0, source.StartTime),
ExecutionTime: time.Unix(0, source.ExecutionTime),
Memo: p.NewDataBlob(source.Memo, constants.EncodingType(source.Encoding)),
TaskList: source.TaskList,
IsCron: source.IsCron,
NumClusters: source.NumClusters,
SearchAttributes: source.Attr,
DomainID: source.DomainID,
WorkflowType: source.WorkflowType,
WorkflowID: source.WorkflowID,
RunID: source.RunID,
TypeName: source.WorkflowType,
StartTime: time.Unix(0, source.StartTime),
ExecutionTime: time.Unix(0, source.ExecutionTime),
Memo: p.NewDataBlob(source.Memo, constants.EncodingType(source.Encoding)),
TaskList: source.TaskList,
IsCron: source.IsCron,
NumClusters: source.NumClusters,
ClusterAttributeScope: source.ClusterAttributeScope,
ClusterAttributeName: source.ClusterAttributeName,
SearchAttributes: source.Attr,
}
if source.UpdateTime != 0 {
record.UpdateTime = time.Unix(0, source.UpdateTime)
Expand Down
36 changes: 19 additions & 17 deletions common/elasticsearch/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,25 @@ import "github.com/uber/cadence/.gen/go/indexer"

// All legal fields allowed in elastic search index
const (
DomainID = "DomainID"
WorkflowID = "WorkflowID"
RunID = "RunID"
WorkflowType = "WorkflowType"
StartTime = "StartTime"
ExecutionTime = "ExecutionTime"
CloseTime = "CloseTime"
CloseStatus = "CloseStatus"
HistoryLength = "HistoryLength"
Memo = "Memo"
Encoding = "Encoding"
TaskList = "TaskList"
IsCron = "IsCron"
NumClusters = "NumClusters"
VisibilityOperation = "VisibilityOperation"
UpdateTime = "UpdateTime"
ShardID = "ShardID"
DomainID = "DomainID"
WorkflowID = "WorkflowID"
RunID = "RunID"
WorkflowType = "WorkflowType"
StartTime = "StartTime"
ExecutionTime = "ExecutionTime"
CloseTime = "CloseTime"
CloseStatus = "CloseStatus"
HistoryLength = "HistoryLength"
Memo = "Memo"
Encoding = "Encoding"
TaskList = "TaskList"
IsCron = "IsCron"
NumClusters = "NumClusters"
ClusterAttributeScope = "ClusterAttributeScope"
ClusterAttributeName = "ClusterAttributeName"
VisibilityOperation = "VisibilityOperation"
UpdateTime = "UpdateTime"
ShardID = "ShardID"
)

// Supported field types
Expand Down
34 changes: 18 additions & 16 deletions common/elasticsearch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,24 @@ type (

// VisibilityRecord is a struct of doc for deserialization
VisibilityRecord struct {
WorkflowID string
RunID string
WorkflowType string
DomainID string
StartTime int64
ExecutionTime int64
CloseTime int64
CloseStatus workflow.WorkflowExecutionCloseStatus
HistoryLength int64
Memo []byte
Encoding string
TaskList string
IsCron bool
NumClusters int16
UpdateTime int64
Attr map[string]interface{}
WorkflowID string
RunID string
WorkflowType string
DomainID string
StartTime int64
ExecutionTime int64
CloseTime int64
CloseStatus workflow.WorkflowExecutionCloseStatus
HistoryLength int64
Memo []byte
Encoding string
TaskList string
IsCron bool
NumClusters int16
ClusterAttributeScope string
ClusterAttributeName string
UpdateTime int64
Attr map[string]interface{}
}

SearchHits struct {
Expand Down
138 changes: 73 additions & 65 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,23 +647,25 @@ type (

// InternalVisibilityWorkflowExecutionInfo is visibility info for internal response
InternalVisibilityWorkflowExecutionInfo struct {
DomainID string
WorkflowType string
WorkflowID string
RunID string
TypeName string
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
Status *types.WorkflowExecutionCloseStatus
HistoryLength int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
UpdateTime time.Time
SearchAttributes map[string]interface{}
ShardID int16
DomainID string
WorkflowType string
WorkflowID string
RunID string
TypeName string
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
Status *types.WorkflowExecutionCloseStatus
HistoryLength int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
ClusterAttributeScope string
ClusterAttributeName string
UpdateTime time.Time
SearchAttributes map[string]interface{}
ShardID int16
}

// InternalListWorkflowExecutionsResponse is response from ListWorkflowExecutions
Expand Down Expand Up @@ -706,43 +708,47 @@ type (

// InternalRecordWorkflowExecutionStartedRequest request to RecordWorkflowExecutionStarted
InternalRecordWorkflowExecutionStartedRequest struct {
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
WorkflowTimeout time.Duration
TaskID int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
SearchAttributes map[string][]byte
ShardID int16
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
WorkflowTimeout time.Duration
TaskID int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
ClusterAttributeScope string
ClusterAttributeName string
UpdateTimestamp time.Time
SearchAttributes map[string][]byte
ShardID int16
}

// InternalRecordWorkflowExecutionClosedRequest is request to RecordWorkflowExecutionClosed
InternalRecordWorkflowExecutionClosedRequest struct {
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
TaskID int64
Memo *DataBlob
TaskList string
SearchAttributes map[string][]byte
CloseTimestamp time.Time
Status types.WorkflowExecutionCloseStatus
HistoryLength int64
RetentionPeriod time.Duration
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
ShardID int16
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
TaskID int64
Memo *DataBlob
TaskList string
SearchAttributes map[string][]byte
CloseTimestamp time.Time
Status types.WorkflowExecutionCloseStatus
HistoryLength int64
RetentionPeriod time.Duration
IsCron bool
NumClusters int16
ClusterAttributeScope string
ClusterAttributeName string
UpdateTimestamp time.Time
ShardID int16
}

// InternalRecordWorkflowExecutionUninitializedRequest is used to add a record of a newly uninitialized execution
Expand All @@ -757,21 +763,23 @@ type (

// InternalUpsertWorkflowExecutionRequest is request to UpsertWorkflowExecution
InternalUpsertWorkflowExecutionRequest struct {
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
WorkflowTimeout time.Duration
TaskID int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
SearchAttributes map[string][]byte
ShardID int64
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
WorkflowTimeout time.Duration
TaskID int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
ClusterAttributeScope string
ClusterAttributeName string
UpdateTimestamp time.Time
SearchAttributes map[string][]byte
ShardID int64
}

// InternalListWorkflowExecutionsRequest is used to list executions in a domain
Expand Down
Loading
Loading