Skip to content

Commit e59c856

Browse files
cgillumhalspang
andauthored
Proper workflow retry support in Dapr SDK (#1090)
Signed-off-by: Chris Gillum <cgillum@microsoft.com> Co-authored-by: halspang <70976921+halspang@users.noreply.github.com>
1 parent a2d3c3a commit e59c856

File tree

8 files changed

+221
-37
lines changed

8 files changed

+221
-37
lines changed

examples/Workflow/WorkflowConsoleApp/Activities/ReserveInventoryActivity.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Threading.Tasks;
2-
using Dapr.Client;
1+
using Dapr.Client;
32
using Dapr.Workflow;
43
using Microsoft.Extensions.Logging;
54
using WorkflowConsoleApp.Models;
@@ -27,7 +26,7 @@ public override async Task<InventoryResult> RunAsync(WorkflowActivityContext con
2726
req.ItemName);
2827

2928
// Ensure that the store has items
30-
InventoryItem item = await client.GetStateAsync<InventoryItem>(
29+
InventoryItem item = await this.client.GetStateAsync<InventoryItem>(
3130
storeName,
3231
req.ItemName.ToLowerInvariant());
3332

examples/Workflow/WorkflowConsoleApp/Activities/UpdateInventoryActivity.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System.Threading.Tasks;
2-
using Dapr.Client;
1+
using Dapr.Client;
32
using Dapr.Workflow;
43
using WorkflowConsoleApp.Models;
54
using Microsoft.Extensions.Logging;

examples/Workflow/WorkflowConsoleApp/Workflows/OrderProcessingWorkflow.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,18 @@ namespace WorkflowConsoleApp.Workflows
66
{
77
public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
88
{
9+
readonly WorkflowTaskOptions defaultActivityRetryOptions = new WorkflowTaskOptions
10+
{
11+
// NOTE: Beware that changing the number of retries is a breaking change for existing workflows.
12+
RetryPolicy = new WorkflowRetryPolicy(
13+
maxNumberOfAttempts: 3,
14+
firstRetryInterval: TimeSpan.FromSeconds(5)),
15+
};
16+
917
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
1018
{
1119
string orderId = context.InstanceId;
1220

13-
1421
// Notify the user that an order has come through
1522
await context.CallActivityAsync(
1623
nameof(NotifyActivity),
@@ -19,7 +26,8 @@ await context.CallActivityAsync(
1926
// Determine if there is enough of the item available for purchase by checking the inventory
2027
InventoryResult result = await context.CallActivityAsync<InventoryResult>(
2128
nameof(ReserveInventoryActivity),
22-
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
29+
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity),
30+
this.defaultActivityRetryOptions);
2331

2432
// If there is insufficient inventory, fail and let the user know
2533
if (!result.Success)
@@ -34,14 +42,16 @@ await context.CallActivityAsync(
3442
// There is enough inventory available so the user can purchase the item(s). Process their payment
3543
await context.CallActivityAsync(
3644
nameof(ProcessPaymentActivity),
37-
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
45+
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost),
46+
this.defaultActivityRetryOptions);
3847

3948
try
4049
{
4150
// There is enough inventory available so the user can purchase the item(s). Process their payment
4251
await context.CallActivityAsync(
4352
nameof(UpdateInventoryActivity),
44-
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
53+
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost),
54+
this.defaultActivityRetryOptions);
4555
}
4656
catch (WorkflowTaskFailedException e)
4757
{

examples/Workflow/WorkflowUnitTest/OrderProcessingTests.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.Threading.Tasks;
22
using Dapr.Workflow;
3-
using Microsoft.DurableTask;
43
using Moq;
54
using WorkflowConsoleApp.Activities;
65
using WorkflowConsoleApp.Models;
@@ -24,7 +23,7 @@ public async Task TestSuccessfulOrder()
2423
// Mock the call to ReserveInventoryActivity
2524
Mock<WorkflowContext> mockContext = new();
2625
mockContext
27-
.Setup(ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), It.IsAny<InventoryRequest>(), It.IsAny<TaskOptions>()))
26+
.Setup(ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), It.IsAny<InventoryRequest>(), It.IsAny<WorkflowTaskOptions>()))
2827
.Returns(Task.FromResult(inventoryResult));
2928

3029
// Run the workflow directly
@@ -36,17 +35,17 @@ public async Task TestSuccessfulOrder()
3635

3736
// Verify that ReserveInventoryActivity was called with a specific input
3837
mockContext.Verify(
39-
ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), expectedInventoryRequest, It.IsAny<TaskOptions>()),
38+
ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), expectedInventoryRequest, It.IsAny<WorkflowTaskOptions>()),
4039
Times.Once());
4140

