Skip to content

Commit 52fd648

Browse files
christophstroblmp911de
authored andcommitted
DATAREDIS-1084 - Add support for XCLAIM.
Original pull request: spring-projects#512.
1 parent ca01670 commit 52fd648

12 files changed

+756
-80
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3651,6 +3651,24 @@ public RecordId xAdd(StringRecord record) {
36513651
return convertAndReturn(delegate.xAdd(record.serialize(serializer)), identityConverter);
36523652
}
36533653

3654+
/*
3655+
* (non-Javadoc)
3656+
* @see org.springframework.data.redis.connection.StringRedisConnection#xClaimJustId(java.lang.String, java.lang.String, java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions)
3657+
*/
3658+
@Override
3659+
public List<RecordId> xClaimJustId(String key, String group, String consumer, XClaimOptions options) {
3660+
return convertAndReturn(delegate.xClaimJustId(serialize(key), group, consumer, options), identityConverter);
3661+
}
3662+
3663+
/*
3664+
* (non-Javadoc)
3665+
* @see org.springframework.data.redis.connection.StringRedisConnection#xClaim(java.lang.String, java.lang.String, java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions)
3666+
*/
3667+
@Override
3668+
public List<StringRecord> xClaim(String key, String group, String consumer, XClaimOptions options) {
3669+
return convertAndReturn(delegate.xClaim(serialize(key), group, consumer, options), listByteMapRecordToStringMapRecordConverter);
3670+
}
3671+
36543672
/*
36553673
* (non-Javadoc)
36563674
* @see org.springframework.data.redis.connection.StringRedisConnection#xDel(java.lang.String, java.lang.String[])
@@ -3803,6 +3821,24 @@ public RecordId xAdd(MapRecord<byte[], byte[], byte[]> record) {
38033821
return delegate.xAdd(record);
38043822
}
38053823

3824+
/*
3825+
* (non-Javadoc)
3826+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xClaimJustId(byte[], java.lang.String, java.lag.String, org.springframework.data.redis.connection.RedisStreamCommands.XCLaimOptions)
3827+
*/
3828+
@Override
3829+
public List<RecordId> xClaimJustId(byte[] key, String group, String newOwner, XClaimOptions options) {
3830+
return delegate.xClaimJustId(key, group, newOwner, options);
3831+
}
3832+
3833+
/*
3834+
* (non-Javadoc)
3835+
* @see org.springframework.data.redis.connection.RedisStreamCommands#xClaim(byte[], java.lang.String, java.lag.String, org.springframework.data.redis.connection.RedisStreamCommands.XCLaimOptions)
3836+
*/
3837+
@Override
3838+
public List<ByteRecord> xClaim(byte[] key, String group, String newOwner, XClaimOptions options) {
3839+
return delegate.xClaim(key, group, newOwner, options);
3840+
}
3841+
38063842
/*
38073843
* (non-Javadoc)
38083844
* @see org.springframework.data.redis.connection.RedisStreamCommands#xDel(byte[], RecordId)

src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,20 @@ default RecordId xAdd(MapRecord<byte[], byte[], byte[]> record) {
453453
return streamCommands().xAdd(record);
454454
}
455455

456+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
457+
@Override
458+
@Deprecated
459+
default List<RecordId> xClaimJustId(byte[] key, String group, String newOwner, XClaimOptions options) {
460+
return streamCommands().xClaimJustId(key, group, newOwner, options);
461+
}
462+
463+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
464+
@Override
465+
@Deprecated
466+
default List<ByteRecord> xClaim(byte[] key, String group, String newOwner, XClaimOptions options) {
467+
return streamCommands().xClaim(key, group, newOwner, options);
468+
}
469+
456470
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
457471
@Override
458472
@Deprecated

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import reactor.core.publisher.Mono;
2020

2121
import java.nio.ByteBuffer;
22+
import java.time.Duration;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Collections;
@@ -30,6 +31,7 @@
3031
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
3132
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
3233
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
34+
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
3335
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
3436
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
3537
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
@@ -285,6 +287,112 @@ default Mono<RecordId> xAdd(ByteBufferRecord record) {
285287
*/
286288
Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStreamRecord> commands);
287289

