Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
public abstract class AbstractRefCounted implements RefCounted {
public static final String ALREADY_CLOSED_MESSAGE = "already closed, can't increment ref count";

public static boolean incrementIfPositive(AtomicInteger counter) {
return counter.updateAndGet(i -> i == 0 ? 0 : i + 1) > 0;
}

private final AtomicInteger refCount = new AtomicInteger(1);

protected AbstractRefCounted() {}
Expand All @@ -31,17 +35,11 @@ public final void incRef() {

@Override
public final boolean tryIncRef() {
do {
int i = refCount.get();
if (i > 0) {
if (refCount.compareAndSet(i, i + 1)) {
touch();
return true;
}
} else {
return false;
}
} while (true);
if (incrementIfPositive(refCount)) {
touch();
return true;
}
return false;
}

@Override
Expand Down Expand Up @@ -82,7 +80,7 @@ protected void alreadyClosed() {
* Returns the current reference count.
*/
public final int refCount() {
return this.refCount.get();
return refCount.get();
}

/**
Expand Down
35 changes: 20 additions & 15 deletions libs/core/src/main/java/org/elasticsearch/core/Releasables.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,7 @@ public String toString() {
* Wraps a {@link Releasable} such that its {@link Releasable#close()} method can be called multiple times without double-releasing.
*/
public static Releasable releaseOnce(final Releasable releasable) {
final var ref = new AtomicReference<>(releasable);
return new Releasable() {
@Override
public void close() {
final var acquired = ref.getAndSet(null);
if (acquired != null) {
acquired.close();
}
}

@Override
public String toString() {
return "releaseOnce[" + ref.get() + "]";
}
};
return new ReleaseOnce(releasable);
}

public static Releasable assertOnce(final Releasable delegate) {
Expand Down Expand Up @@ -165,4 +151,23 @@ public String toString() {
return delegate;
}
}

private static class ReleaseOnce extends AtomicReference<Releasable> implements Releasable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems ok because the caller can't see the AtomicReference bit. Maybe do the same thing to ActionListener#notifyOnce and RunOnce too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah for those I can't do without making things visible to the caller I think. At least not unless I move more things around. Maybe do that later if we see it matter anywhere?

ReleaseOnce(Releasable releasable) {
super(releasable);
}

@Override
public void close() {
final var acquired = getAndSet(null);
if (acquired != null) {
acquired.close();
}
}

@Override
public String toString() {
return "releaseOnce[" + get() + "]";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;

/**
* This class manages locks. Locks can be accessed with an identifier and are
Expand Down Expand Up @@ -48,55 +50,36 @@ public KeyedLock() {
* by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.
*/
public Releasable acquire(T key) {
while (true) {
KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
ReleasableLock newLock = tryCreateNewLock(key);
if (newLock != null) {
return newLock;
}
} else {
assert perNodeLock != null;
int i = perNodeLock.count.get();
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
perNodeLock.lock();
return new ReleasableLock(key, perNodeLock);
}
}
}
KeyLock perNodeLock = map.compute(key, computeLock(fair));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not convinced that this will be faster. Compute has a stronger guarantee around the remapping function only being called once, which leads to a bit of locking and storing twice (in the non-existing case). And even if the remapping results in the same value, I think it stores the value back. Last part might not be too important since we probably expect to be uncontended in most cases and contended case would likely be dominated by the lock.

Have you done benchmarks to validate this part of the change here?

I wonder if optimistically doing tryCreateNewLock before any map lookup would not be faster, since in the uncontended case that would go through and in the contended case, it does not matter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if optimistically doing tryCreateNewLock before any map lookup would not be faster, since in the uncontended case that would go through and in the contended case, it does not matter?

I guess that may be true. I'm just a little worried about how bad the contended case will behave then. It seems like in the real world, most of the time gained by my change comes from looking up the right bucket in the map only once. In the end it's really hard to measure the effect of this change in the contended scenario well. For the uncontended case, this solution wins hands down in some quick benchmarks just by only doing the map lookup once and inlining much better. For contended case I wasn't able to find a good benchmark approximation because it depends massively on the specific keys used etc. I would hope though that on average, that spinning on a synchronized block (I think no matter what the lock will rarely be contended to the point where it's not a spin-lock) is still faster than spinning by doing object creation and repeated map lookups.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you share (perhaps privately) the benchmarks done with and without the change and the results?

If we have done benchmarks, showing this is faster in the uncontended case, then I am good with this.

perNodeLock.lock();
return releasableLock(key, perNodeLock);
}

/**
* Tries to acquire the lock for the given key and returns it. If the lock can't be acquired null is returned.
*/
public Releasable tryAcquire(T key) {
final KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
return tryCreateNewLock(key);
KeyLock perNodeLock = map.compute(key, computeLock(fair));
if (perNodeLock.tryLock()) {
return releasableLock(key, perNodeLock);
}
if (perNodeLock.tryLock()) { // ok we got it - make sure we increment it accordingly otherwise release it again
int i;
while ((i = perNodeLock.count.get()) > 0) {
// we have to do this in a loop here since even if the count is > 0
// there could be a concurrent blocking acquire that changes the count and then this CAS fails. Since we already got
// the lock we should retry and see if we can still get it or if the count is 0. If that is the case and we give up.
if (perNodeLock.count.compareAndSet(i, i + 1)) {
return new ReleasableLock(key, perNodeLock);
}
}
perNodeLock.unlock(); // make sure we unlock and don't leave the lock in a locked state
// failed to get the lock, but we incremented its count when acquiring it above, so we decrement the count and potentially remove
// it from the map
if (perNodeLock.count.decrementAndGet() == 0) {
map.remove(key, perNodeLock);
}
return null;
}

private ReleasableLock tryCreateNewLock(T key) {
KeyLock newLock = new KeyLock(fair);
newLock.lock();
KeyLock keyLock = map.putIfAbsent(key, newLock);
if (keyLock == null) {
return new ReleasableLock(key, newLock);
}
return null;
private static <S> BiFunction<S, KeyLock, KeyLock> computeLock(boolean fair) {
// duplicate lambdas a little to save capturing lambda instantiation from capturing the 'fair' flag
return fair
? (k, existing) -> tryIncrementCount(existing) ? existing : new KeyLock(true)
: (k, existing) -> tryIncrementCount(existing) ? existing : new KeyLock(false);
}

private static boolean tryIncrementCount(KeyLock existing) {
return existing != null && AbstractRefCounted.incrementIfPositive(existing.count);
}

/**
Expand All @@ -120,22 +103,8 @@ private void release(T key, KeyLock lock) {
assert decrementAndGet >= 0 : decrementAndGet + " must be >= 0 but wasn't";
}

private final class ReleasableLock implements Releasable {
final T key;
final KeyLock lock;
final AtomicBoolean closed = new AtomicBoolean();

private ReleasableLock(T key, KeyLock lock) {
this.key = key;
this.lock = lock;
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
release(key, lock);
}
}
private Releasable releasableLock(T key, KeyLock lock) {
return Releasables.releaseOnce(() -> release(key, lock));
}

@SuppressWarnings("serial")
Expand Down