- Notifications
You must be signed in to change notification settings - Fork 25.6k
Optimize KeyedLock and related concurrency primitives #96372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
ff36302 718805c 87bfd57 002e54b File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -122,21 +122,7 @@ public String toString() { | |
| * Wraps a {@link Releasable} such that its {@link Releasable#close()} method can be called multiple times without double-releasing. | ||
| */ | ||
| public static Releasable releaseOnce(final Releasable releasable) { | ||
| final var ref = new AtomicReference<>(releasable); | ||
| return new Releasable() { | ||
| @Override | ||
| public void close() { | ||
| final var acquired = ref.getAndSet(null); | ||
| if (acquired != null) { | ||
| acquired.close(); | ||
| } | ||
| } | ||
| | ||
| @Override | ||
| public String toString() { | ||
| return "releaseOnce[" + ref.get() + "]"; | ||
| } | ||
| }; | ||
| return new ReleaseOnce(releasable); | ||
| } | ||
| | ||
| public static Releasable assertOnce(final Releasable delegate) { | ||
| | @@ -165,4 +151,23 @@ public String toString() { | |
| return delegate; | ||
| } | ||
| } | ||
| | ||
| private static class ReleaseOnce extends AtomicReference<Releasable> implements Releasable { | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems ok because the caller can't see the Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah for those I can't do without making things visible to the caller I think. At least not unless I move more things around. Maybe do that later if we see it matter anywhere? | ||
| ReleaseOnce(Releasable releasable) { | ||
| super(releasable); | ||
| } | ||
| | ||
| @Override | ||
| public void close() { | ||
| final var acquired = getAndSet(null); | ||
| if (acquired != null) { | ||
| acquired.close(); | ||
| } | ||
| } | ||
| | ||
| @Override | ||
| public String toString() { | ||
| return "releaseOnce[" + get() + "]"; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -8,12 +8,14 @@ | |
| | ||
| package org.elasticsearch.common.util.concurrent; | ||
| | ||
| import org.elasticsearch.core.AbstractRefCounted; | ||
| import org.elasticsearch.core.Releasable; | ||
| import org.elasticsearch.core.Releasables; | ||
| | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import java.util.function.BiFunction; | ||
| | ||
| /** | ||
| * This class manages locks. Locks can be accessed with an identifier and are | ||
| | @@ -48,55 +50,36 @@ public KeyedLock() { | |
| * by the same thread multiple times. The lock is released by closing the returned {@link Releasable}. | ||
| */ | ||
| public Releasable acquire(T key) { | ||
| while (true) { | ||
| KeyLock perNodeLock = map.get(key); | ||
| if (perNodeLock == null) { | ||
| ReleasableLock newLock = tryCreateNewLock(key); | ||
| if (newLock != null) { | ||
| return newLock; | ||
| } | ||
| } else { | ||
| assert perNodeLock != null; | ||
| int i = perNodeLock.count.get(); | ||
| if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) { | ||
| perNodeLock.lock(); | ||
| return new ReleasableLock(key, perNodeLock); | ||
| } | ||
| } | ||
| } | ||
| KeyLock perNodeLock = map.compute(key, computeLock(fair)); | ||
| ||
| perNodeLock.lock(); | ||
| return releasableLock(key, perNodeLock); | ||
| } | ||
| | ||
| /** | ||
| * Tries to acquire the lock for the given key and returns it. If the lock can't be acquired null is returned. | ||
| */ | ||
| public Releasable tryAcquire(T key) { | ||
| final KeyLock perNodeLock = map.get(key); | ||
| if (perNodeLock == null) { | ||
| return tryCreateNewLock(key); | ||
| KeyLock perNodeLock = map.compute(key, computeLock(fair)); | ||
| if (perNodeLock.tryLock()) { | ||
| return releasableLock(key, perNodeLock); | ||
| } | ||
| if (perNodeLock.tryLock()) { // ok we got it - make sure we increment it accordingly otherwise release it again | ||
| int i; | ||
| while ((i = perNodeLock.count.get()) > 0) { | ||
| // we have to do this in a loop here since even if the count is > 0 | ||
| // there could be a concurrent blocking acquire that changes the count and then this CAS fails. Since we already got | ||
| // the lock we should retry and see if we can still get it or if the count is 0. If that is the case and we give up. | ||
| if (perNodeLock.count.compareAndSet(i, i + 1)) { | ||
| return new ReleasableLock(key, perNodeLock); | ||
| } | ||
| } | ||
| perNodeLock.unlock(); // make sure we unlock and don't leave the lock in a locked state | ||
| // failed to get the lock, but we incremented its count when acquiring it above, so we decrement the count and potentially remove | ||
| // it from the map | ||
| if (perNodeLock.count.decrementAndGet() == 0) { | ||
| map.remove(key, perNodeLock); | ||
| } | ||
| return null; | ||
| } | ||
| | ||
| private ReleasableLock tryCreateNewLock(T key) { | ||
| KeyLock newLock = new KeyLock(fair); | ||
| newLock.lock(); | ||
| KeyLock keyLock = map.putIfAbsent(key, newLock); | ||
| if (keyLock == null) { | ||
| return new ReleasableLock(key, newLock); | ||
| } | ||
| return null; | ||
| private static <S> BiFunction<S, KeyLock, KeyLock> computeLock(boolean fair) { | ||
| // duplicate lambdas a little to save capturing lambda instantiation from capturing the 'fair' flag | ||
| return fair | ||
| ? (k, existing) -> tryIncrementCount(existing) ? existing : new KeyLock(true) | ||
| : (k, existing) -> tryIncrementCount(existing) ? existing : new KeyLock(false); | ||
| } | ||
| | ||
| private static boolean tryIncrementCount(KeyLock existing) { | ||
| return existing != null && AbstractRefCounted.incrementIfPositive(existing.count); | ||
| } | ||
| | ||
| /** | ||
| | @@ -120,22 +103,8 @@ private void release(T key, KeyLock lock) { | |
| assert decrementAndGet >= 0 : decrementAndGet + " must be >= 0 but wasn't"; | ||
| } | ||
| | ||
| private final class ReleasableLock implements Releasable { | ||
| final T key; | ||
| final KeyLock lock; | ||
| final AtomicBoolean closed = new AtomicBoolean(); | ||
| | ||
| private ReleasableLock(T key, KeyLock lock) { | ||
| this.key = key; | ||
| this.lock = lock; | ||
| } | ||
| | ||
| @Override | ||
| public void close() { | ||
| if (closed.compareAndSet(false, true)) { | ||
| release(key, lock); | ||
| } | ||
| } | ||
| private Releasable releasableLock(T key, KeyLock lock) { | ||
| return Releasables.releaseOnce(() -> release(key, lock)); | ||
| } | ||
| | ||
| @SuppressWarnings("serial") | ||
| | ||
Uh oh!
There was an error while loading. Please reload this page.