4241
// Verify that ProcessPaymentActivity was called with a specific input
4342
mockContext.Verify(
44-
ctx => ctx.CallActivityAsync(nameof(ProcessPaymentActivity), expectedPaymentRequest, It.IsAny<TaskOptions>()),
43+
ctx => ctx.CallActivityAsync(nameof(ProcessPaymentActivity), expectedPaymentRequest, It.IsAny<WorkflowTaskOptions>()),
4544
Times.Once());
4645

4746
// Verify that there were two calls to NotifyActivity
4847
mockContext.Verify(
49-
ctx => ctx.CallActivityAsync(nameof(NotifyActivity), It.IsAny<Notification>(), It.IsAny<TaskOptions>()),
48+
ctx => ctx.CallActivityAsync(nameof(NotifyActivity), It.IsAny<Notification>(), It.IsAny<WorkflowTaskOptions>()),
5049
Times.Exactly(2));
5150
}
5251

@@ -61,25 +60,25 @@ public async Task TestInsufficientInventory()
6160
// Mock the call to ReserveInventoryActivity
6261
Mock<WorkflowContext> mockContext = new();
6362
mockContext
64-
.Setup(ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), It.IsAny<InventoryRequest>(), It.IsAny<TaskOptions>()))
63+
.Setup(ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), It.IsAny<InventoryRequest>(), It.IsAny<WorkflowTaskOptions>()))
6564
.Returns(Task.FromResult(inventoryResult));
6665

6766
// Run the workflow directly
6867
OrderResult result = await new OrderProcessingWorkflow().RunAsync(mockContext.Object, order);
6968

7069
// Verify that ReserveInventoryActivity was called with a specific input
7170
mockContext.Verify(
72-
ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), expectedInventoryRequest, It.IsAny<TaskOptions>()),
71+
ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), expectedInventoryRequest, It.IsAny<WorkflowTaskOptions>()),
7372
Times.Once());
7473

7574
// Verify that ProcessPaymentActivity was never called
7675
mockContext.Verify(
77-
ctx => ctx.CallActivityAsync(nameof(ProcessPaymentActivity), It.IsAny<PaymentRequest>(), It.IsAny<TaskOptions>()),
76+
ctx => ctx.CallActivityAsync(nameof(ProcessPaymentActivity), It.IsAny<PaymentRequest>(), It.IsAny<WorkflowTaskOptions>()),
7877
Times.Never());
7978

8079
// Verify that there were two calls to NotifyActivity
8180
mockContext.Verify(
82-
ctx => ctx.CallActivityAsync(nameof(NotifyActivity), It.IsAny<Notification>(), It.IsAny<TaskOptions>()),
81+
ctx => ctx.CallActivityAsync(nameof(NotifyActivity), It.IsAny<Notification>(), It.IsAny<WorkflowTaskOptions>()),
8382
Times.Exactly(2));
8483
}
8584
}

src/Dapr.Workflow/DaprWorkflowContext.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ internal DaprWorkflowContext(TaskOrchestrationContext innerContext)
3535

3636
public override bool IsReplaying => this.innerContext.IsReplaying;
3737

38-
public override Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
38+
public override Task CallActivityAsync(string name, object? input = null, WorkflowTaskOptions? options = null)
3939
{
40-
return WrapExceptions(this.innerContext.CallActivityAsync(name, input, options));
40+
return WrapExceptions(this.innerContext.CallActivityAsync(name, input, options?.ToDurableTaskOptions()));
4141
}
4242

43-
public override Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null)
43+
public override Task<T> CallActivityAsync<T>(string name, object? input = null, WorkflowTaskOptions? options = null)
4444
{
45-
return WrapExceptions(this.innerContext.CallActivityAsync<T>(name, input, options));
45+
return WrapExceptions(this.innerContext.CallActivityAsync<T>(name, input, options?.ToDurableTaskOptions()));
4646
}
4747

4848
public override Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default)
@@ -75,14 +75,14 @@ public override void SetCustomStatus(object? customStatus)
7575
this.innerContext.SetCustomStatus(customStatus);
7676
}
7777

78-
public override Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null)
78+
public override Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, ChildWorkflowTaskOptions? options = null)
7979
{
80-
return WrapExceptions(this.innerContext.CallSubOrchestratorAsync<TResult>(workflowName, input, options));
80+
return WrapExceptions(this.innerContext.CallSubOrchestratorAsync<TResult>(workflowName, input, options?.ToDurableTaskOptions()));
8181
}
8282

83-
public override Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
83+
public override Task CallChildWorkflowAsync(string workflowName, object? input = null, ChildWorkflowTaskOptions? options = null)
8484
{
85-
return WrapExceptions(this.innerContext.CallSubOrchestratorAsync(workflowName, input, options));
85+
return WrapExceptions(this.innerContext.CallSubOrchestratorAsync(workflowName, input, options?.ToDurableTaskOptions()));
8686
}
8787

