|  | 
| 7 | 7 | package org.elasticsearch.xpack.enrich; | 
| 8 | 8 | 
 | 
| 9 | 9 | import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; | 
|  | 10 | +import org.elasticsearch.core.Releasable; | 
| 10 | 11 | 
 | 
|  | 12 | +import java.util.HashSet; | 
|  | 13 | +import java.util.Set; | 
| 11 | 14 | import java.util.concurrent.ConcurrentHashMap; | 
| 12 | 15 | import java.util.concurrent.Semaphore; | 
| 13 |  | -import java.util.concurrent.atomic.AtomicLong; | 
| 14 |  | -import java.util.concurrent.locks.ReadWriteLock; | 
| 15 |  | -import java.util.concurrent.locks.ReentrantReadWriteLock; | 
| 16 | 16 | 
 | 
| 17 | 17 | /** | 
| 18 | 18 |  * A coordination object that allows multiple distinct polices to be executed concurrently, but also makes sure that a single | 
|  | 
| 23 | 23 | public class EnrichPolicyLocks { | 
| 24 | 24 | 
 | 
| 25 | 25 |  /** | 
| 26 |  | - * A snapshot in time detailing if any policy executions are in flight and total number of local executions that | 
| 27 |  | - * have been kicked off since the node has started | 
|  | 26 | + * An instance of a specific lock on a single policy object. Ensures that when unlocking a policy, the policy is only unlocked if this | 
|  | 27 | + * object is the owner of the held lock. Additionally, this manages the lock lifecycle for any other resources tracked by the policy | 
|  | 28 | + * coordination logic, such as a policy execution's target index. | 
| 28 | 29 |  */ | 
| 29 |  | - public static class EnrichPolicyExecutionState { | 
| 30 |  | - final boolean anyPolicyInFlight; | 
| 31 |  | - final long executions; | 
|  | 30 | + public class EnrichPolicyLock implements Releasable { | 
|  | 31 | + private final String policyName; | 
|  | 32 | + private final String enrichIndexName; | 
|  | 33 | + private final Semaphore executionLease; | 
| 32 | 34 | 
 | 
| 33 |  | - EnrichPolicyExecutionState(boolean anyPolicyInFlight, long executions) { | 
| 34 |  | - this.anyPolicyInFlight = anyPolicyInFlight; | 
| 35 |  | - this.executions = executions; | 
|  | 35 | + private EnrichPolicyLock(String policyName, String enrichIndexName, Semaphore executionLease) { | 
|  | 36 | + this.policyName = policyName; | 
|  | 37 | + this.enrichIndexName = enrichIndexName; | 
|  | 38 | + this.executionLease = executionLease; | 
| 36 | 39 |  } | 
| 37 | 40 | 
 | 
| 38 |  | - public boolean isAnyPolicyInFlight() { | 
| 39 |  | - return anyPolicyInFlight; | 
|  | 41 | + /** | 
|  | 42 | + * Unlocks this policy for execution and maintenance IFF this lock represents the currently held semaphore for a policy name. If | 
|  | 43 | + * this lock was created for an execution, the target index for the policy execution is also cleared from the locked state. | 
|  | 44 | + */ | 
|  | 45 | + @Override | 
|  | 46 | + public void close() { | 
|  | 47 | + if (enrichIndexName != null) { | 
|  | 48 | + boolean wasRemoved = workingIndices.remove(enrichIndexName, executionLease); | 
|  | 49 | + assert wasRemoved | 
|  | 50 | + : "Target index [" + enrichIndexName + "] for policy [" + policyName + "] was removed prior to policy unlock"; | 
|  | 51 | + } | 
|  | 52 | + boolean wasRemoved = policyLocks.remove(policyName, executionLease); | 
|  | 53 | + assert wasRemoved : "Second attempt was made to unlock policy [" + policyName + "]"; | 
| 40 | 54 |  } | 
| 41 | 55 |  } | 
| 42 | 56 | 
 | 
| 43 |  | - /** | 
| 44 |  | - * A read-write lock that allows for policies to be executed concurrently with minimal overhead, but allows for blocking | 
| 45 |  | - * policy locking operations while capturing the state of policy executions. | 
| 46 |  | - */ | 
| 47 |  | - private final ReadWriteLock currentStateLock = new ReentrantReadWriteLock(true); | 
| 48 |  | - | 
| 49 | 57 |  /** | 
| 50 | 58 |  * A mapping of policy name to a semaphore used for ensuring that a single policy can only have one execution in flight | 
| 51 | 59 |  * at a time. | 
| 52 | 60 |  */ | 
| 53 | 61 |  private final ConcurrentHashMap<String, Semaphore> policyLocks = new ConcurrentHashMap<>(); | 
| 54 | 62 | 
 | 
| 55 | 63 |  /** | 
| 56 |  | - * A counter that is used as a sort of policy execution sequence id / dirty bit. This is incremented every time a policy | 
| 57 |  | - * successfully acquires an execution lock. | 
|  | 64 | + * When a policy is locked for execution the new index that is created is added to this set to keep it from being accidentally | 
|  | 65 | + * cleaned up by the maintenance task. | 
| 58 | 66 |  */ | 
| 59 |  | - private final AtomicLong policyRunCounter = new AtomicLong(0L); | 
|  | 67 | + private final ConcurrentHashMap<String, Semaphore> workingIndices = new ConcurrentHashMap<>(); | 
| 60 | 68 | 
 | 
| 61 | 69 |  /** | 
| 62 | 70 |  * Locks a policy to prevent concurrent execution. If the policy is currently executing, this method will immediately | 
| 63 | 71 |  * throw without waiting. This method only blocks if another thread is currently capturing the current policy execution state. | 
|  | 72 | + * <br/><br/> | 
|  | 73 | + * If a policy is being executed, use {@link EnrichPolicyLocks#lockPolicy(String, String)} instead in order to properly track the | 
|  | 74 | + * new enrich index that will be created. | 
| 64 | 75 |  * @param policyName The policy name to lock for execution | 
| 65 | 76 |  * @throws EsRejectedExecutionException if the policy is locked already or if the maximum number of concurrent policy executions | 
| 66 | 77 |  * has been reached | 
| 67 | 78 |  */ | 
| 68 |  | - public void lockPolicy(String policyName) { | 
| 69 |  | - currentStateLock.readLock().lock(); | 
| 70 |  | - try { | 
| 71 |  | - Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1)); | 
| 72 |  | - boolean acquired = runLock.tryAcquire(); | 
| 73 |  | - if (acquired == false) { | 
| 74 |  | - throw new EsRejectedExecutionException( | 
| 75 |  | - "Could not obtain lock because policy execution for [" + policyName + "] is already in progress." | 
| 76 |  | - ); | 
| 77 |  | - } | 
| 78 |  | - policyRunCounter.incrementAndGet(); | 
| 79 |  | - } finally { | 
| 80 |  | - currentStateLock.readLock().unlock(); | 
| 81 |  | - } | 
|  | 79 | + public EnrichPolicyLock lockPolicy(String policyName) { | 
|  | 80 | + return lockPolicy(policyName, null); | 
| 82 | 81 |  } | 
| 83 | 82 | 
 | 
| 84 | 83 |  /** | 
| 85 |  | - * Captures a snapshot of the current policy execution state. This method never blocks, instead assuming that a policy is | 
| 86 |  | - * currently starting its execution and returns an appropriate state. | 
| 87 |  | - * @return The current state of in-flight policy executions | 
|  | 84 | + * Locks a policy to prevent concurrent execution. If the policy is currently executing, this method will immediately | 
|  | 85 | + * throw without waiting. This method only blocks if another thread is currently capturing the current policy execution state. | 
|  | 86 | + * <br/><br/> | 
|  | 87 | + * If a policy needs to be locked just to ensure it is not executing, use {@link EnrichPolicyLocks#lockPolicy(String)} instead since | 
|  | 88 | + * no new enrich indices need to be maintained. | 
|  | 89 | + * @param policyName The policy name to lock for execution | 
|  | 90 | + * @param enrichIndexName If the policy is being executed, this parameter denotes the index that should be protected from maintenance | 
|  | 91 | + * operations. | 
|  | 92 | + * @throws EsRejectedExecutionException if the policy is locked already or if the maximum number of concurrent policy executions | 
|  | 93 | + * has been reached | 
| 88 | 94 |  */ | 
| 89 |  | - public EnrichPolicyExecutionState captureExecutionState() { | 
| 90 |  | - if (currentStateLock.writeLock().tryLock()) { | 
| 91 |  | - try { | 
| 92 |  | - long revision = policyRunCounter.get(); | 
| 93 |  | - long currentPolicyExecutions = policyLocks.mappingCount(); | 
| 94 |  | - return new EnrichPolicyExecutionState(currentPolicyExecutions > 0L, revision); | 
| 95 |  | - } finally { | 
| 96 |  | - currentStateLock.writeLock().unlock(); | 
| 97 |  | - } | 
|  | 95 | + public EnrichPolicyLock lockPolicy(String policyName, String enrichIndexName) { | 
|  | 96 | + Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1)); | 
|  | 97 | + boolean acquired = runLock.tryAcquire(); | 
|  | 98 | + if (acquired == false) { | 
|  | 99 | + throw new EsRejectedExecutionException( | 
|  | 100 | + "Could not obtain lock because policy execution for [" + policyName + "] is already in progress." | 
|  | 101 | + ); | 
|  | 102 | + } | 
|  | 103 | + if (enrichIndexName != null) { | 
|  | 104 | + Semaphore previous = workingIndices.putIfAbsent(enrichIndexName, runLock); | 
|  | 105 | + assert previous == null : "Target index [" + enrichIndexName + "] is already claimed by an execution, or was not cleaned up."; | 
| 98 | 106 |  } | 
| 99 |  | - return new EnrichPolicyExecutionState(true, policyRunCounter.get()); | 
|  | 107 | + return new EnrichPolicyLock(policyName, enrichIndexName, runLock); | 
| 100 | 108 |  } | 
| 101 | 109 | 
 | 
| 102 |  | - /** | 
| 103 |  | - * Checks if the current execution state matches that of the given execution state. Used to ensure that over a period of time | 
| 104 |  | - * no changes to the policy execution state have occurred. | 
| 105 |  | - * @param previousState The previous state to check the current state against | 
| 106 |  | - * @return true if the current state matches the given previous state, false if policy executions have changed over time. | 
| 107 |  | - */ | 
| 108 |  | - boolean isSameState(EnrichPolicyExecutionState previousState) { | 
| 109 |  | - EnrichPolicyExecutionState currentState = captureExecutionState(); | 
| 110 |  | - return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight && currentState.executions == previousState.executions; | 
|  | 110 | + public Set<String> lockedPolices() { | 
|  | 111 | + return new HashSet<>(policyLocks.keySet()); | 
| 111 | 112 |  } | 
| 112 | 113 | 
 | 
| 113 |  | - /** | 
| 114 |  | - * Releases the lock for a given policy name, allowing it to be executed. | 
| 115 |  | - * @param policyName The policy to release. | 
| 116 |  | - */ | 
| 117 |  | - public void releasePolicy(String policyName) { | 
| 118 |  | - currentStateLock.readLock().lock(); | 
| 119 |  | - try { | 
| 120 |  | - policyLocks.remove(policyName); | 
| 121 |  | - } finally { | 
| 122 |  | - currentStateLock.readLock().unlock(); | 
| 123 |  | - } | 
|  | 114 | + public Set<String> inflightPolicyIndices() { | 
|  | 115 | + return new HashSet<>(workingIndices.keySet()); | 
| 124 | 116 |  } | 
|  | 117 | + | 
| 125 | 118 | } | 
0 commit comments