290+
/**
291+
* Change the ownership of a pending message to the given new {@literal consumer} without increasing the delivered
292+
* count.
293+
*
294+
* @param key the {@literal key} the stream is stored at.
295+
* @param group the name of the {@literal consumer group}.
296+
* @param newOwner the name of the new {@literal consumer}.
297+
* @param options must not be {@literal null}.
298+
* @return a {@link Flux} emitting {@link RecordId is} that changed user.
299+
* @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
300+
* @since 2.3
301+
*/
302+
default Flux<RecordId> xClaimJustId(ByteBuffer key, String group, String newOwner, XClaimOptions options) {
303+
304+
return xClaimJustId(Mono.just(new XClaimCommand(key, group, newOwner, options))).next()
305+
.flatMapMany(CommandResponse::getOutput);
306+
}
307+
308+
/**
309+
* Change the ownership of a pending message to the given new {@literal consumer} without increasing the delivered
310+
* count.
311+
*
312+
* @param commands must not be {@literal null}.
313+
* @return a {@link Flux} emitting {@link RecordId is} that changed user.
314+
* @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
315+
* @since 2.3
316+
*/
317+
Flux<CommandResponse<XClaimCommand, Flux<RecordId>>> xClaimJustId(Publisher<XClaimCommand> commands);
318+
319+
/**
320+
* Change the ownership of a pending message to the given new {@literal consumer}.
321+
*
322+
* @param key the {@literal key} the stream is stored at.
323+
* @param group the name of the {@literal consumer group}.
324+
* @param newOwner the name of the new {@literal consumer}.
325+
* @param minIdleTime must not be {@literal null}.
326+
* @param recordIds must not be {@literal null}.
327+
* @return a {@link Flux} emitting {@link ByteBufferRecord} that changed user.
328+
* @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
329+
* @since 2.3
330+
*/
331+
default Flux<ByteBufferRecord> xClaim(ByteBuffer key, String group, String newOwner, Duration minIdleTime,
332+
RecordId... recordIds) {
333+
334+
return xClaim(key, group, newOwner, XClaimOptions.minIdle(minIdleTime).ids(recordIds));
335+
}
336+
337+
/**
338+
* Change the ownership of a pending message to the given new {@literal consumer}.
339+
*
340+
* @param key the {@literal key} the stream is stored at.
341+
* @param group the name of the {@literal consumer group}.
342+
* @param newOwner the name of the new {@literal consumer}.
343+
* @param options must not be {@literal null}.
344+
* @return a {@link Flux} emitting {@link ByteBufferRecord} that changed user.
345+
* @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
346+
* @since 2.3
347+
*/
348+
default Flux<ByteBufferRecord> xClaim(ByteBuffer key, String group, String newOwner, XClaimOptions options) {
349+
350+
return xClaim(Mono.just(new XClaimCommand(key, group, newOwner, options))).next()
351+
.flatMapMany(CommandResponse::getOutput);
352+
}
353+
354+
/**
355+
* Change the ownership of a pending message to the given new {@literal consumer}.
356+
*
357+
* @param commands must not be {@literal null}.
358+
* @return
359+
* @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
360+
* @since 2.3
361+
*/
362+
Flux<CommandResponse<XClaimCommand, Flux<ByteBufferRecord>>> xClaim(Publisher<XClaimCommand> commands);
363+
364+
/**
365+
* {@code XCLAIM} command parameters.
366+
*
367+
* @see <a href="https://redis.io/commands/xclaim">Redis Documentation: XCLAIM</a>
368+
*/
369+
class XClaimCommand extends KeyCommand {
370+
371+
private final String groupName;
372+
private final String consumerName;
373+
private final XClaimOptions options;
374+
375+
private XClaimCommand(@Nullable ByteBuffer key, String groupName, String consumerName, XClaimOptions options) {
376+
377+
super(key);
378+
this.groupName = groupName;
379+
this.consumerName = consumerName;
380+
this.options = options;
381+
}
382+
383+
public XClaimOptions getOptions() {
384+
return options;
385+
}
386+
387+
public String getConsumerName() {
388+
return consumerName;
389+
}
390+
391+
public String getGroupName() {
392+
return groupName;
393+
}
394+
}
395+
288396
/**
289397
* {@code XDEL} command parameters.
290398
*

0 commit comments

Comments
 (0)