Skip to content

Commit 3407074

Browse files
zhouyifan279BewareMyPowerlhotari
committed
[fix][broker] First entry will be skipped if opening NonDurableCursor while trimmed ledger is adding first entry. (#24738)
Co-authored-by: Yunze Xu <xyzinfernity@163.com> Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 7a120f3)
1 parent 65ddd44 commit 3407074

File tree

2 files changed

+62
-6
lines changed

2 files changed

+62
-6
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
4343
// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
4444
// store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
4545
// both ledgerId and entryId to be Long.max()
46-
if (startCursorPosition == null || startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) {
46+
Pair<PositionImpl, Long> lastPositionCounter = ledger.getLastPositionAndCounter();
47+
if (startCursorPosition == null || startCursorPosition.compareTo(lastPositionCounter.getLeft()) > 0) {
4748
// Start from last entry
4849
switch (initialPosition) {
4950
case Latest:
50-
initializeCursorPosition(ledger.getLastPositionAndCounter());
51+
initializeCursorPosition(lastPositionCounter);
5152
break;
5253
case Earliest:
5354
initializeCursorPosition(ledger.getFirstPositionAndCounter());

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER;
2223
import static org.testng.Assert.assertEquals;
2324
import static org.testng.Assert.assertFalse;
2425
import static org.testng.Assert.assertNotEquals;
2526
import static org.testng.Assert.assertNotNull;
2627
import static org.testng.Assert.assertTrue;
2728
import static org.testng.Assert.fail;
28-
2929
import com.google.common.collect.Iterables;
30-
30+
import io.netty.buffer.ByteBuf;
3131
import java.nio.charset.Charset;
3232
import java.nio.charset.StandardCharsets;
3333
import java.util.ArrayList;
@@ -37,8 +37,6 @@
3737
import java.util.concurrent.TimeUnit;
3838
import java.util.concurrent.atomic.AtomicBoolean;
3939
import java.util.concurrent.atomic.AtomicReference;
40-
41-
import io.netty.buffer.ByteBuf;
4240
import lombok.Cleanup;
4341
import org.apache.bookkeeper.mledger.AsyncCallbacks;
4442
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -52,6 +50,11 @@
5250
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
5351
import org.apache.bookkeeper.mledger.Position;
5452
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
53+
import org.apache.commons.lang3.tuple.Pair;
54+
import org.apache.pulsar.common.api.proto.CommandSubscribe;
55+
import org.awaitility.Awaitility;
56+
import org.mockito.Mockito;
57+
import org.mockito.stubbing.Answer;
5558
import org.slf4j.Logger;
5659
import org.slf4j.LoggerFactory;
5760
import org.testng.Assert;
@@ -105,6 +108,58 @@ void testOpenNonDurableCursorAtNonExistentMessageId() throws Exception {
105108
ledger.close();
106109
}
107110

111+
@Test(timeOut = 20000)
112+
void testOpenNonDurableCursorWhileLedgerIsAddingFirstEntryAfterTrimmed() throws Exception {
113+
ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1)
114+
.setRetentionTime(0, TimeUnit.MILLISECONDS);
115+
config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
116+
@Cleanup
117+
ManagedLedgerImpl ledgerSpy =
118+
Mockito.spy((ManagedLedgerImpl) factory.open("non_durable_cursor_while_ledger_trimmed", config));
119+
120+
ledgerSpy.addEntry("message1".getBytes());
121+
122+
ledgerSpy.rollCurrentLedgerIfFull();
123+
Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() ->
124+
ledgerSpy.getLedgersInfoAsList().size() > 1
125+
);
126+
CompletableFuture<Void> trimFuture = new CompletableFuture<>();
127+
ledgerSpy.trimConsumedLedgersInBackground(trimFuture);
128+
trimFuture.join();
129+
130+
// Use (currentLedgerId, -1) as startCursorPosition after ledger was trimmed
131+
PositionImpl startCursorPosition = PositionImpl.get(ledgerSpy.getCurrentLedger().getId(), -1);
132+
assertTrue(startCursorPosition.compareTo(ledgerSpy.lastConfirmedEntry) > 0);
133+
134+
CountDownLatch getLastPositionLatch = new CountDownLatch(1);
135+
CountDownLatch newNonDurableCursorLatch = new CountDownLatch(1);
136+
Mockito.when(ledgerSpy.getLastPositionAndCounter()).then((Answer<Pair<Position, Long>>) invocation -> {
137+
newNonDurableCursorLatch.countDown();
138+
getLastPositionLatch.await();
139+
return Pair.of(ledgerSpy.lastConfirmedEntry, ENTRIES_ADDED_COUNTER_UPDATER.get(ledgerSpy));
140+
});
141+
142+
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<ManagedCursor>()
143+
.completeAsync(() ->
144+
new NonDurableCursorImpl(bkc, ledgerSpy, "my_test_cursor",
145+
startCursorPosition, CommandSubscribe.InitialPosition.Latest, false)
146+
);
147+
PositionImpl oldLastConfirmedEntry = ledgerSpy.lastConfirmedEntry;
148+
149+
// Wait until NonDurableCursorImpl constructor invokes ManagedLedgerImpl.getLastPositionAndCounter
150+
newNonDurableCursorLatch.await();
151+
// Add first entry after ledger was trimmed
152+
ledgerSpy.addEntry("message2".getBytes());
153+
assertTrue(oldLastConfirmedEntry.compareTo(ledgerSpy.lastConfirmedEntry) < 0);
154+
155+
// Unblock NonDurableCursorImpl constructor
156+
getLastPositionLatch.countDown();
157+
158+
// cursor should read from lastConfirmedEntry
159+
ManagedCursor cursor = cursorFuture.join();
160+
assertEquals(cursor.getReadPosition(), ledgerSpy.lastConfirmedEntry);
161+
}
162+
108163
@Test(timeOut = 20000)
109164
void testZNodeBypassed() throws Exception {
110165
ManagedLedger ledger = factory.open("my_test_ledger");

0 commit comments

Comments
 (0)