TL;DR: Use a standard .NET 8+ host for your microservices running, for example, in AKS, without any Azure WebJobs SDK syntactic sugar or similar fluff/magic¹.
Introduction
We originally started almost 5 years ago with .NET/F# Azure Functions apps, and we had pretty bad experiences with that setup, the biggest issues being low node app density, high costs, and too much bloat from MS frameworks.
Soon after that we migrated to AKS (managed Kubernetes on Azure, similar to EKS and GKE), but to keep the migration effort reasonable we re-used the WebJobs SDK, which is a building block of the Azure Functions SDK. More than 2 years ago I wrote an article about the app stub we were using.
Everything was running relatively fine until recently, when we had to improve our local development experience and needed a way to change the low-level workings of some of our code related to Azure Event Hubs and similar. That was the trigger for revisiting the usage of the WebJobs SDK and actually migrating away from it, which is the topic of this article.
When it comes to which Azure WebJobs SDK triggers we were using, I believe we are nothing special; like many others, we had the following in place:
- Pub-sub message bus, in our case Azure Event Hubs, by means of `EventHubTrigger`
- Queues, in our case Azure Storage Queues, by means of `QueueTrigger`
- Time-triggered jobs, by means of `TimerTrigger`
- Websockets, in our case Azure SignalR, by means of `SignalRTrigger`
Additionally, we use various other IHostedService/BackgroundService processes, which get started upon app start and run in the background doing some other work - e.g. listening to notifications from the database (change streams), aggregating something every x seconds, etc.
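For example, a minimal periodic worker of this kind could look like the following sketch (the service name and interval are illustrative, not from our codebase):

```fsharp
// Minimal sketch of a periodic background worker; name and interval are illustrative.
open System
open System.Threading
open System.Threading.Tasks
open Microsoft.Extensions.Hosting
open Microsoft.Extensions.Logging

type AggregationService(logger: ILogger<AggregationService>) =
    inherit BackgroundService()

    override _.ExecuteAsync(stoppingToken: CancellationToken) : Task =
        task {
            // runs for the whole lifetime of the host
            while not stoppingToken.IsCancellationRequested do
                logger.LogInformation("Aggregating ...")
                do! Task.Delay(TimeSpan.FromSeconds 30.0, stoppingToken)
        }
```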
All of the above is configured via simple environment variables, some of them pointing to a secrets store (in our case Azure Key Vault). There is of course some logging and telemetry sent to a cloud service (in our case Azure Application Insights), as well as a web/REST/HTTP API (in our case using the bare-bones ASP.NET Core SDK).
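The Key Vault indirection can be pictured with the following hypothetical sketch (our actual helper differs; the `kvref:` prefix convention here is made up for illustration): at startup, every environment variable whose value is a Key Vault reference gets overwritten with the secret it points to.

```fsharp
// Hypothetical sketch: overwrite env vars that reference Key Vault secrets at startup.
// The "kvref:<secret-uri>" convention and helper name are made up for illustration.
open System
open System.Collections
open Azure.Identity
open Azure.Security.KeyVault.Secrets

let overwriteEnvVarsFromKeyVault () =
    Environment.GetEnvironmentVariables()
    |> Seq.cast<DictionaryEntry>
    |> Seq.choose (fun entry ->
        let name, value = string entry.Key, string entry.Value
        if value.StartsWith "kvref:" then Some(name, Uri(value.Substring 6)) else None)
    |> Seq.iter (fun (name, secretUri) ->
        // secret URI format: https://<vault>.vault.azure.net/secrets/<secret-name>
        let vaultUri = Uri(secretUri.GetLeftPart(UriPartial.Authority))
        let secretName = (secretUri.Segments |> Array.last).TrimEnd '/'
        let client = SecretClient(vaultUri, DefaultAzureCredential())
        let secret = client.GetSecret secretName
        Environment.SetEnvironmentVariable(name, secret.Value.Value))
```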
All `XyzTrigger`s above have been migrated away from the Azure WebJobs SDK to pretty small, sweet, and (almost) fully-in-our-control implementations using the Azure SDK directly, which will be explained in the sections below. But before we do that, let's look at how the app stub looks now.
App Stub v2
```fsharp
// Program.fs
module TestService.XyzHandling.Program

open System.Threading
open Microsoft.Extensions.Hosting
open Framework.Hosting
open Framework.AzureKeyVault.Environment
open TestService.XyzHandling.Api.Wiring

Environment.overwriteEnvironmentVariablesFromKVRef () |> Async.RunSynchronously

[<EntryPoint>]
let main argv =
    let builder =
        HostBuilder.createDefaultBuilder argv BackgroundServiceExceptionBehavior.StopHost
        |> HostBuilder.configureLogging
        |> HostBuilder.configureAppInsights
        |> HostBuilder.configureEventHubProcessors
            EnvVars.appName
            [ EventHubProcessors.eventHubProcessor1; EventHubProcessors.eventHubProcessor2 ]
        |> HostBuilder.configureQueueProcessors
            EnvVars.appName
            ([ Some QueueProcessors.queueProcessor1 ] @ [ QueueProcessors.queueProcessor2 ]
             |> List.choose id)
        |> HostBuilder.configureBackgroundServices
            EnvVars.appName
            [ BackgroundServices.eventHubPublisherTest
              BackgroundServices.queuePublisherTest ]
        |> HostBuilder.configureTimers EnvVars.appName [ Timers.timerProcessor1; Timers.timerProcessor2 ]
        |> HostBuilder.configureStartup Startup.startupFunctions
        |> HostBuilder.configureWebHost
            EnvVars.appName
            [ WebApi.checkHealth; WebApi.checkReadiness; WebApi.Entity.getById ]

    use tokenSource = new CancellationTokenSource()
    use host = builder.Build()

    host.RunAsync(tokenSource.Token) |> Async.AwaitTask |> Async.RunSynchronously

    0 // return an integer exit code
```
Notes:
- The above code spawns a total of 9 IHostedService/BackgroundService instances (incl. the standard web host)
- Event Hub Processors listen to & process Azure Event Hub events
- Queue Processors listen to & process Azure Storage queue messages
- Timer Processors run stuff every x seconds, minutes, hours etc.
- There are even some generic background services started for writing to an event hub and a queue (test publishers - see the sketch after this list)
- One can see at a glance all the running processes and even the web API endpoints from the `Program.fs` file
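As a hedged sketch (not our actual code - the function name, payload, and interval are made up), such an event hub test publisher could look like this with the plain Azure SDK:

```fsharp
// Hedged sketch of a test publisher loop with the plain Azure SDK;
// names, payload, and interval are illustrative.
open System
open System.Threading
open Azure.Messaging.EventHubs
open Azure.Messaging.EventHubs.Producer

let eventHubPublisherTest (connectionString: string) (eventHubName: string) =
    fun (cancellationToken: CancellationToken) ->
        async {
            let producer = EventHubProducerClient(connectionString, eventHubName)

            while not cancellationToken.IsCancellationRequested do
                // build a batch, add one test event, and send it
                use! batch = producer.CreateBatchAsync(cancellationToken).AsTask() |> Async.AwaitTask
                batch.TryAdd(EventData(BinaryData.FromString "test event")) |> ignore
                do! producer.SendAsync(batch, cancellationToken) |> Async.AwaitTask
                do! Async.Sleep 5000
        }
```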
Trigger Implementations
The trigger implementations below are all based on `BackgroundService`/`IHostedService`, which means a background task is spawned and runs the whole time the host itself is running.
Compared to the original implementations in the Azure WebJobs SDK, the ones below may offer less functionality (no auto-scaling as in Azure Functions or similar), but at the same time they are very lean and much easier to understand and maintain.
The source code of a working sample application will eventually be made available here.
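The `HostBuilder.configureXyzProcessors` helpers of our Framework are not shown in this article, but the general idea is simple: each processor definition boils down to a runner function (compare the `CancellationToken -> Async<unit>` signatures in the implementations below), which gets wrapped in a `BackgroundService` and registered as an `IHostedService`. The following is an assumed, simplified shape, not the actual Framework code:

```fsharp
// Assumed, simplified shape of the Framework glue (not the actual code):
// each processor runner is wrapped in a BackgroundService and registered as IHostedService.
open System.Threading
open System.Threading.Tasks
open Microsoft.Extensions.DependencyInjection
open Microsoft.Extensions.Hosting

type ProcessorBackgroundService(run: CancellationToken -> Async<unit>) =
    inherit BackgroundService()
    override _.ExecuteAsync(stoppingToken: CancellationToken) : Task =
        run stoppingToken |> Async.StartAsTask :> Task

let configureProcessors (runners: (CancellationToken -> Async<unit>) list) (builder: IHostBuilder) =
    builder.ConfigureServices(fun services ->
        runners
        |> List.iter (fun run ->
            services.AddSingleton<IHostedService>(fun _ -> ProcessorBackgroundService(run) :> IHostedService)
            |> ignore))
```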
Event Hub Processor
Old code:
```fsharp
// Api.Wiring.fs, handle the event
type WebJobs(...) =
    [<FunctionName("HandleXyzEvent")>]
    member _.HandleXyzEvent
        (
            [<EventHubTrigger("",
                              Connection = EnvVars.EventHubs.Xyz.connectionStringKey,
                              ConsumerGroup = EnvVars.EventHubs.consumerGroup)>] msg: EventData,
            enqueuedTimeUtc: DateTime,
            sequenceNumber: Int64,
            offset: string,
            logger: ILogger
        ) =
        // handle the event

// Program.fs, configure the host with web jobs
let configureWebJobs (builder: IHostBuilder) =
    builder.ConfigureWebJobs(fun b ->
        b.AddAzureStorageCoreServices() |> ignore
        b.AddEventHubs() |> ignore)
```
Notes:
- There is some magic going on: in Program.fs you do not say what you actually want to listen to, you just say "I want to enable Event Hubs handling", and then you decorate some method of a class with attributes which indicate that you want to listen to an event hub
- The EventHubTrigger insists on getting the key of a connection string environment variable, and fetches the value whenever it decides to ...
- You also seem to need to specify a FunctionName in addition to the method name
- Any configuration settings for the EventHubTrigger are hidden away; you need to know which environment variables to configure, and they are picked up "automagically" by the framework ;)
New code:
```fsharp
// Api.Functions.fs, handle the event
let processEvent log partitionId (cancellationToken: CancellationToken) event =
    // handle the event
    () |> Async.retn

// Api.Wiring.fs, instantiate the processor
let eventHubProcessor1 =
    EventHubProcessorDef.create
        "EventHubProcessor1"
        EnvVars.appName
        EnvVars.EventHubs.checkpointStorageConnectionString
        EnvVars.EventHubs.eventHubConnectionString
        EnvVars.EventHubs.consumerGroup
        EnvVars.EventHubs.eventBatchMaximumCount
        EnvVars.EventHubs.assignedPartitionIds
        EnvVars.EventHubs.defaultStartingPosition
        EventHubHandlers.processEvent

// Program.fs, configure the host with the processors
|> HostBuilder.configureEventHubProcessors
    EnvVars.appName
    [ EventHubProcessors.eventHubProcessor1; EventHubProcessors.eventHubProcessor2 ]
```
Event Hub Processor Implementation:
```fsharp
module Framework.AzureEventHubs.EventProcessing

open System
open System.Threading.Tasks
open System.Collections.Generic
open System.Threading
open Azure.Messaging.EventHubs.Consumer
open Azure.Storage.Blobs
open Azure.Messaging.EventHubs
open Azure.Messaging.EventHubs.Primitives
open Framework
open Framework.AzureEventHubs.LogEvents
open Framework.Logging.StructuredLog

// https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample08_CustomEventProcessor.md
// Event Processor which considers assigned partitions
type AssignablePartitionProcessor
    (
        log: Log,
        name: string,
        storageClient: BlobContainerClient,
        assignedPartitions: string[] option,
        eventBatchMaximumCount: int,
        consumerGroup: string,
        connectionString: string,
        clientOptions: EventProcessorOptions,
        processEvent: Log -> string -> CancellationToken -> EventData -> Async<unit>,
        processError: Log -> string option -> string -> CancellationToken -> Exception -> Async<unit>
    ) =
    inherit PluggableCheckpointStoreEventProcessor<EventProcessorPartition>(
        BlobCheckpointStore(storageClient),
        eventBatchMaximumCount,
        consumerGroup,
        connectionString,
        clientOptions
    )

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseListPartitionIdsAsync
        (connection: EventHubConnection, cancellationToken: CancellationToken)
        : Task<string[]> =
        base.ListPartitionIdsAsync(connection, cancellationToken)

    override this.ListPartitionIdsAsync
        (connection: EventHubConnection, cancellationToken: CancellationToken)
        : Task<string[]> =
        match assignedPartitions with
        | Some assignedPartitions -> assignedPartitions |> Task.FromResult
        | None -> this.BaseListPartitionIdsAsync(connection, cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseListOwnershipAsync
        (cancellationToken: CancellationToken)
        : Task<IEnumerable<EventProcessorPartitionOwnership>> =
        base.ListOwnershipAsync(cancellationToken)

    override this.ListOwnershipAsync
        (cancellationToken: CancellationToken)
        : Task<IEnumerable<EventProcessorPartitionOwnership>> =
        match assignedPartitions with
        | Some assignedPartitions ->
            assignedPartitions
            |> Seq.map (fun partition ->
                EventProcessorPartitionOwnership(
                    FullyQualifiedNamespace = this.FullyQualifiedNamespace,
                    EventHubName = this.EventHubName,
                    ConsumerGroup = this.ConsumerGroup,
                    PartitionId = partition,
                    OwnerIdentifier = this.Identifier,
                    LastModifiedTime = DateTimeOffset.UtcNow
                ))
            |> Task.FromResult
        | None -> this.BaseListOwnershipAsync(cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseClaimOwnershipAsync
        (desiredOwnership: IEnumerable<EventProcessorPartitionOwnership>, cancellationToken: CancellationToken)
        : Task<IEnumerable<EventProcessorPartitionOwnership>> =
        base.ClaimOwnershipAsync(desiredOwnership, cancellationToken)

    override this.ClaimOwnershipAsync
        (desiredOwnership: IEnumerable<EventProcessorPartitionOwnership>, cancellationToken: CancellationToken)
        : Task<IEnumerable<EventProcessorPartitionOwnership>> =
        // Warning: if the match is removed, and only the code in the Some part is left
        // => high CPU utilization if no assignedPartitions defined!
        // For more info see https://github.com/Azure/azure-sdk-for-net/issues/39603
        match assignedPartitions with
        | Some _ ->
            desiredOwnership
            |> Seq.iter (fun ownership -> ownership.LastModifiedTime <- DateTimeOffset.UtcNow)

            desiredOwnership |> Task.FromResult
        | None -> this.BaseClaimOwnershipAsync(desiredOwnership, cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseUpdateCheckpointAsync
        (partitionId: string, offset: int64, sequenceNumber: Nullable<int64>, cancellationToken: CancellationToken) =
        base.UpdateCheckpointAsync(partitionId, offset, sequenceNumber, cancellationToken)

    // used in the OnProcessingEventBatchAsync member, calculate once
    member private this.EventHubFullName =
        Subscribing.createEventHubFullPath this.FullyQualifiedNamespace this.EventHubName this.ConsumerGroup

    // https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventprocessorclient.onprocessingeventbatchasync?view=azure-dotnet#remarks
    override this.OnProcessingEventBatchAsync
        (events: IEnumerable<EventData>, partition: EventProcessorPartition, cancellationToken: CancellationToken)
        : Task =
        task {
            try
                if not (isNull events || events |> Seq.isEmpty) then
                    do!
                        events
                        |> Seq.map (fun event ->
                            async {
                                Subscribing.checkEventEnDequeueTime log this.EventHubFullName event
                                do! processEvent log partition.PartitionId cancellationToken event
                            })
                        |> Async.Sequential
                        |> Async.Ignore
                        |> Async.StartAsTask
                        :> Task

                    let lastEvent = events |> Seq.last

                    do!
                        this.BaseUpdateCheckpointAsync(
                            partition.PartitionId,
                            lastEvent.Offset,
                            lastEvent.SequenceNumber,
                            cancellationToken
                        )
            with ex ->
                // It is very important that you always guard against exceptions in
                // your handler code; the processor does not have enough
                // understanding of your code to determine the correct action to take.
                // Any exceptions from your handlers go uncaught by the processor and
                // will NOT be redirected to the error handler.
                //
                // In this case, the partition processing task will fault and be restarted
                // from the last recorded checkpoint.
                log.Exception
                    (int EventId.EventProcessorError, string EventId.EventProcessorError)
                    "OnProcessingEventBatchAsync: Exception while processing events: {ex}"
                    ex
                    [| ex |]
                // bubble up, which will kill the background service and result in Health Check alert
                // alternative is to log and "swallow" the exception here, but then the processor will go in infinite loop ...
                // NOT a good idea, raising exception here invokes OnProcessingErrorAsync, which causes the host
                // to get restarted automatically, and the health check does not detect this ..
                // ex.Reraise()
        }

    // https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventprocessorclient.onprocessingerrorasync?view=azure-dotnet#remarks
    override this.OnProcessingErrorAsync
        (ex: Exception, partition: EventProcessorPartition, operationDescription: string, cancellationToken: CancellationToken)
        : Task =
        task {
            try
                let partitionId = partition |> Option.ofObj |> Option.map _.PartitionId
                do! processError log partitionId operationDescription cancellationToken ex |> Async.StartAsTask
            with wex ->
                // It is very important that you always guard against exceptions
                // in your handler code; the processor does not have enough
                // understanding of your code to determine the correct action to
                // take. Any exceptions from your handlers go uncaught by the
                // processor and will NOT be handled in any way.
                //
                // In this case, unhandled exceptions will not impact the processor
                // operation but will go unobserved, hiding potential application problems.
                log.Exception
                    (int EventId.EventProcessorError, string EventId.EventProcessorError)
                    "OnProcessingErrorAsync: Exception occurred while processing events: {wex}. Original exception: {ex}."
                    wex
                    [| wex; ex |]
                // do! this.StopProcessingAsync(cancellationToken)
                // bubble up, which will kill the background service and result in Health Check alert
                // alternative is to log and "swallow" the exception here, but then the processor will go in infinite loop ...
                // NOT a good idea, the host gets restarted automatically, and the health check does not detect this ..
                // ex.Reraise()
        }

/// Starts the Event Processor
let startConsumeEvents
    (name: string)
    (checkpointStorageConnectionString: string)
    (checkpointBlobContainerName: string)
    (eventHubConnectionString: string)
    (consumerGroup: string)
    (eventBatchMaximumCount: int)
    (assignedPartitionIds: string[] option)
    (defaultStartingPosition: EventPosition)
    (processEvent: Log -> string -> CancellationToken -> EventData -> Async<unit>)
    (processError: Log -> string option -> string -> CancellationToken -> Exception -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (IDictionary<string, obj> -> CancellationToken -> Async<unit>) =
    fun state cancellationToken ->
        async {
            let blobContainerClient =
                BlobContainerClient(checkpointStorageConnectionString, checkpointBlobContainerName)

            // automatically create container if it does not exist
            do! blobContainerClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let options = EventProcessorOptions() // TODO: Customize some of them?
            options.DefaultStartingPosition <- defaultStartingPosition

            let processor =
                AssignablePartitionProcessor(
                    log,
                    name,
                    blobContainerClient,
                    assignedPartitionIds,
                    eventBatchMaximumCount,
                    consumerGroup,
                    eventHubConnectionString,
                    options,
                    processEvent,
                    processError
                )

            state.Add("processor", processor)

            log.Info
                (int EventId.EventProcessorStarted, string EventId.EventProcessorStarted)
                "Starting with config:\n\
                 \tEventHubNamespace/Name/ConsumerGroup = {eventHubNamespace}/{eventHubName}/{consumerGroup}\n\
                 \tBlobContainerClient.Uri = {blobContainerClientUri}\n\
                 \tAssignedPartitionIds = {assignedPartitionIds}\n\
                 \tEventBatchMaximumCount = {eventBatchMaximumCount}\n\
                 \tOptions.PrefetchCount = {prefetchCount}\n\
                 \tOptions.PrefetchSizeInBytes = {prefetchSizeInBytes}\n\
                 \tOptions.MaximumWaitTime = {maximumWaitTime}\n\
                 \tOptions.TrackLastEnqueuedEventProperties = {trackLastEnqueuedEventProperties}\n\
                 \tOptions.DefaultStartingPosition = {defaultStartingPosition}\n\
                 \tOptions.LoadBalancingStrategy = {loadBalancingStrategy}\n\
                 \tOptions.LoadBalancingUpdateInterval = {loadBalancingUpdateInterval}\n\
                 \tOptions.PartitionOwnershipExpirationInterval = {partitionOwnershipExpirationInterval}\n\
                 \tOptions.RetryOptions.Mode = {retryOptionsMode}\n\
                 \tOptions.RetryOptions.Delay = {retryOptionsDelay}\n\
                 \tOptions.RetryOptions.MaximumDelay = {retryOptionsMaximumDelay}\n\
                 \tOptions.RetryOptions.MaximumRetries = {retryOptionsMaximumRetries}\n\
                 \tOptions.RetryOptions.TryTimeout = {retryOptionsTryTimeout}\n\
                 \tOptions.RetryOptions.CustomRetryPolicy = {retryOptionsCustomRetryPolicy}\n\
                 "
                [| (processor.FullyQualifiedNamespace |> String.replace ".servicebus.windows.net" "")
                   processor.EventHubName
                   processor.ConsumerGroup
                   blobContainerClient.Uri
                   $"%A{assignedPartitionIds}"
                   eventBatchMaximumCount
                   options.PrefetchCount
                   options.PrefetchSizeInBytes
                   options.MaximumWaitTime
                   options.TrackLastEnqueuedEventProperties
                   options.DefaultStartingPosition
                   options.LoadBalancingStrategy
                   options.LoadBalancingUpdateInterval
                   options.PartitionOwnershipExpirationInterval
                   options.RetryOptions.Mode
                   options.RetryOptions.Delay
                   options.RetryOptions.MaximumDelay
                   options.RetryOptions.MaximumRetries
                   options.RetryOptions.TryTimeout
                   options.RetryOptions.CustomRetryPolicy |]

            do! processor.StartProcessingAsync(cancellationToken) |> Async.AwaitTask
            started.Set() |> ignore
        }

/// Stops the Event Processor
let stopConsumeEvents
    name
    (started: ManualResetEvent)
    (log: Log)
    (state: IDictionary<string, obj>)
    : (CancellationToken -> Async<unit>) =
    fun cancellationToken ->
        async {
            let processor = state["processor"] :?> AssignablePartitionProcessor
            do! processor.StopProcessingAsync(cancellationToken) |> Async.AwaitTask

            log.Info
                (int EventId.EventProcessorStopped, string EventId.EventProcessorStopped)
                "Event Hub Processor was stopped"
                [||]

            started.Reset() |> ignore
        }
```
Notes:
- You define a function, then instantiate processor(s) with the function and a bunch of configuration values, which you can fetch yourself, and then you tell the HostBuilder to configure your processor(s) - pretty straightforward, no reflection, no magic
- The implementation of the EventHubProcessor builds on the Azure SDK's PluggableCheckpointStoreEventProcessor (the machinery underneath EventProcessorClient), which does everything required, including the same checkpointing in blob storage as done by the WebJobs SDK. The whole implementation is less than 350 LOC ...
Queue Processor
Old code:
```fsharp
type WebJobs(...) =
    [<FunctionName("RetryHandleXyzEvent")>]
    member _.RetryHandleXyzEvent
        ([<QueueTrigger("xyz-events-retry-queue", Connection = "StorageQueueConnectionStringKey")>] msg: string)
        (logger: ILogger) =
        // handle the message

// Program.fs, configure the host with web jobs
let configureWebJobs (builder: IHostBuilder) =
    builder.ConfigureWebJobs(fun b ->
        b.AddAzureStorageCoreServices() |> ignore
        b.AddAzureStorageQueues() |> ignore)
```
New code:
```fsharp
// Api.Functions.fs, handle the event
let processMessage log (cancellationToken: CancellationToken) (msg: QueueMessage) : Async<unit> =
    // handle the queue message
    () |> Async.retn

// Api.Wiring.fs, instantiate the processor
let queueProcessor1 =
    QueueProcessorDef.create
        "QueueProcessor1"
        EnvVars.appName
        EnvVars.Queues.queueStorageConnectionString
        EnvVars.Queues.queueName
        EnvVars.Queues.messageBatchMaximumCount
        EnvVars.Queues.visibilityTimeout
        EnvVars.Queues.maxPollingInterval
        EnvVars.Queues.maxDequeueCount
        EnvVars.Queues.defaultBackOffIntervalMs
        QueueHandlers.processMessage

// Program.fs, configure the host with the processors
|> HostBuilder.configureQueueProcessors EnvVars.appName [ QueueProcessors.queueProcessor1 ]
```
Queue Processor Implementation:
```fsharp
module Framework.AzureStorageQueues.QueueProcessing

open System
open System.Threading
open Azure.Storage.Queues
open Azure.Storage.Queues.Models
open Framework.AzureStorageQueues.BasicOperations
open Framework.AzureStorageQueues.LogEvents
open Framework.Logging.StructuredLog
open Framework.ExceptionHandling

let private doProcessMessage
    (log: Log)
    (name: string)
    (cancellationToken: CancellationToken)
    (maxDequeueCount: int)
    (queueClient: QueueClient)
    (poisonQueueClient: QueueClient)
    (processMessage: Log -> CancellationToken -> QueueMessage -> Async<unit>)
    (msg: QueueMessage) =
    async {
        if msg.DequeueCount > maxDequeueCount then
            // message has been retried too many times => move it to the poison queue
            do!
                poisonQueueClient.SendMessageAsync(msg.Body, cancellationToken = cancellationToken)
                |> Async.AwaitTask
                |> Async.Ignore

            log.Debug
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Message dequeue count = {messageDequeueCount} > maxDequeueCount = {maxDequeueCount} => message moved to poison queue {poisonQueueUri}"
                [| msg.DequeueCount; maxDequeueCount; poisonQueueClient.Uri |]

            do!
                queueClient.DeleteMessageAsync(msg.MessageId, msg.PopReceipt, cancellationToken)
                |> Async.AwaitTask
                |> Async.Ignore

            log.Debug
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Message deleted from queue {queueUri}"
                [| queueClient.Uri |]
        else
            // normal processing
            // NOTE: try-with is used instead of Async.Catch to catch also cases when processMessage
            // throws an exception outside of an Async block ...
            try
                do! processMessage log cancellationToken msg

                do!
                    queueClient.DeleteMessageAsync(msg.MessageId, msg.PopReceipt, cancellationToken)
                    |> Async.AwaitTask
                    |> Async.Ignore

                log.Debug
                    (int EventId.QueueMessageBeingProcessed, string EventId.QueueMessageBeingProcessed)
                    "Deleted message with id = {messageId} and dequeue count = {messageDequeueCount} in queue {queueUri} after successful processing"
                    [| msg.MessageId; msg.DequeueCount; queueClient.Uri |]
            with ex ->
                // message remains in the queue and will become visible again after visibilityTimeout
                log.Exception
                    (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                    "Queue message processing failed. Retrying {remainingDequeueCount} more times, then the message will be moved to poison queue. Queue: {queueUri}; Message Body: {messageBody}"
                    ex
                    [| (int64 maxDequeueCount - msg.DequeueCount)
                       queueClient.Uri
                       msg |> QueueMessage.toString |]
    }
    |> Async.Catch
    |> Async.map (function
        | Choice1Of2 _ -> ()
        | Choice2Of2 ex ->
            // log the message body
            log.Exception
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Error occurred while performing auxiliary queue message processing (e.g. DeleteMessageAsync). Message Body: {messageBody}"
                ex
                [| msg |> QueueMessage.toString |]
            // and propagate up the exception
            ex.Reraise())

/// Starts an infinite loop for receiving queue messages
let consumeMessages
    (name: string)
    (storageConnectionString: string)
    (queueName: string)
    (poisonQueueName: string)
    (messageBatchMaximumCount: int)
    (visibilityTimeout: TimeSpan)
    (maxPollingInterval: TimeSpan)
    (maxDequeueCount: int)
    (defaultBackOffIntervalMs: int)
    (processMessage: Log -> CancellationToken -> QueueMessage -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (CancellationToken -> Async<unit>) =
    fun cancellationToken ->
        async {
            let options = QueueClientOptions() // TODO: Configure options?

            let queueClient = QueueClient(storageConnectionString, queueName, options)
            do! queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let poisonQueueClient = QueueClient(storageConnectionString, poisonQueueName, options)
            do! poisonQueueClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let mutable backoffTimeMs = defaultBackOffIntervalMs
            started.Set() |> ignore

            log.Info
                (int EventId.QueueProcessorStarting, string EventId.QueueProcessorStarting)
                "Starting with config:\n\
                 \tUri = {queueClientUri}\n\
                 \tMessageBatchMaximumCount = {messageBatchMaximumCount}\n\
                 \tVisibilityTimeout = {visibilityTimeout}\n\
                 \tMaxPollingInterval = {maxPollingInterval}\n\
                 \tMaxDequeueCount = {maxDequeueCount}\n\
                 \tOptions.MessageEncoding = {messageEncoding}\n\
                 \tOptions.Retry.Mode = {retryMode}\n\
                 \tOptions.Retry.Delay = {retryDelay}\n\
                 \tOptions.Retry.MaxDelay = {retryMaxDelay}\n\
                 \tOptions.Retry.MaxRetries = {retryMaxRetries}\n\
                 \tOptions.Retry.NetworkTimeout = {retryNetworkTimeout}\n\
                 \tPoisonQueueName = {poisonQueueName}\n\
                 "
                [| queueClient.Uri
                   messageBatchMaximumCount
                   visibilityTimeout
                   maxPollingInterval
                   maxDequeueCount
                   options.MessageEncoding
                   options.Retry.Mode
                   options.Retry.Delay
                   options.Retry.MaxDelay
                   options.Retry.MaxRetries
                   options.Retry.NetworkTimeout
                   poisonQueueName |]

            while not cancellationToken.IsCancellationRequested do
                try
                    // any exception in this block will be caught and logged
                    // because if propagated up it would stop the queue processor/background service => health check alert,
                    // but no automatic restart is currently possible ...
                    let! response =
                        queueClient.ReceiveMessagesAsync(
                            maxMessages = messageBatchMaximumCount,
                            visibilityTimeout = visibilityTimeout,
                            cancellationToken = cancellationToken
                        )
                        |> Async.AwaitTask

                    if response.HasValue && response.Value.Length > 0 then
                        do!
                            response.Value
                            |> Seq.map (fun msg ->
                                async {
                                    // central check for en-/dequeue time
                                    // TODO: Enable this once a proper config/solution is found for the invisibility period,
                                    // which is *not* exposed as a QueueMessage property (only InsertedOn and NextVisibleOn,
                                    // but VisibleOn is needed). For more info see https://github.com/Azure/azure-sdk-for-net/issues/40147
                                    // do checkMessageEnDequeueTime log queueClient.Name backoffTimeMs msg
                                    return!
                                        doProcessMessage
                                            log
                                            name
                                            cancellationToken
                                            maxDequeueCount
                                            queueClient
                                            poisonQueueClient
                                            processMessage
                                            msg
                                })
                            |> Async.Sequential
                            |> Async.Ignore

                        backoffTimeMs <- defaultBackOffIntervalMs

                        log.Debug
                            (int EventId.WaitingForQueueMessages, string EventId.WaitingForQueueMessages)
                            "{messageCount} messages successfully processed from queue {queueUri}. Waiting for {backoffTimeMs} milliseconds before checking again ..."
                            [| response.Value.Length; queueClient.Uri; backoffTimeMs |]
                    else
                        backoffTimeMs <- Math.Min(backoffTimeMs * 2, int maxPollingInterval.TotalMilliseconds)

                        log.Debug
                            (int EventId.WaitingForQueueMessages, string EventId.WaitingForQueueMessages)
                            "No messages found in queue {queueUri}. Waiting for {backoffTimeMs} milliseconds before checking again ..."
                            [| queueClient.Uri; backoffTimeMs |]

                    do! Async.Sleep(backoffTimeMs)
                with ex ->
                    log.Exception
                        (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                        "Exception while processing messages: {ex}"
                        ex
                        [| ex |]
        }
```
Notes:
- The implementation is based on the default QueueClient.ReceiveMessagesAsync approach of handling queue messages with the Azure SDK, invoked in an infinite loop with some Async.Sleep-based backoff sprinkled into it ... The whole implementation is about 200 LOC.
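The producer side needs nothing special either - enqueueing a message for this processor is a couple of plain `QueueClient` calls (a minimal sketch; the helper name is ours for illustration):

```fsharp
// Minimal sketch: enqueue a message for the queue processor to pick up.
open Azure.Storage.Queues

let enqueueMessage (storageConnectionString: string) (queueName: string) (body: string) =
    async {
        let queueClient = QueueClient(storageConnectionString, queueName)
        // create the queue on first use, then send the message
        do! queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore
        do! queueClient.SendMessageAsync(body) |> Async.AwaitTask |> Async.Ignore
    }
```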
Timer Processor
Old code:
```fsharp
type WebJobs(...) =
    [<FunctionName("DoSomethingRegularly")>]
    member this.ExpireCustomerDocuments
        (
            [<TimerTrigger("%DoSomethingRegularlyCrontab%")>] timer: TimerInfo,
            logger: ILogger
        ) =
        // do something

// Program.fs, configure the host with web jobs
let configureWebJobs (builder: IHostBuilder) =
    builder.ConfigureWebJobs(fun b ->
        b.AddAzureStorageCoreServices() |> ignore
        b.AddTimers() |> ignore)
```
Notes:
- Some exotic placeholder format for the crontab expression ...
New code:
```fsharp
// Api.Functions.fs, handle the event
let processTimer1 log (cancellationToken: CancellationToken) (toProcessOn: DateTime) : Async<unit> =
    // do something
    () |> Async.retn

// Api.Wiring.fs, instantiate the processor
let timerProcessor1 =
    TimerProcessorDef.create
        "TimerProcessor1"
        EnvVars.appName
        EnvVars.Timers.timerProcessorQueueStorageConnectionString
        (TimeSpan.FromSeconds(10) |> Some)
        "* * * * *"
        TimerHandlers.processTimer1

// Program.fs, configure the host with the processors
|> HostBuilder.configureTimers EnvVars.appName [ Timers.timerProcessor1; Timers.timerProcessor2 ]
```
Timer Processor Implementation:
```fsharp
module Framework.AzureStorageQueues.TimerProcessing

open System
open System.Text
open System.Threading
open Azure.Storage.Queues
open NCrontab
open Framework
open Framework.AzureStorageQueues.LogEvents
open Framework.Logging.StructuredLog

type TimerMessage = { ToProcessOn: DateTime }

// could be up to 7 days (messages are deleted from the queue after 7 days),
// but we factor in possible downtimes/system recovery
let private maxTimeoutPeriod = TimeSpan.FromDays(3)

let private calculateNextCheckOn (toProcessOn: DateTime) (now: DateTime) =
    if toProcessOn - now < maxTimeoutPeriod then
        toProcessOn
    else
        now + maxTimeoutPeriod

let private createAndSendNextTimerMessage (queueClient: QueueClient) (crontab: CrontabSchedule) =
    async {
        let toProcessOn = crontab.GetNextOccurrence(DateTime.UtcNow)
        let msg = { ToProcessOn = toProcessOn }
        let encodedMessage = msg |> Json.serialize |> Encoding.base64Encode Encoding.UTF8

        let nextCheckOn = calculateNextCheckOn toProcessOn DateTime.UtcNow
        // message should always become visible slightly after ToProcessOn
        // because sending the msg to the queue takes some ms
        let visibilityTimeout = nextCheckOn - DateTime.UtcNow

        do!
            // -1 second indicates "infinite" message TTL (i.e. 7 days)
            queueClient.SendMessageAsync(encodedMessage, visibilityTimeout, TimeSpan.FromSeconds(-1))
            |> Async.AwaitTask
            |> Async.Ignore

        return msg
    }

/// Starts an infinite loop for receiving queue timer messages
let private consumeMessages
    (log: Log)
    (name: string)
    (queueClient: QueueClient)
    (cancellationToken: CancellationToken)
    (crontab: CrontabSchedule)
    processMessage
    (messageBatchMaximumCount: int)
    (visibilityTimeout: TimeSpan)
    =
    async {
        let mutable firstRun = true

        while not cancellationToken.IsCancellationRequested do
            try
                // any exception in this block will be caught and logged
                // because if propagated up it would stop the timer processor/background service => health check alert,
                // but no automatic restart is currently possible ...
                let! response =
                    queueClient.ReceiveMessagesAsync(
                        maxMessages = messageBatchMaximumCount,
                        visibilityTimeout = visibilityTimeout,
                        cancellationToken = cancellationToken
                    )
                    |> Async.AwaitTask

                let! nextCheckOn =
                    if response.HasValue && (response.Value |> Seq.tryHead |> Option.isSome) then
                        // the message became visible, so it needs to be processed or rescheduled
                        async {
                            let msg = response.Value |> Seq.head // we expect/process only 1 timer message per timer queue!

                            let decodedMessage =
                                msg.Body.ToString() |> Encoding.base64Decode Encoding.UTF8 |> Json.deserialize

                            log.Debug
                                (int EventId.TimerMessageBeingProcessed, string EventId.TimerMessageBeingProcessed)
                                "Timer message received in queue {queueUri} with ToProcessOn = {toProcessOn}."
                                [| queueClient.Uri; decodedMessage.ToProcessOn |> DateTime.toStringIso |]

                            let! toProcessOn =
                                // if ToProcessOn is in the past => ready to process!
                                if decodedMessage.ToProcessOn <= DateTime.UtcNow then
                                    async {
                                        do! processMessage log cancellationToken decodedMessage.ToProcessOn

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed, string EventId.TimerMessageBeingProcessed)
                                            "Timer message in queue {queueUri} successfully processed."
                                            [| queueClient.Uri |]

                                        // delete this (all) message(s)
                                        do! queueClient.ClearMessagesAsync() |> Async.AwaitTask |> Async.Ignore

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed, string EventId.TimerMessageBeingProcessed)
                                            "Deleted timer message in queue {queueUri}."
                                            [| queueClient.Uri |]

                                        // schedule next execution in a new message
                                        let! newTimerMessage = createAndSendNextTimerMessage queueClient crontab

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed, string EventId.TimerMessageBeingProcessed)
                                            "New timer message with ToProcessOn = {toProcessOn} sent to queue {queueUri}."
                                            [| newTimerMessage.ToProcessOn; queueClient.Uri |]

                                        return newTimerMessage.ToProcessOn
                                    }
                                else
                                    // message visible before ToProcessOn .. make invisible again
                                    async {
                                        let nextCheckOn = calculateNextCheckOn decodedMessage.ToProcessOn DateTime.UtcNow
                                        // message should always become visible slightly after ToProcessOn
                                        // because sending the msg to the queue takes some ms
                                        let visibilityTimeout = nextCheckOn - DateTime.UtcNow

                                        if visibilityTimeout > TimeSpan.Zero then
                                            do!
                                                queueClient.UpdateMessageAsync(
                                                    msg.MessageId,
                                                    msg.PopReceipt,
                                                    visibilityTimeout = visibilityTimeout
                                                )
                                                |> Async.AwaitTask
                                                |> Async.Ignore

                                            log.Debug
                                                (int EventId.TimerMessageBeingProcessed, string EventId.TimerMessageBeingProcessed)
                                                "Timer message in queue {queueUri} with ToProcessOn = {toProcessOn} became visible before ToProcessOn. The message's invisibility was extended."
                                                [| queueClient.Uri; decodedMessage.ToProcessOn |> DateTime.toStringIso |]

                                        return decodedMessage.ToProcessOn
                                    }

                            return calculateNextCheckOn toProcessOn DateTime.UtcNow
                        }
                    elif firstRun then
                        log.Debug
                            (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                            "No visible message in queue {queueUri}, but first run, so await schedule ..."
                            [| queueClient.Uri |]

                        let toProcessOn = crontab.GetNextOccurrence(DateTime.UtcNow)
                        calculateNextCheckOn toProcessOn DateTime.UtcNow |> Async.retn
                    else
                        log.Debug
                            (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                            "No visible message in queue {queueUri} after sleep but there should have been one, maybe delayed, so check frequently ..."
                            [| queueClient.Uri |]

                        DateTime.UtcNow.AddMinutes(1) |> Async.retn

                // add 1 second to make 100% sure the sleep is over only after the message has already become visible
                let sleepTimeSpan = (nextCheckOn - DateTime.UtcNow) + TimeSpan.FromSeconds(1)

                log.Debug
                    (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                    "Sleeping {sleepTimeSpan} before checking again in queue {queueUri}"
                    [| sleepTimeSpan; queueClient.Uri |]

                if sleepTimeSpan > TimeSpan.Zero then
                    do! Async.Sleep(sleepTimeSpan)
            with ex ->
                log.Exception
                    (int EventId.TimerMessageProcessingError, string EventId.TimerMessageProcessingError)
                    "Exception while processing messages: {ex}"
                    ex
                    [| ex |]

            firstRun <- false
    }

/// Creates timer queue messages if missing and starts an infinite loop for receiving queue timer messages
let createAndConsumeMessages
    (name: string)
    (crontab: CrontabSchedule)
    (storageConnectionString: string)
    (queueName: string)
    (visibilityTimeout: TimeSpan)
    (processMessage: Log -> CancellationToken -> DateTime -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (CancellationToken -> Async<unit>) =
    fun cancellationToken ->
        async {
            let options = QueueClientOptions() // TODO: Configure options?
            let queueClient = QueueClient(storageConnectionString, queueName, options)

            let! response = queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask

            if not (isNull response) then
                // queue was just created
                let! timerMessage = createAndSendNextTimerMessage queueClient crontab

                log.Info
                    (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                    "New queue {queueUri} was created and a new timer message with ToProcessOn = {toProcessOn} was sent to it."
                    [| queueClient.Uri; timerMessage.ToProcessOn |> DateTime.toStringIso |]
            else
                // queue already existed
                let! response = queueClient.GetPropertiesAsync() |> Async.AwaitTask

                // ApproximateMessagesCount, even though not exact, is guaranteed to have a value > 0 if there are messages.
                // Additionally "Approximate messages count will give you an approximate count of total messages in a queue
                // and will include both visible and invisible messages."
                if response.HasValue && response.Value.ApproximateMessagesCount = 0 then
                    let! timerMessage = createAndSendNextTimerMessage queueClient crontab

                    log.Info
                        (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                        "Queue {queueUri} exists, but no timer message found in it. Created timer message with ToProcessOn = {toProcessOn}."
                        [| queueClient.Uri; timerMessage.ToProcessOn |> DateTime.toStringIso |]
                else
                    log.Info
                        (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                        "Existing queue {queueUri} with timer message found."
                        [| queueClient.Uri |]

            let messageBatchMaximumCount = 1 // 1 queue per timer processor, with 1 message per queue only
            started.Set() |> ignore

            log.Info
                (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                "Starting with config:\n\
                 \tUri = {queueClientUri}\n\
                 \tMessageBatchMaximumCount = {messageBatchMaximumCount}\n\
                 \tVisibilityTimeout = {visibilityTimeout}\n\
                 \tOptions.MessageEncoding = {messageEncoding}\n\
                 \tOptions.Retry.Mode = {retryMode}\n\
                 \tOptions.Retry.Delay = {retryDelay}\n\
                 \tOptions.Retry.MaxDelay = {retryMaxDelay}\n\
                 \tOptions.Retry.MaxRetries = {retryMaxRetries}\n\
                 \tOptions.Retry.NetworkTimeout = {retryNetworkTimeout}\n\
                 "
                [| queueClient.Uri
                   messageBatchMaximumCount
                   visibilityTimeout
                   options.MessageEncoding
                   options.Retry.Mode
                   options.Retry.Delay
                   options.Retry.MaxDelay
                   options.Retry.MaxRetries
                   options.Retry.NetworkTimeout |]

            do! consumeMessages log name queueClient cancellationToken crontab processMessage messageBatchMaximumCount visibilityTimeout
        }
```
Notes:
- The implementation is based on an Azure Storage Queue with a single message inside, which is made "invisible" for a certain period of time; this allows the timer to survive a process crash, to have multiple instances running, etc. The whole implementation is about 250 LOC.
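The crontab evaluation itself is delegated to the NCrontab package, as seen in the implementation above; computing the next occurrence, which drives `ToProcessOn` and the message visibility timeout, boils down to:

```fsharp
// Plain NCrontab usage, as in the timer processor above.
open System
open NCrontab

let crontab = CrontabSchedule.Parse "0 3 * * *" // e.g. every day at 03:00 (UTC in our setup)
let next = crontab.GetNextOccurrence DateTime.UtcNow
printfn "next occurrence: %O (in %O)" next (next - DateTime.UtcNow)
```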
SignalR
Old code:
```fsharp
type WebJobs(...) =
    [<FunctionName("HandleAndPushToClient")>]
    member _.HandleAndPushToClient
        (
            [<EventHubTrigger("",
                              Connection = EnvVars.EventHubs.Xyz.connectionStringKey,
                              ConsumerGroup = DependencyInjection.EventHubs.Xyz.consumerGroupForWebSocketNotification)>] msg: EventData,
            enqueuedTimeUtc: DateTime,
            sequenceNumber: Int64,
            offset: string,
            [<SignalR(HubName = DependencyInjection.SignalR.hubName,
                      ConnectionStringSetting = "AzureSignalRConnectionString")>] signalRMessages: IAsyncCollector<SignalRMessage>,
            logger: ILogger
        ) =
        // transform some internal event to external
        // publish to SignalR using signalRMessages.AddAsync
```
Notes:
- The output bindings are generally a killer feature which is really killing you - instead of invoking a very simple Azure SDK client method, you deal with IAsyncCollector and SignalR binding magic, which is completely unnecessary. Go figure out how to send a message to a specific user or to all ...
New code (same as for the EventHubProcessor above):
```fsharp
let serviceManager =
    SignalRClient.getServiceManager EnvVars.SignalR.connectionString

let hub =
    SignalRClient.getHubContext serviceManager EnvVars.SignalR.azureSignalRHubName
    |> Async.RunSynchronously // TODO: Find a way to get rid of this

let sendToUser = SignalRClient.sendToUser hub

// Api.Functions.fs, handle the event
let processEvent log sendToUser partitionId (cancellationToken: CancellationToken) event =
    async {
        // handle the event
        do! sendToUser "SomeTarget" "Some Message" "SomeUserId"
    }
```
Notes:
- The Azure SDK for sending SignalR messages is very, very straightforward when you use it directly ...
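For reference, this is roughly what our `SignalRClient` helpers wrap, based on the Microsoft.Azure.SignalR.Management package (a sketch under that assumption; hub, target, and user names are placeholders, and the actual helpers differ):

```fsharp
// Sketch of direct Azure SignalR Service usage via Microsoft.Azure.SignalR.Management.
open System.Threading
open Microsoft.AspNetCore.SignalR // SendAsync extension on IClientProxy
open Microsoft.Azure.SignalR.Management

let sendToUserSketch (connectionString: string) (hubName: string) =
    async {
        let serviceManager =
            ServiceManagerBuilder()
                .WithOptions(fun o -> o.ConnectionString <- connectionString)
                .BuildServiceManager()

        let! hubContext =
            serviceManager.CreateHubContextAsync(hubName, CancellationToken.None)
            |> Async.AwaitTask

        // pushing to a single user is one direct SDK call
        do!
            hubContext.Clients.User("SomeUserId").SendAsync("SomeTarget", "Some Message")
            |> Async.AwaitTask
    }
```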
Conclusion
Removing a layer of indirection has always given me great satisfaction. Not only does it make the whole application easier to understand, but you also gain a lot more control and get to know the inner workings of the technology, without someone deciding things for you or translating stuff like configuration on your behalf.
MS always seems to try to make things easier for the developer (patronizing him?) by providing a magical and abstract framework, which achieves exactly the opposite. My recommendation to MS would be to make everything look like a stupid console application instead, with full control for the client developer, who just uses a bunch of simple "helper" functions (or class methods in OOP) from MS, and nothing more.
Hopefully someone can save some time doing something similar based on the ideas and code in this article!
¹ Funnily enough, in the meantime MS seems to be trying to do something similar by integrating the WebJobs SDK into .NET 9's HostBuilder.