Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
* <ul>
* <li>NOT_FOUND (-1): ID doesn't exist in stream</li>
* <li>DELETED (1): Entry was deleted/acknowledged and deleted</li>
* <li>ACKNOWLEDGED_NOT_DELETED (2): Entry was acknowledged but not deleted (still has dangling
* references)</li>
* <li>NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED (2): Entry wasn't deleted.</li>
* </ul>
*/
public enum StreamEntryDeletionResult {
Expand All @@ -28,10 +27,13 @@ public enum StreamEntryDeletionResult {
DELETED(1),

/**
* The entry was acknowledged but not deleted because it still has dangling references in other
* consumer groups' pending entry lists.
* The entry was not deleted due to one of the following reasons:
* <ul>
* <li>For XDELEX: The entry was not acknowledged by any consumer group</li>
* <li>For XACKDEL: The entry still has pending references in other consumer groups</li>
* </ul>
*/
ACKNOWLEDGED_NOT_DELETED(2);
NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED(2);

private final int code;

Expand Down Expand Up @@ -60,7 +62,7 @@ public static StreamEntryDeletionResult fromCode(int code) {
case 1:
return DELETED;
case 2:
return ACKNOWLEDGED_NOT_DELETED;
return NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED;
default:
throw new IllegalArgumentException("Unknown stream entry deletion result code: " + code);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ public void testXdelexWithConsumerGroups() {
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1, StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2);
assertThat(results, hasSize(2));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not acknowledged
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED, results.get(1)); // id2 not acknowledged

// Verify only acknowledged entry was deleted
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public void testXdelexWithConsumerGroups() {
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1, StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2);
assertThat(results, hasSize(2));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not acknowledged
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED, results.get(1)); // id2 not acknowledged

// Verify only acknowledged entry was deleted
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.StreamPendingEntry;
import redis.clients.jedis.resps.StreamEntryDeletionResult;
import redis.clients.jedis.resps.*;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -760,8 +758,9 @@ public void xdelexWithConsumerGroups() {
StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2);
assertThat(results, hasSize(2));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not
// acknowledged
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
results.get(1)); // id2 not
// acknowledged

// Verify only acknowledged entry was deleted
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
Expand All @@ -778,4 +777,44 @@ public void xdelexEmptyStream() {
assertThat(results, hasSize(1));
assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0));
}

@Test
@SinceRedisVersion("8.1.240")
public void xdelexNotAcknowledged() {
setUpTestStream();

String groupName = "test_group";

// Add initial entries and create consumer group
Map<String, String> entry1 = singletonMap("field1", "value1");
jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), entry1);
jedis.xgroupCreate(STREAM_KEY_1, groupName, new StreamEntryID("0-0"), true);

// Read one message to create PEL entry
String consumerName = "consumer1";
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(1),
streamQuery);

// Add a new entry that was never delivered to any consumer
Map<String, String> entry2 = singletonMap("field4", "value4");
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), entry2);

// Verify initial state
StreamPendingSummary pending = jedis.xpending(STREAM_KEY_1, groupName);
assertEquals(1L, pending.getTotal()); // Only id1 is in PEL

StreamInfo info = jedis.xinfoStream(STREAM_KEY_1);
assertEquals(2L, info.getLength()); // Stream has 2 entries

// Test XDELEX with ACKNOWLEDGED policy on entry that was never delivered
// This should return NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED since id2 was never
// delivered to any consumer
List<StreamEntryDeletionResult> result = jedis.xdelex(STREAM_KEY_1,
StreamDeletionPolicy.ACKNOWLEDGED, id2);
assertThat(result, hasSize(1));
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
result.get(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class StreamEntryDeletionResultTest {
public void testFromCode() {
assertEquals(StreamEntryDeletionResult.NOT_FOUND, StreamEntryDeletionResult.fromCode(-1));
assertEquals(StreamEntryDeletionResult.DELETED, StreamEntryDeletionResult.fromCode(1));
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED,
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
StreamEntryDeletionResult.fromCode(2));
}

Expand All @@ -25,7 +25,7 @@ public void testFromCodeInvalid() {
public void testFromLong() {
assertEquals(StreamEntryDeletionResult.NOT_FOUND, StreamEntryDeletionResult.fromLong(-1L));
assertEquals(StreamEntryDeletionResult.DELETED, StreamEntryDeletionResult.fromLong(1L));
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED,
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
StreamEntryDeletionResult.fromLong(2L));
}

Expand All @@ -38,14 +38,15 @@ public void testFromLongNull() {
public void testGetCode() {
assertEquals(-1, StreamEntryDeletionResult.NOT_FOUND.getCode());
assertEquals(1, StreamEntryDeletionResult.DELETED.getCode());
assertEquals(2, StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED.getCode());
assertEquals(2,
StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED.getCode());
}

@Test
public void testToString() {
assertEquals("NOT_FOUND(-1)", StreamEntryDeletionResult.NOT_FOUND.toString());
assertEquals("DELETED(1)", StreamEntryDeletionResult.DELETED.toString());
assertEquals("ACKNOWLEDGED_NOT_DELETED(2)",
StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED.toString());
assertEquals("NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED(2)",
StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED.toString());
}
}
Loading