Refactor enrich maintenance coordination logic #90931
Hi @jbaiera, I've created a changelog YAML for you.
Pinging @elastic/es-data-management (Team:Data Management)
I left some comments, but nothing major, thanks for taking a look at this Jimmy!
        }

    -   private void releasePolicy(String policyName) {
    +   private void releasePolicy(EnrichPolicyLock policyLock) {
What's the difference between `releasePolicy(policyLock)` and `policyLock.release()`? Is there a way we can differentiate the two a bit better? It looks like the main difference is just the execution permits?
I could update the name to make it a bit more clear - but yes, the cleanup logic here needs us to release a permit as well as unlock the policy.
I changed the policy lock object to implement `Releasable`, and now the method simply wraps that `Releasable` so that closing it also releases the execution permits. Everything just calls `close()` now instead of needing to remember to run `releasePolicy(policyLock)`.
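For readers who want to picture the wrapping described in this comment, here is a minimal sketch. It is not the code from the PR: the `Releasable` interface below is a local stand-in, and `PolicyLock`, `withPermit`, and `executionPermits` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class ReleasableWrapSketch {
    /** Local stand-in for a Releasable-style interface: close() with no checked exceptions. */
    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    /** Hypothetical policy lock; closing it unlocks the policy. */
    static final class PolicyLock implements Releasable {
        final AtomicBoolean locked = new AtomicBoolean(true);

        @Override
        public void close() {
            locked.set(false);
        }
    }

    /**
     * Wraps a policy lock so that a single close() call both unlocks the policy
     * and returns the execution permit. Repeated close() calls are no-ops.
     */
    static Releasable withPermit(Releasable policyLock, Semaphore executionPermits) {
        AtomicBoolean closed = new AtomicBoolean(false);
        return () -> {
            if (closed.compareAndSet(false, true)) {
                try {
                    policyLock.close();
                } finally {
                    // Return the permit even if unlocking throws.
                    executionPermits.release();
                }
            }
        };
    }
}
```

With this shape, callers hold one object and call `close()` once; there is no separate cleanup method to remember.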
         * cleaned up by the maintenance task.
         */
        private final AtomicLong policyRunCounter = new AtomicLong(0L);
        private final ConcurrentHashMap<String, Semaphore> workingIndices = new ConcurrentHashMap<>();
Do we ever actually call `workingIndices.get(...)` anywhere? I can't find any call sites, which makes me think maybe this could just be a `Set` instead of a `Map`?
I'm using a map here mostly to ensure we do not remove the working index unless we are the holder of the semaphore it is mapped to. When calling map.remove() in the unlock method we pass the actively held semaphore in. This makes it so that the entry is only removed if it is mapped to the same semaphore.
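The conditional removal described here relies on the two-argument `ConcurrentHashMap.remove(key, value)`, which removes the entry only if the key is currently mapped to the given value. A minimal sketch follows; the class and method names are hypothetical and not taken from the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

class WorkingIndicesSketch {
    private final ConcurrentHashMap<String, Semaphore> workingIndices = new ConcurrentHashMap<>();

    /** Registers the index under the caller's semaphore; returns false if it is already registered. */
    boolean markWorking(String indexName, Semaphore held) {
        return workingIndices.putIfAbsent(indexName, held) == null;
    }

    /**
     * Removes the entry only if it is still mapped to the semaphore the caller holds.
     * Semaphore does not override equals(), so the comparison is by identity.
     */
    boolean unmarkWorking(String indexName, Semaphore held) {
        return workingIndices.remove(indexName, held);
    }

    boolean isWorking(String indexName) {
        return workingIndices.containsKey(indexName);
    }
}
```

A plain `Set` could answer "is this index being worked on?", but it could not tell a legitimate holder apart from any other caller at removal time.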
Wrap the Releasable in the executor to release any held permits.
@elasticmachine update branch
Looks good to me, and looks much better than what it replaces. For what it's worth (maybe for someone coming to this in the future), here are my notes:
- Yes -- Can I run any policy right now?
  - Yes
    - put index name in working indices
    - create index
    - run policy
    - remove from working indices
  - No -- exception
- No -- exception

Can I delete this index (out of all enrich indices in the cluster state)?

- Is it in working indices?
  - Yes -- then no
  - No -- other checks unrelated to this PR

💔 Backport failed
You can use sqren/backport to manually backport by running |
This PR refactors the locking logic for enrich policies so that enrich index names are resolved early so that they may be explicitly protected from maintenance tasks on the master node. The maintenance service has been optimized to allow for concurrent removal of old enrich indices while policies are executing. Further concurrency changes were made to improve the thread safety of the system (such as removing the double check locking in maintenance and the ability to unlock policies from code that does not hold the lock).
💚 All backports created successfully
Questions? Please refer to the Backport tool documentation
This PR refactors the locking logic for enrich policies so that enrich index names are resolved early so that they may be explicitly protected from maintenance tasks on the master node. The maintenance service has been optimized to allow for concurrent removal of old enrich indices while policies are executing. Further concurrency changes were made to improve the thread safety of the system (such as removing the double check locking in maintenance and the ability to unlock policies from code that does not hold the lock). (cherry picked from commit 998520e)
This PR refactors the locking logic for enrich policies so that enrich index names are resolved early so that they may be explicitly protected from maintenance tasks on the master node. The maintenance service has been optimized to allow for concurrent removal of old enrich indices while policies are executing. Further concurrency changes were made to improve the thread safety of the system (such as removing the double check locking in maintenance and the ability to unlock policies from code that does not hold the lock).

(cherry picked from commit 998520e)

# Conflicts:
#   x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java
#   x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java
#   x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java
Enrich maintenance runs on a schedule on the master. It cleans up unused enrich indices, but only while no policy executions are in progress. This causes problems in environments that execute enrich policies on a tight schedule, because the maintenance service will refuse to clean up indices. Additionally, the maintenance task relies on some dubious concurrency code to detect concurrent policy executions. If this detection logic reads incorrect state, the enrich maintenance service can remove enrich indices for policies that are currently being executed.
This refactoring changes a few core aspects of the enrich policy locking code:
First, the policy maintenance task collects indices to delete from the cluster state instead of using the async get indices action. The cluster state is available to the maintenance service and no features from the get action are required for this step.
Second, the internal policy execution logic has been refactored to generate the new enrich index's name earlier in the process so that it can keep track of enrich indices that are actively being constructed. The maintenance task uses this set of indices to exclude in-progress indices from cleanup.
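As a rough illustration of this step, the sketch below registers an index name before the index is built and has the cleanup side skip any registered name. It is a simplification under stated assumptions: it uses a plain concurrent set rather than the map of semaphores in the PR, and `runWithProtectedIndex`, `deletionCandidates`, and `indicesInUse` are hypothetical names.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class MaintenanceFilterSketch {
    // Simplification: a concurrent set of index names that are currently being built.
    private final Set<String> workingIndices = ConcurrentHashMap.newKeySet();

    /** Execution side: protect the already-resolved index name for the duration of the build. */
    void runWithProtectedIndex(String indexName, Runnable buildIndex) {
        if (!workingIndices.add(indexName)) {
            throw new IllegalStateException("index [" + indexName + "] is already being built");
        }
        try {
            buildIndex.run();
        } finally {
            // Always unprotect the index, even if the build fails.
            workingIndices.remove(indexName);
        }
    }

    /** Maintenance side: skip indices that are being built or are still in use by a policy. */
    List<String> deletionCandidates(Collection<String> enrichIndices, Set<String> indicesInUse) {
        return enrichIndices.stream()
            .filter(name -> !workingIndices.contains(name))
            .filter(name -> !indicesInUse.contains(name))
            .collect(Collectors.toList());
    }
}
```

Because protection is per index rather than global, cleanup of old indices can proceed while a policy is executing.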
Third, since the enrich indices to keep are now explicitly marked for the maintenance service, the global locking and clumsy state checking can be removed from the policy locks object. The code now uses concurrency primitives the way they are meant to be used.
Finally, a number of changes improve the concurrency hygiene of the policy coordination. A process can only unlock a policy if it holds the policy's active semaphore. Previously any thread could release a policy any number of times, even if it did not have the current right to do so. Some critical sections have been updated to make sure that policy locks are always released in case of an uncaught exception.
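One way to get the "only the holder can unlock" property is to hand the caller a lock object on acquisition and make that object the only path to releasing the semaphore. The following is a sketch of that idea, not the PR's implementation; `PolicyLockSketch`, `Lock`, `lockPolicy`, and `runLocked` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class PolicyLockSketch {
    private final ConcurrentHashMap<String, Semaphore> policyLocks = new ConcurrentHashMap<>();

    /** Handle returned to the holder; the only way to release the policy's semaphore. */
    static final class Lock implements AutoCloseable {
        private final Semaphore held;
        private final AtomicBoolean released = new AtomicBoolean(false);

        private Lock(Semaphore held) {
            this.held = held;
        }

        @Override
        public void close() {
            // Release at most once, so a double close cannot hand out an extra permit.
            if (released.compareAndSet(false, true)) {
                held.release();
            }
        }
    }

    /** Acquires the policy's single permit or fails immediately if it is already executing. */
    Lock lockPolicy(String policyName) {
        Semaphore semaphore = policyLocks.computeIfAbsent(policyName, k -> new Semaphore(1));
        if (!semaphore.tryAcquire()) {
            throw new IllegalStateException("policy [" + policyName + "] is already executing");
        }
        return new Lock(semaphore);
    }

    /** Runs the body under the lock; try-with-resources releases it even on an uncaught exception. */
    void runLocked(String policyName, Runnable body) {
        try (Lock lock = lockPolicy(policyName)) {
            body.run();
        }
    }
}
```

Code that never acquired the lock has no `Lock` instance and therefore nothing to release, which rules out the stray unlocks described above.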