
Conversation

@GustavoCaso

@GustavoCaso GustavoCaso commented Oct 5, 2020

Summary

At Shopify, we have been using different flavours of concurrency within ActiveJob for several years.
This year we have consolidated all of this functionality into one API method called concurrency.
After validating that this works at Shopify's scale, we decided to port the API upstream.

There have been other attempts to add locking to ActiveJob: #33728

While there is nothing wrong with that functionality, we have found that locking alone is not enough as a concurrency primitive.

At Shopify, we allow developers to specify multiple concurrency options for their jobs:

In all the examples below, keys is the array of keys used to build a unique concurrency key from the job arguments.

  • EndToEnd, where a job holds a concurrency ticket from the moment it gets enqueued until it is performed. Whenever a job tries to enter the queue, we check whether the concurrency limit has been reached, and if it has, we drop the job.
    concurrency(limit: 1, keys: ['checkout_id'])

class PaymentProcessingJob < ActiveJob::Base
  # We only allow one job to be in the queue or performing at any time.
  # If a job tries to enter the queue while another job with the same checkout_id
  # is in the queue or performing, we drop the new job.
  # We do not want to charge the same checkout twice :)
  concurrency(limit: 1, keys: ['checkout_id'])

  def perform(params)
    # Logic
  end
end
  • EnqueueLimit, where a job holds a concurrency ticket from the moment it gets enqueued until it gets dequeued. Whenever a job tries to enter the queue, we check whether the concurrency limit has been reached, and if it has, we drop the job.
    concurrency(limit: { enqueue: 1 }, keys: ['checkout_id'])

  • PerformLimit, where a job holds a concurrency ticket from the moment it gets dequeued until it has been performed. At dequeue time, we check whether the concurrency limit has been reached, and if so, we put the job back into the delayed queue.
    concurrency(limit: { perform: 1 }, keys: ['checkout_id'])

class UpdateThemeSettingsReferencesJob < ActiveJob::Base
  # We only allow one job to perform at any moment.
  # This is often used as a form of throttling for expensive operations.
  # When a job from the head of the queue tries to start performing while another job
  # with the same key is already performing, we re-enqueue the new job with a delay.
  concurrency(
    limit: { perform: 1 },
    keys: ['shop_id', 'model', 'model_id']
  )

  def perform(params)
    # Logic
  end
end
  • BoundedStages is the combination of EnqueueLimit and PerformLimit
    concurrency(limit: { enqueue: 1, perform: 1 }, keys: ['checkout_id'])
class ExpensiveIdempotentSyncJob < ActiveJob::Base
  # This job syncs the data of this shop with an external service.
  # Since it is expensive, we want only one sync running at a time for each shop.
  # We need to enqueue this job whenever something changes that requires syncing.
  # However, if there is already an ExpensiveSyncJob in the queue, that job will sync all the changes made
  # before it starts running, so there is no need to enqueue duplicates.
  concurrency(limit: { enqueue: 1, perform: 1 }, keys: ['shop_id'])

  def perform(params)
    # Logic
  end
end

ActiveJob itself only handles concurrency at the enqueue step (the EndToEnd and EnqueueLimit cases); for the perform cases (PerformLimit and BoundedStages) it is up to the job adapters to implement the checks internally. So the adapters can decide whether or not to add that functionality 😄
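
A minimal sketch of that enqueue-side check, assuming a per-job list of concurrency rules and an adapter predicate named concurrency_enqueue_reached? (both names are illustrative, not the code in this PR):

# Sketch only: `job.concurrency` and `concurrency_enqueue_reached?` are assumed
# names for illustration; the real hooks live in the PR's implementation.
def enqueue_with_concurrency(job)
  job.concurrency.each do |rule|
    next unless rule.enqueue_limit

    # Limit already reached: drop the job, as in the EndToEnd / EnqueueLimit cases above.
    return false if job.class.queue_adapter.concurrency_enqueue_reached?(rule, job)
  end

  job.enqueue
end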

Other Information

The timeout option specifies how long a job can hold a concurrency ticket.
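
For instance, combined with the options above it might look like this (a sketch; whether the value is plain seconds or an ActiveSupport duration is an assumption):

# Hold the perform ticket for at most 10 minutes, even if the job never reports back.
concurrency(limit: { perform: 1 }, keys: ['shop_id'], timeout: 10.minutes)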

At Shopify, we allow other API options such as perform_delay, which configures the delay with which the job is placed back in the delayed queue when the PerformLimit is reached.

Here is an example of the API in use in one of our jobs:

concurrency(limit: { perform: 8, perform_delay: 300 }, keys: ['task_name']) 
@rails-bot rails-bot bot added the activejob label Oct 5, 2020
@GustavoCaso GustavoCaso force-pushed the add-concurrency-to-active-job branch 4 times, most recently from c43ba58 to 072935c Compare October 7, 2020 12:14
Comment on lines 81 to 75
Author

@GustavoCaso GustavoCaso Oct 7, 2020

I believe this might cause some issues down the line, because not all enqueued jobs have a hash for arguments.

I believe we can overcome this by changing the API to use a block rather than a set of keys.
Currently, at Shopify, we use the set of keys. Why? So developers do not get extra fancy with the block logic, and it also allows us to raise an exception if they specify a key that is no longer in the job params.

This is how the API would look like using a block:

class MyJob < ActiveJob::Base
  concurrency(limit: 1, key: ->(job) { job.arguments[0] })
end

I still believe we have to add the job_class as part of the key to guarantee uniqueness between jobs. Two different jobs could be enqueued with the same params.
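
For illustration, a block-based key that folds in the job class could look like this (a sketch of the idea, not the proposed implementation):

class MyJob < ActiveJob::Base
  # Sketch: include the job class in the key so two different job classes
  # enqueued with the same params do not share a concurrency ticket.
  concurrency(limit: 1, key: ->(job) { "#{job.class.name}/#{job.arguments[0]}" })
end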

@GustavoCaso GustavoCaso force-pushed the add-concurrency-to-active-job branch from 072935c to b10bcba Compare October 9, 2020 16:37
@GustavoCaso GustavoCaso force-pushed the add-concurrency-to-active-job branch from b10bcba to dd148d0 Compare November 25, 2020 20:08
Member

@tenderlove tenderlove left a comment

This seems fine to me. Would it be possible to give examples of what kinds of jobs might use the different concurrency strategies? I think that would help other folks on the core team understand the use cases at Shopify (and why we need this). It would also be helpful for other folks who may need this feature but don't know it yet.

@GustavoCaso GustavoCaso force-pushed the add-concurrency-to-active-job branch from dd148d0 to 9363dd0 Compare November 25, 2020 20:13
@GustavoCaso GustavoCaso marked this pull request as ready for review November 25, 2020 21:10
@GustavoCaso GustavoCaso changed the title POC for adding concurrency to ActiveJob Add concurrency to ActiveJob Nov 25, 2020
@dhh
Member

dhh commented Nov 26, 2020

This is great! Love to have this sort of exclusive locking built in. Questions: Have you ever used a different limit than 1? When would you want to use a limit of 2?

Reason I ask is primarily because I think the API could be simplified like so:

class UpdateThemeSettingsReferencesJob < ActiveJob::Base
  perform_exclusively_with :shop_id, :model, :model_id
end

class PaymentProcessingJob < ActiveJob::Base
  enqueue_exclusively_with :checkout_id
end

class ExpensiveIdempotentSyncJob < ActiveJob::Base
  enqueue_and_perform_exclusively_with :shop_id
end

Even if a limit greater than 1 had a demonstrable use case, I'd still use this API, but tack on kwargs:

class UpdateThemeSettingsReferencesJob < ActiveJob::Base
  perform_exclusively_with :shop_id, :model, :model_id, limit: 3
end

class PaymentProcessingJob < ActiveJob::Base
  enqueue_exclusively_with :checkout_id, limit: 3
end

class ExpensiveIdempotentSyncJob < ActiveJob::Base
  enqueue_and_perform_exclusively_with :shop_id, limit: 3
end

But I'd prefer not even to expose the limit unless we have a proven case for it.

@GustavoCaso
Author

GustavoCaso commented Nov 30, 2020

@dhh Thanks for the quick response.

Have you ever used a different limit than 1?

Yes, we have a bunch of jobs that require values higher than 1:

class UpdateThemeSettingsReferencesJob < ApplicationJob
  concurrency(limit: { perform: 3 }, keys: ['shop_id'])
end

class PublishJob < ApplicationJob
  concurrency(limit: { enqueue: 50, perform: 10 }, keys: ['shop_id'])
end

Usually, developers who only care about total parallel execution use the perform option. Think of the limitations around a job's perform method: the job needs a lot of data, so we reach out to the database; under normal circumstances this is not a problem, but under high load it can cause the overall system to respond more slowly. Limiting the total number of these jobs that run in parallel helps us protect the system.

The enqueue case serves as a way to keep queues from growing infinitely, allowing developers to put a cap on the maximum number of jobs in the queue; once we reach that limit, we start dropping the extra jobs. This protects the memory usage of the backend that stores job information.

Using enqueue and perform together combines the two cases outlined above for a single job.

The developer has to decide which strategy is best for the job.

Suppose we validate that these use cases are legitimate for the entire Rails community. In that case, we might have to update the API to allow setting different limits based on the developer's/application's needs.

class UpdateThemeSettingsReferencesJob < ActiveJob::Base
  perform_exclusively_with :shop_id, :model, :model_id, limit: 3
end

class PublishJob < ActiveJob::Base
  enqueue_exclusively_with :checkout_id, limit: 50
end

class ExpensiveIdempotentSyncJob < ActiveJob::Base
  enqueue_and_perform_exclusively_with :shop_id, enqueue_limit: 3, perform_limit: 2
end

I would assume that calling any of the API methods (perform_exclusively_with, enqueue_exclusively_with, or enqueue_and_perform_exclusively_with) without the limit option will default to 1, simplifying the majority of use cases.

Look forward to hearing your thoughts.

@kaspth
Contributor

kaspth commented Nov 30, 2020

I like these stated use cases, but I wish they were a little clearer in the code. I also think there's something to extracting the uniqueness aspect of the job into a separate shared statement, so perhaps something like:

class UpdateThemeSettingsReferencesJob < ApplicationJob
  unique_by :all # Default.
  top_concurrent_perform_at limit: 3
end

class PublishJob < ApplicationJob
  unique_by :first # Only consider this unique through job.arguments.first; also supports :shop_id, etc.
  reject_enqueues_at limit: 50
end

class ExpensiveIdempotentSyncJob < ApplicationJob
  reject_enqueues_at limit: 3
  top_concurrent_perform_at limit: 2
end

Not super liking my reject_enqueues_at and top_concurrent_perform_at. Anyway, wanted to suggest the unique_by aspect and a potential default. Passing the buck to @dhh ⛳️🏌️‍♂️

@GustavoCaso
Author

GustavoCaso commented Dec 3, 2020

@dhh @kaspth @rafaelfranca @tenderlove

With the last commit 6f90615

I split the API from concurrency(limit:, keys:, timeout:) into enqueue_exclusively_with, perform_exclusively_with, enqueue_and_perform_exclusively_with, and exclusively_with.

I added all the new information to the payload of the job:

def serialize
  super.merge(
    "concurrency_enqueue_limit" => concurrency_enqueue_limit,
    "concurrency_perform_limit" => concurrency_perform_limit,
    "concurrency_key" => concurrency_key,
    "concurrency_timeout" => concurrency_timeout,
    "concurrency_strategy" => concurrency_strategy
  )
end

I like having all the information in the payload so the different background job libraries can implement concurrency in their own way by reading the concurrency information from the job payload.

I thought we could encapsulate all of this into separate Strategy classes: ActiveJob::Concurrency::Strategy::Enqueue, ActiveJob::Concurrency::Strategy::Perform, ActiveJob::Concurrency::Strategy::EnqueueAndPerform, and ActiveJob::Concurrency::Strategy::EndToEnd. Then, using ActiveJob::Serializers::ObjectSerializer, we could reduce the number of keys that concurrency adds to the job payload.

I haven't added that last part yet, because I first want to know if you like this approach and the direction we are going.
If so, I'm happy to DRY up the job payload keys and have only one key, concurrency_strategy, which will hold all the information about that job's concurrency options.

The actual key, once serialized, would look something like this:

{
  # Rest of the serialized job
  "concurrency_strategy" => {
    "_aj_serialized" => "ActiveJob::Concurrency::Strategy::Enqueue",
    "enqueue_limit" => 1,
    "dequeue_limit" => nil,
    "keys" => ["model", "model_id"]
  },
}
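
For reference, a rough sketch of what one of those strategy classes and its serializer could look like, following the ActiveJob::Serializers::ObjectSerializer pattern (the serializer class name and the attribute set are assumptions that mirror the payload above; none of this is in the current commit):

module ActiveJob
  module Concurrency
    module Strategy
      class Enqueue
        attr_reader :enqueue_limit, :dequeue_limit, :keys

        def initialize(enqueue_limit:, dequeue_limit: nil, keys: [])
          @enqueue_limit = enqueue_limit
          @dequeue_limit = dequeue_limit
          @keys = keys
        end
      end
    end
  end

  module Serializers
    # Hypothetical serializer; it would need to be registered like any other
    # custom serializer so the strategy object round-trips through the payload.
    class EnqueueStrategySerializer < ObjectSerializer
      def serialize(strategy)
        super(
          "enqueue_limit" => strategy.enqueue_limit,
          "dequeue_limit" => strategy.dequeue_limit,
          "keys" => strategy.keys
        )
      end

      def deserialize(hash)
        Concurrency::Strategy::Enqueue.new(
          enqueue_limit: hash["enqueue_limit"],
          dequeue_limit: hash["dequeue_limit"],
          keys: hash["keys"]
        )
      end

      private
        def klass
          Concurrency::Strategy::Enqueue
        end
    end
  end
end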

If you like the direction this is going and the idea of having each strategy be its own class, with just one key as part of the job's concurrency payload, I will make the necessary changes.

@GustavoCaso GustavoCaso force-pushed the add-concurrency-to-active-job branch from c72e343 to 6f90615 Compare December 3, 2020 15:00
@rafaelfranca
Member

I thought we could encapsulate all of this into separate Strategy classes: ActiveJob::Concurrency::Strategy::Enqueue, ActiveJob::Concurrency::Strategy::Perform, ActiveJob::Concurrency::Strategy::EnqueueAndPerform, and ActiveJob::Concurrency::Strategy::EndToEnd. Then, using ActiveJob::Serializers::ObjectSerializer, we could reduce the number of keys that concurrency adds to the job payload.

There is overhead in adding new object serializers, since we iterate over all of them to check. I also don't think the number of keys added to the payload is worth this, or that a serializer would save much there. If we are so worried about the storage space, we could avoid adding the concurrency_ prefix to each key and use a hash:

def serialize
  super.merge(
    "concurrency" => {
      "limit" => concurrency_enqueue_limit,
      "perform_limit" => concurrency_perform_limit,
      "key" => concurrency_key,
      "timeout" => concurrency_timeout,
      "strategy" => concurrency_strategy
    }
  )
end
@sambostock
Contributor

sambostock commented Dec 11, 2020

Will it be possible to combine different concurrency logic? For instance, say I have

class ResourceProcessorJob < ApplicationJob
  def perform(resource_id)
    # process the resource using some API
  end
end

and I want to:

  • enforce a maximum of 1 job enqueued with the same resource_id
  • enforce a maximum of 1 job performing with the same resource_id
  • enforce a maximum of 10 ResourceProcessorJobs performing, regardless of arguments (to not occupy all workers, and limit API load)

Would it be possible to express that logic using the API described above, and if so, how? It seems to me like they would conflict with each other, both because of the multiple limitations on performing concurrency (limit 1 and limit 10), and the differing methods of determining uniqueness (by resource_id, and ignoring arguments).

The combination question could also extend to subclasses. Should it be possible to inherit concurrency limits? Would subclasses add to their inherited limits or override them?
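
On the inheritance part: if the configured limits ended up stored in a class_attribute (the way ActiveJob stores other per-class configuration), subclasses would inherit them, and reassignment would override rather than merge. A sketch of those semantics (the concurrency_rules attribute is hypothetical, not part of this PR):

class BaseResourceJob < ActiveJob::Base
  # Hypothetical storage for the sketch; not this PR's implementation.
  class_attribute :concurrency_rules, default: []
  self.concurrency_rules = [{ perform: 1, keys: ["resource_id"] }]
end

class BulkResourceJob < BaseResourceJob
  # Inherits the parent's rules; reassigning replaces them for this subclass only.
  self.concurrency_rules = [{ perform: 10 }]
end

class AuditedResourceJob < BaseResourceJob
  # `+=` builds a new array, so this adds to the inherited rules without touching the parent.
  self.concurrency_rules += [{ enqueue: 1, keys: ["resource_id"] }]
end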

@rewritten

Any update? This would be a huge feature if it allows cluster-wide uniqueness.

Can I help advance it?

Base automatically changed from master to main January 14, 2021 17:02
@GustavoCaso
Author

@rewritten I'm planning this week to fix the failing tests and add the needed documentation, and hopefully we'll be able to merge it.

@GustavoCaso GustavoCaso force-pushed the add-concurrency-to-active-job branch 2 times, most recently from 753fb63 to a8ea49b Compare January 18, 2021 18:12
@GustavoCaso
Author

GustavoCaso commented Jan 18, 2021

@sambostock @rafaelfranca I modified the code to allow specifying multiple concurrency strategies within the same job.

For the example outlined above #40337 (comment)

I want to:
- enforce a maximum of 1 job enqueued with the same resource_id
- enforce a maximum of 1 job performing with the same resource_id
- enforce a maximum of 10 ResourceProcessorJobs performing, regardless of arguments (to not occupy all workers, and limit API load)

With the last commit fe6435c we are able to specify multiple concurrency strategies.

class ResourceProcessorJob < ApplicationJob
  exclusively_with(limit: 10)
  enqueue_exclusively_with(keys: ['resource_id'])
  perform_exclusively_with(keys: ['resource_id'])

  def perform(resource_id)
    # process the resource using some API
  end
end
@GustavoCaso GustavoCaso force-pushed the add-concurrency-to-active-job branch from cdc8ac3 to 69b59be Compare February 9, 2021 12:55
@GustavoCaso GustavoCaso force-pushed the add-concurrency-to-active-job branch from 69b59be to cb57a03 Compare February 9, 2021 12:58
@GustavoCaso
Author

Another suggestion is that ActiveJob, rather than the adapter, could trigger the execution concurrency check and manage the potential failure of that check. The reason I suggest this is that the current order of operations of an adapter when executing is:

@bensheldon that is a great point.

What about something like:

def execute(job_data) #:nodoc:
  ActiveJob::Callbacks.run_callbacks(:execute) do
    job = deserialize(job_data)
    perform = true

    if job.concurrency.any?
      job.concurrency.each do |concurrency|
        next if concurrency.enqueue_limit?

        if job.class.queue_adapter.concurrency_perform_reached?(concurrency, job)
          # The adapter checks and decides what to do with the job if the concurrency limit is reached
          perform = false
        end

        break unless perform
      end
    end

    job.perform_now if perform
  end
end

That would mean that each adapter would now need to define [:enqueue, :enqueue_at, :concurrency_enqueue_reached?, :clear_concurrency, :concurrency_perform_reached?] to be considered a valid adapter.
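
To make that concrete, a toy adapter's perform-side methods could look roughly like this (purely illustrative; the concurrency object's accessors and the in-memory counter are assumptions):

class ToyQueueAdapter
  def initialize
    @performing = Hash.new(0) # concurrency key => jobs currently performing
  end

  # Returns true when the perform limit for this key is exhausted. The adapter
  # decides what to do with the job in that case (drop it, delay it, requeue it...).
  def concurrency_perform_reached?(concurrency, job)
    if @performing[concurrency.key] >= concurrency.perform_limit
      enqueue_at(job, Time.now.to_f + 300) # enqueue_at is part of the adapter interface listed above
      true
    else
      @performing[concurrency.key] += 1
      false
    end
  end

  # Releases the ticket once the job has finished performing.
  def clear_concurrency(concurrency, job)
    @performing[concurrency.key] -= 1 if @performing[concurrency.key].positive?
  end
end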

@rafaelfranca @dhh any thoughts?

@bensheldon
Contributor

@GustavoCaso I'm glad that my comment was helpful!

One note on this implementation: with every Adapter afaik, this line will cause the job to be discarded without having been performed.

job.perform_now if perform

Raising an exception to the adapter from #execute is the only way afaik to ensure that the adapter will retain the job and re-execute it. Each adapter might handle it differently. My preference, for GoodJob, is that ActiveJob itself call retry_job, but I could also imagine some adapters might want to handle it differently so that they have more control over when/how the job is re-run. If it raised a named error, e.g. ActiveJob::ConcurrencyPerformReachedError, I think that would be sufficient for an adapter to handle.
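
A minimal sketch of that shape, using the error name suggested above (the worker class and delay are illustrative):

# Sketch: ActiveJob::ConcurrencyPerformReachedError is the hypothetical error
# suggested above; it does not exist yet.
class ToyWorker
  RETRY_DELAY = 30 # seconds

  def run(job_data)
    ActiveJob::Base.execute(job_data)
  rescue ActiveJob::ConcurrencyPerformReachedError
    # The job was not performed; retain it and try again later.
    ActiveJob::Base.deserialize(job_data).enqueue(wait: RETRY_DELAY)
  end
end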

@GustavoCaso
Author

@bensheldon Thank you for the quick answer.

with every Adapter afaik, this line will cause the job to be discarded without having been performed.

If ActiveJob triggers job.class.queue_adapter.concurrency_perform_reached?(concurrency, job) and passes the job instance, wouldn't every adapter have the ability to decide what to do when the concurrency limit is reached? Some adapters might decide to discard the job, others would decide to retain it, etc.

At Shopify, when the Perform concurrency limit is reached, we enqueue the job back into the delayed queue with some extra delay, so it can hopefully be performed later. I would assume GoodJob would do whatever it sees fit.

Am I missing something?

@bensheldon
Contributor

bensheldon commented Feb 10, 2021

If ActiveJob triggers job.class.queue_adapter.concurrency_perform_reached?(concurrency, job) and pass the job instance, wouldn't every adapter have the ability to decide what to do if the concurrency limit is reached?

I am imagining that #concurrency_perform_reached?, being a predicate (?) method, would be free of side effects, e.g. it would not alter, re-enqueue, or discard a job. I guess renaming it to be clearer, e.g. #check_and_handle_perform_concurrency(concurrency, job), would address that concern. But...

Conceptually, my mental model of ActiveJob is that any job-lifecycle action that happens within ActiveJob::Base.execute(serialized_job_data) is the responsibility of ActiveJob, and anything that happens outside of ActiveJob::Base.execute is the responsibility of the Adapter.

I'll admit that my concerns, as GoodJob's maintainer, are somewhat unique because GoodJob has a hard dependency on ActiveJob and delegates lifecycle behavior to ActiveJob: error handling, retries, timeouts are all deferred to ActiveJob-based implementations with callback methods (e.g. rescue_from, retry_on, around_perform, etc.). That's unique because (all?) other Adapters pre-date ActiveJob and have their own free-standing implementations of error handling, retries, timeouts, etc. that can be triggered separately from ActiveJob's methods.

This concern is why I push for any job-lifecycle modification that happens within ActiveJob::Base.execute to be the responsibility of ActiveJob, and for any callbacks to the Adapter within execute to be side-effect free. The benefit is that replacing one ActiveJob adapter with another should not result in a significant change of behavior.

Another conceptual test I have in mind is whether adding this feature to ActiveJob would require any implementing adapter to add additional configuration of its own. For example, if every adapter had to implement its own user-configurable values for the concurrency fallback (e.g. retry delay), that could imply that the ActiveJob interface is incomplete.

I hope this didn't get too debatable. I think some of these concerns would surface themselves if the proposed interface is implemented in AsyncAdapter.

@rails-bot

rails-bot bot commented May 11, 2021

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
Thank you for your contributions.

@rails-bot rails-bot bot added the stale label May 11, 2021
@rails-bot rails-bot bot closed this May 18, 2021
@zzak zzak reopened this May 18, 2021
@rails-bot rails-bot bot removed the stale label May 18, 2021
@GustavoCaso
Author

Thanks @zzak. I'm planning to get back to the work needed in a couple of days. Work is slowing down over the summer, so I'll be able to spend some time on it.

@zzak
Member

zzak commented Jun 3, 2021

@GustavoCaso Awesome! Please ping me if you have any questions or run into troubles 🙇

@rails-bot

rails-bot bot commented Sep 1, 2021

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
Thank you for your contributions.

@rails-bot rails-bot bot added the stale label Sep 1, 2021
@rails-bot rails-bot bot closed this Sep 8, 2021
@rafaelfranca rafaelfranca reopened this Sep 8, 2021
@rails-bot rails-bot bot removed the stale label Sep 8, 2021
@rafaelfranca rafaelfranca self-assigned this Sep 8, 2021
@bensheldon
Contributor

fyi, I released a concurrency extension for ActiveJob within GoodJob. I went with a simplified interface:

class MyJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_control_concurrency_with(
    # Maximum number of jobs with the concurrency key to be concurrently enqueued
    enqueue_limit: 2,
    # Maximum number of jobs with the concurrency key to be concurrently performed
    perform_limit: 1,
    # A unique key to be globally locked against.
    # Can be a String or a Lambda/Proc that is invoked in the context of the job.
    # Note: arguments passed to #perform_later must be accessed through the `arguments` method.
    key: -> { "Unique-#{arguments.first}" } # MyJob.perform_later("Alice") => "Unique-Alice"
  )

  def perform(first_name)
    # do work
  end
end

The implementation is fairly simple. The only responsibility of the adapter is to return a count of enqueued/performing jobs for a given key.
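
The shape of that check, independent of GoodJob's internals, is roughly the following (enqueued_count_for is an assumed store/adapter method, and the limit is hard-coded for the sketch):

class ThrottledJob < ApplicationJob
  ENQUEUE_LIMIT = 2

  before_enqueue do |job|
    key = "#{job.class.name}-#{job.arguments.first}"

    # Count-then-enqueue: without a lock around this check and the enqueue
    # itself, two processes can pass the check at the same time (see the
    # "racy" note below).
    throw :abort if job.class.queue_adapter.enqueued_count_for(key) >= ENQUEUE_LIMIT
  end

  def perform(name)
    # do work
  end
end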

I wanted to get it released for feedback from users. So far I have learned:

  • It's racy. I had to wrap the count-then-enqueue and count-then-perform with Advisory Locks.
  • It is a design decision whether enqueue_limit is inclusive of performing jobs or not, with different results. Currently enqueue_limit excludes performing jobs, and I'm about to add another option (limit:) that includes performing jobs.
  • There has been a request for a throttle capability.
@northeastprince
Contributor

Any update on this?

@zzak
Member

zzak commented May 22, 2023

Given the age of this PR, I think it probably needs a rethink, taking into account what @bensheldon was working on in the previous post:
#40337 (comment)

@GustavoCaso are you still interested in working on this? Otherwise we can maybe open it up to new contributors with a fresh take on it. 🙏

@GustavoCaso
Author

Hey @zzak

Thanks for the ping 😄

I would love to give it another try. As you mentioned, it is probably best to start a new PR, given this one's age, and take into account all the feedback from @bensheldon.

I'll work on another attempt before closing this one, and we can compare both.

@northeastprince
Contributor

@GustavoCaso any updates on this?
