Add concurrency to ActiveJob #40337
Conversation
I believe this might cause some issues down the line, because not all enqueued jobs have a hash for arguments.
I believe we can overcome this by changing this API to use a block rather than a set of keys.
Currently, at Shopify we use the set of keys. Why? So developers do not get extra fancy with the block logic, and it also allows us to raise an exception if they specify a key that is no longer in the job params.
This is how the API would look using a block:

```ruby
class MyJob < ActiveJob::Base
  concurrency(limit: 1, key: ->(job) { job.arguments[0] })
end
```

I still believe we have to add the job_class as part of the key to guarantee uniqueness between jobs. Two different jobs could be enqueued with the same params.
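As a sketch of the trade-off between the two shapes, here is a minimal plain-Ruby model (the `Job` struct and `resolve_key` helper are invented for illustration, not the proposed Rails API) showing how a keys-based lookup can validate its inputs while a block cannot, and how prepending the job class keeps keys unique across jobs:

```ruby
# Illustrative only: resolving a concurrency key from either a block or a
# declared list of argument keys, prefixed with the job class for uniqueness.
Job = Struct.new(:class_name, :arguments)

def resolve_key(job, key: nil, keys: nil)
  part =
    if key
      key.call(job) # block form: arbitrary logic, nothing to validate against
    else
      args = job.arguments.first
      # keys form: we can raise if a key is no longer in the job params
      keys.map { |k| args.fetch(k) { raise KeyError, "unknown concurrency key: #{k}" } }.join("/")
    end
  "#{job.class_name}:#{part}"
end

job = Job.new("MyJob", [{ "shop_id" => 42 }])
resolve_key(job, keys: ["shop_id"])                            # => "MyJob:42"
resolve_key(job, key: ->(j) { j.arguments.first["shop_id"] })  # => "MyJob:42"
```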
This seems fine to me. Would it be possible to give examples of what kinds of jobs might use the different concurrency strategies? I think that would help other folks on the core team understand the use cases at Shopify (and why we need this). It would also be helpful for other folks who may need this feature but don't know it yet.
This is great! Love to have this sort of exclusive locking built in. Questions: Have you ever used a different limit than 1? When would you want to use a limit of 2? The reason I ask is primarily because I think the API could be simplified like so:

```ruby
class UpdateThemeSettingsReferencesJob < ActiveJob
  perform_exclusively_with :shop_id, :model, :model_id
end

class PaymentProcessingJob < ActiveJob
  enqueue_exclusively_with :checkout_id
end

class ExpensiveIdempotentSyncJob < ActiveJob
  enqueue_and_perform_exclusively_with :shop_id
end
```

Even if a limit greater than 1 had a demonstrable use case, I'd still use this API, but tag on kwargs:

```ruby
class UpdateThemeSettingsReferencesJob < ActiveJob
  perform_exclusively_with :shop_id, :model, :model_id, limit: 3
end

class PaymentProcessingJob < ActiveJob
  enqueue_exclusively_with :checkout_id, limit: 3
end

class ExpensiveIdempotentSyncJob < ActiveJob
  enqueue_and_perform_exclusively_with :shop_id, limit: 3
end
```

But I'd prefer not even to expose the limit unless we have a proven case for it.
@dhh Thanks for the quick response.
Yes, we have a bunch of jobs that require values higher than 1:

```ruby
class UpdateThemeSettingsReferencesJob < ApplicationJob
  concurrency(limit: { perform: 3 }, keys: ['shop_id'])
end

class PublishJob < ApplicationJob
  concurrency(limit: { enqueue: 50, perform: 10 }, keys: ['shop_id'])
end
```

Usually, developers that only care about total parallel execution use one of these limits. The developer would have to make the decision on which is the best strategy for the job.

```ruby
class UpdateThemeSettingsReferencesJob < ActiveJob
  perform_exclusively_with :shop_id, :model, :model_id, limit: 3
end

class PublishJob < ActiveJob
  enqueue_exclusively_with :checkout_id, limit: 50
end

class ExpensiveIdempotentSyncJob < ActiveJob
  enqueue_and_perform_exclusively_with :shop_id, enqueue_limit: 3, perform_limit: 2
end
```

I would assume that calling any of the API methods … Look forward to hearing your thoughts.
I like these stated use cases, but I wish they were a little clearer in the code. Also think there's something to extracting the uniqueness aspect of the job to a separate shared statement, so perhaps something like:

```ruby
class UpdateThemeSettingsReferencesJob < ApplicationJob
  unique_by :all # Default.
  top_concurrent_perform_at limit: 3
end

class PublishJob < ApplicationJob
  unique_by :first # Only consider this unique through job.arguments.first; also supports :shop_id, etc.
  reject_enqueues_at limit: 50
end

class ExpensiveIdempotentSyncJob < ApplicationJob
  reject_enqueues_at limit: 3
  top_concurrent_perform_at limit: 2
end
```

Not super liking my naming here, though…
@dhh @kaspth @rafaelfranca @tenderlove With the last commit 6f90615 I split the API. I added all the new concurrency information to the payload of the job:

```ruby
def serialize
  super.merge(
    "concurrency_enqueue_limit" => concurrency_enqueue_limit,
    "concurrency_perform_limit" => concurrency_perform_limit,
    "concurrency_key" => concurrency_key,
    "concurrency_timeout" => concurrency_timeout,
    "concurrency_strategy" => concurrency_strategy
  )
end
```

I like having all the information in the payload so the different background job libraries can implement concurrency their own way by checking all the concurrency information from the job payload.

I thought we could actually encapsulate all of this into separate strategy classes. I haven't added that last part, because I first want to know if you like this approach and the direction we are going.

The actual key, once serialized, would look something like this:

```ruby
{
  # Rest of the serialized job
  "concurrency_strategy" => {
    "_aj_serialized" => "ActiveJob::Concurrency::Strategy::Enqueue",
    "enqueue_limit" => 1,
    "dequeue_limit" => nil,
    "keys" => ["model", "model_id"]
  },
}
```

If you like the way this is going, with each strategy in its own class and just one key as part of the concurrency payload for a job, I will make the necessary changes.
There is overhead in adding new object serializers, since we iterate over all of them to check. I also don't think the number of keys added to the payload is worth this, or that a serializer would save much. If we are so worried about the storage space, we could nest everything under a single key:

```ruby
def serialize
  super.merge(
    "concurrency" => {
      "limit" => concurrency_enqueue_limit,
      "perform_limit" => concurrency_perform_limit,
      "key" => concurrency_key,
      "timeout" => concurrency_timeout,
      "strategy" => concurrency_strategy
    }
  )
end
```
Will it be possible to combine different concurrency logic? For instance, say I have

```ruby
class ResourceProcessorJob < ApplicationJob
  def perform(resource_id)
    # process the resource using some API
  end
end
```

and I want to:

- limit the total number of these jobs performing concurrently to 10 (say, to respect an API rate limit)
- only ever have 1 job enqueued or performing per `resource_id`
Would it be possible to express that logic using the API described above, and if so, how? It seems to me like they would conflict with each other, both because of the multiple limitations on performing concurrency (limit 1 and limit 10), and the differing methods of determining uniqueness (per `resource_id` versus globally).

The combination question could also extend to subclasses. Should it be possible to inherit concurrency limits? Would subclasses add to their inherited limits or override them?
Any update? This would be a huge feature if it allows cluster-wide uniqueness. Can I help advance it?
@rewritten I'm planning this week to fix the tests that are failing and add the documentation that is needed, and hopefully we will be able to merge it.
@sambostock @rafaelfranca I modified the code to allow specifying multiple concurrency strategies within the same job, for the example outlined above in #40337 (comment).
With the last commit fe6435c we are able to specify multiple concurrency strategies:

```ruby
class ResourceProcessorJob < ApplicationJob
  exclusively_with(limit: 10)
  enqueue_exclusively_with(keys: ['resource_id'])
  perform_exclusively_with(keys: ['resource_id'])

  def perform(resource_id)
    # process the resource using some API
  end
end
```
@bensheldon that is a great point. What about something like:

```ruby
def execute(job_data) # :nodoc:
  ActiveJob::Callbacks.run_callbacks(:execute) do
    job = deserialize(job_data)
    perform = true

    if job.concurrency.any?
      job.concurrency.each do |concurrency|
        next if concurrency.enqueue_limit?

        if job.class.queue_adapter.concurrency_perform_reached?(concurrency, job)
          # The adapter checks and decides what to do with the job if the concurrency limit is reached
          perform = false
        end

        break unless perform
      end
    end

    job.perform_now if perform
  end
end
```

That would mean that each adapter would now need to define `concurrency_perform_reached?`. @rafaelfranca @dhh any thoughts?
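To make the adapter's side of this concrete, here is a toy in-memory sketch (invented class and method names, and a simplified signature taking a key string and a limit rather than the `(concurrency, job)` pair from the comment above) of the counting an adapter would have to do:

```ruby
# Toy in-memory adapter fragment; a real adapter would count performing jobs
# in its backing store (Redis, a database, etc.) instead of a Hash.
class InMemoryAdapter
  def initialize
    @performing = Hash.new(0) # concurrency key => jobs currently performing
  end

  def start_perform(key)
    @performing[key] += 1
  end

  def finish_perform(key)
    @performing[key] -= 1
  end

  # True when the number of jobs performing under this key has hit the limit.
  def perform_reached?(key, limit)
    @performing[key] >= limit
  end
end
```

A job whose check returns true would then be delayed or re-enqueued rather than performed, which is exactly the lifecycle decision debated in this thread.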
@GustavoCaso I'm glad that my comment was helpful! One note on this implementation: with every adapter, afaik, this line will cause the job to be discarded without having been performed.
Raising an exception to the adapter from …
@bensheldon Thank you for the quick answer.
If ActiveJob triggers … At Shopify, when the … Am I missing something?
I am imagining that …

Conceptually, my mental model of ActiveJob is that any job-lifecycle actions that happen within …

I'll admit that my concerns, as GoodJob's maintainer, are somewhat unique because GoodJob has a hard dependency on ActiveJob and delegates lifecycle behavior to ActiveJob: error handling, retries, and timeouts are all deferred to ActiveJob-based implementations with callback methods (e.g. …). This concern is why I push that any job-lifecycle modification that happens within …

Another conceptual test I have in mind is whether adding this feature in ActiveJob would require any implementing adapter to add additional configuration of its own. For example, if every adapter would have to implement its own user-configurable values for concurrency fallback (e.g. retry delay), that could imply that the ActiveJob interface is incomplete.

I hope this didn't get too debatable. I think some of these concerns would surface themselves if the proposed interface is implemented in …
| This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. |
Thanks @zzak, I'm planning to get back to the work needed in a couple of days. Work is getting slower in the summer, so I'm going to be able to spend some time working on it.
@GustavoCaso Awesome! Please ping me if you have any questions or run into trouble 🙇
fyi, I released a concurrency extension for ActiveJob within GoodJob. I went with a simplified interface:

```ruby
class MyJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_control_concurrency_with(
    # Maximum number of jobs with the concurrency key to be concurrently enqueued
    enqueue_limit: 2,
    # Maximum number of jobs with the concurrency key to be concurrently performed
    perform_limit: 1,
    # A unique key to be globally locked against.
    # Can be a String or a Lambda/Proc that is invoked in the context of the job.
    # Note: arguments passed to #perform_later must be accessed through the `arguments` method.
    key: -> { "Unique-#{arguments.first}" } # MyJob.perform_later("Alice") => "Unique-Alice"
  )

  def perform(first_name)
    # do work
  end
end
```

The implementation is fairly simple. The only responsibility of the adapter is to return a count of enqueued/performing jobs for a given key. I wanted to get it released for feedback from users. So far I have learned:
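The counting approach GoodJob describes can be modeled in a few lines of plain Ruby. This is a toy sketch, not GoodJob's actual implementation, and it skips the locking a real backend needs to make the count-and-insert atomic:

```ruby
# Toy model of an enqueue limit: count jobs already enqueued under the key
# and reject the new job when the limit is reached.
class ToyQueue
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  # Returns true if enqueued, false if rejected by the concurrency limit.
  def enqueue(key:, enqueue_limit:, payload:)
    return false if @jobs.count { |j| j[:key] == key } >= enqueue_limit
    @jobs << { key: key, payload: payload }
    true
  end
end

queue = ToyQueue.new
queue.enqueue(key: "Unique-Alice", enqueue_limit: 2, payload: 1) # => true
queue.enqueue(key: "Unique-Alice", enqueue_limit: 2, payload: 2) # => true
queue.enqueue(key: "Unique-Alice", enqueue_limit: 2, payload: 3) # => false
```

In a real backend the count would be a query against persisted jobs, and the check-and-insert would run under a lock or transaction to avoid racing enqueues.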
Any update on this?
Given the age, I think this probably needs a rethink, taking into account what @bensheldon was working on in the previous post. @GustavoCaso are you still interested in working on this? Otherwise we can maybe open it up to new contributors with a fresh take on it. 🙏
Hey @zzak Thanks for the ping 😄 I would love to give it another try. As you mentioned, it is probably best to start with a new one due to the PR age and consider all the feedback from @bensheldon. I'll work on another attempt before closing this one, and we can compare both.
@GustavoCaso any updates on this?
Summary
At Shopify, we have been using different flavours of concurrency within `ActiveJob` for several years. This year we consolidated all of this functionality into one API method called `concurrency`. After validating that this works at the scale of Shopify, we decided to port the API upstream.
There have been other attempts to add locking to `ActiveJob` (#33728). While there is nothing wrong with that functionality, we have identified that locking alone is not enough as a concurrency primitive.
At Shopify, we allow developers to specify multiple concurrency options for their jobs:
In all examples below, `keys` is the array of keys that will help build a unique concurrency key from the job arguments.

- `EndToEnd`: a job holds a concurrency ticket from the moment it gets enqueued until the job is performed. When a job tries to enter the queue, we check if the concurrency limit is reached, and if it is, we drop the job.

  ```ruby
  concurrency(limit: 1, keys: ['checkout_id'])
  ```

- `EnqueueLimit`: a job holds a concurrency ticket from the moment it gets enqueued until the job gets dequeued. When a job tries to enter the queue, we check if the concurrency limit is reached, and if it is, we drop the job.

  ```ruby
  concurrency(limit: { enqueue: 1 }, keys: ['checkout_id'])
  ```

- `PerformLimit`: a job holds a concurrency ticket from the moment it gets dequeued until the job gets performed. At dequeue time, we check if the concurrency limit is reached, and if so we put the job back into the delayed queue.

  ```ruby
  concurrency(limit: { perform: 1 }, keys: ['checkout_id'])
  ```

- `BoundedStages`: the combination of `EnqueueLimit` and `PerformLimit`.

  ```ruby
  concurrency(limit: { enqueue: 1, perform: 1 }, keys: ['checkout_id'])
  ```

`ActiveJob` only cares about the concurrency at the enqueue step (`EndToEnd` and `EnqueueLimit`); the perform cases (`PerformLimit` and `BoundedStages`) are up to the job adapters to implement internally, so the adapters can decide whether to add the functionality or not 😄

Other Information
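One way to read the four strategies described above is by which lifecycle phases count against the limit. The sketch below is an illustrative plain-Ruby model (names invented), not part of the proposed API:

```ruby
# Which lifecycle phases each strategy counts against its limit.
# :enqueued covers enqueue -> dequeue; :performing covers dequeue -> done.
STRATEGY_PHASES = {
  "EndToEnd"      => [:enqueued, :performing], # one ticket across both phases
  "EnqueueLimit"  => [:enqueued],
  "PerformLimit"  => [:performing],
  "BoundedStages" => [:enqueued, :performing], # separate limit per phase
}.freeze

def counts_toward_limit?(strategy, phase)
  STRATEGY_PHASES.fetch(strategy).include?(phase)
end

counts_toward_limit?("EnqueueLimit", :performing) # => false
counts_toward_limit?("PerformLimit", :performing) # => true
```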
The `timeout` option allows specifying for how long a job can hold a concurrency ticket.

At Shopify, we allow other API options like `perform_delay`, which configures the delay before the job is placed back in the delayed queue when the `PerformLimit` is reached.

Here is an example of the API in use in one of our jobs: