Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1570,6 +1570,10 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback)
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }

void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) {
if (!incomingMessages_.empty()) {
callback(ResultOk, true);
return;
}
bool compareMarkDeletePosition;
{
std::lock_guard<std::mutex> lock{mutexForMessageId_};
Expand Down Expand Up @@ -1735,6 +1739,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
hasSoughtByTimestamp_.store(true, std::memory_order_release);
} else {
seekMessageId_ = *boost::get<MessageId>(&seekArg);
hasSoughtByTimestamp_.store(false, std::memory_order_release);
}
seekStatus_ = SeekStatus::IN_PROGRESS;
seekCallback_ = callback;
Expand Down
30 changes: 23 additions & 7 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <pulsar/Reader.h>
#include <time.h>

#include <future>
#include <string>
#include <thread>

#include "HttpHelper.h"
#include "PulsarFriend.h"
Expand Down Expand Up @@ -850,7 +852,7 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
ASSERT_FALSE(hasMessageAvailable);
}

TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
TEST_F(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
using namespace std::chrono;
const auto topic = "test-has-message-available-after-seek-timestamp-" + std::to_string(time(nullptr));
Producer producer;
Expand All @@ -862,12 +864,10 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {

auto createReader = [this, &topic](Reader& reader, const MessageId& msgId) {
ASSERT_EQ(ResultOk, client.createReader(topic, msgId, {}, reader));
if (GetParam()) {
if (msgId == MessageId::earliest()) {
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
} else {
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
}
if (msgId == MessageId::earliest()) {
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
} else {
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
}
};

Expand All @@ -886,6 +886,22 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
}

// Test `hasMessageAvailableAsync` will complete immediately if the incoming message queue is non-empty
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
reader.seek(timestampBeforeSend);
std::promise<std::thread::id> threadIdPromise;

waitUntil(seconds(3),
[&reader] { return PulsarFriend::getConsumer(reader)->getNumOfPrefetchedMessages() > 0; });
reader.hasMessageAvailableAsync([&threadIdPromise](Result result, bool hasMessageAvailable) {
ASSERT_EQ(ResultOk, result);
ASSERT_TRUE(hasMessageAvailable);
threadIdPromise.set_value(std::this_thread::get_id());
});
auto threadId = threadIdPromise.get_future().get();
ASSERT_EQ(threadId, std::this_thread::get_id());
}

TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
Expand Down