8888
public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true)

src/Dapr.Workflow/WorkflowContext.cs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ namespace Dapr.Workflow
1616
using System;
1717
using System.Threading;
1818
using System.Threading.Tasks;
19-
using Microsoft.DurableTask;
2019

2120
/// <summary>
2221
/// Context object used by workflow implementations to perform actions such as scheduling activities, durable timers, waiting for
@@ -101,7 +100,7 @@ public abstract class WorkflowContext
101100
/// The activity failed with an unhandled exception. The details of the failure can be found in the
102101
/// <see cref="WorkflowTaskFailedException.FailureDetails"/> property.
103102
/// </exception>
104-
public virtual Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
103+
public virtual Task CallActivityAsync(string name, object? input = null, WorkflowTaskOptions? options = null)
105104
{
106105
return this.CallActivityAsync<object>(name, input, options);
107106
}
@@ -110,7 +109,7 @@ public virtual Task CallActivityAsync(string name, object? input = null, TaskOpt
110109
/// A task that completes when the activity completes or fails. The result of the task is the activity's return value.
111110
/// </returns>
112111
/// <inheritdoc cref="CallActivityAsync"/>
113-
public abstract Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null);
112+
public abstract Task<T> CallActivityAsync<T>(string name, object? input = null, WorkflowTaskOptions? options = null);
114113

115114
/// <summary>
116115
/// Creates a durable timer that expires after the specified delay.
@@ -212,8 +211,11 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
212211
/// <typeparam name="TResult">
213212
/// The type into which to deserialize the child workflow's output.
214213
/// </typeparam>
215-
/// <inheritdoc cref="CallChildWorkflowAsync(string, object?, TaskOptions?)"/>
216-
public abstract Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null);
214+
/// <inheritdoc cref="CallChildWorkflowAsync(string, object?, ChildWorkflowTaskOptions?)"/>
215+
public abstract Task<TResult> CallChildWorkflowAsync<TResult>(
216+
string workflowName,
217+
object? input = null,
218+
ChildWorkflowTaskOptions? options = null);
217219

218220
/// <summary>
219221
/// Executes the specified workflow as a child workflow.
@@ -222,7 +224,8 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
222224
/// <para>
223225
/// In addition to activities, workflows can schedule other workflows as <i>child workflows</i>.
224226
/// A child workflow has its own instance ID, history, and status that is independent of the parent workflow
225-
/// that started it.
227+
/// that started it. You can use <see cref="ChildWorkflowTaskOptions.InstanceId" /> to specify an instance ID
228+
/// for the child workflow. Otherwise, the instance ID will be randomly generated.
226229
/// </para><para>
227230
/// Child workflows have many benefits:
228231
/// <list type="bullet">
@@ -237,15 +240,14 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
237240
/// exception. Child workflows also support automatic retry policies.
238241
/// </para><para>
239242
/// Because child workflows are independent of their parents, terminating a parent workflow does not affect
240-
/// any child workflows. You must terminate each child workflow independently using its instance ID, which is
241-
/// specified by supplying <see cref="SubOrchestrationOptions" /> in place of <see cref="TaskOptions" />.
243+
/// any child workflows. You must terminate each child workflow independently using its instance ID, which
244+
/// is specified by <see cref="ChildWorkflowTaskOptions.InstanceId" />.
242245
/// </para>
243246
/// </remarks>
244247
/// <param name="workflowName">The name of the workflow to call.</param>
245248
/// <param name="input">The serializable input to pass to the child workflow.</param>
246249
/// <param name="options">
247-
/// Additional options that control the execution and processing of the child workflow. Callers can choose to
248-
/// supply the derived type <see cref="SubOrchestrationOptions" />.
250+
/// Additional options that control the execution and processing of the child workflow.
249251
/// </param>
250252
/// <returns>A task that completes when the child workflow completes or fails.</returns>
251253
/// <exception cref="ArgumentException">The specified workflow does not exist.</exception>
@@ -256,7 +258,10 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
256258
/// The child workflow failed with an unhandled exception. The details of the failure can be found in the
257259
/// <see cref="WorkflowTaskFailedException.FailureDetails"/> property.
258260
/// </exception>
259-
public virtual Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
261+
public virtual Task CallChildWorkflowAsync(
262+
string workflowName,
263+
object? input = null,
264+
ChildWorkflowTaskOptions? options = null)
260265
{
261266
return this.CallChildWorkflowAsync<object>(workflowName, input, options);
262267
}

0 commit comments

Comments
 (0)