Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return (elementType.resolve() == byte[].class && super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public byte[] decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Flux<DataBuffer> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ public interface Decoder<T> {
*/
boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType);

/**
* Whether the decoder supports decoding an Object of its target type from an
* empty message. When it is true, the decoder will always return a non-null
* value from its {@code decode} method when an empty message is decoded.
* @return {@code true} if supported, {@code false} otherwise
* @since 6.0.5
*/
default boolean canDecodeEmptyMessage() {
return false;
}

/**
* Decode a {@link DataBuffer} input stream into a Flux of {@code T}.
* @param inputStream the {@code DataBuffer} input stream to decode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Buffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public ByteBuf decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Flux<Resource> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return (elementType.resolve() == String.class && super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.util.MimeType;

import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.springframework.core.io.buffer.DataBufferUtils.release;

/**
* Abstract base class for {@link Decoder} unit tests. Subclasses need to implement
Expand Down Expand Up @@ -114,6 +115,7 @@ protected <T> void testDecodeAll(Publisher<DataBuffer> input, Class<? extends T>
* <li>{@link #testDecodeError(Publisher, ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeCancel(Publisher, ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeEmpty(ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeEmptyMessage(ResolvableType, MimeType, Map)}</li>
* </ul>
*
* @param input the input to be provided to the decoder
Expand All @@ -131,6 +133,7 @@ protected <T> void testDecodeAll(Publisher<DataBuffer> input, ResolvableType out
testDecodeError(input, outputType, mimeType, hints);
testDecodeCancel(input, outputType, mimeType, hints);
testDecodeEmpty(outputType, mimeType, hints);
testDecodeEmptyMessage(outputType, mimeType, hints);
}

/**
Expand Down Expand Up @@ -258,6 +261,25 @@ protected void testDecodeEmpty(ResolvableType outputType, @Nullable MimeType mim
StepVerifier.create(result).verifyComplete();
}

/**
* Test a {@link Decoder#decode decode} scenario where the input stream is an empty buffer.
* The output is expected to be filled when the decoder supports it.
*
* @param outputType the desired output type
* @param mimeType the mime type to use for decoding. May be {@code null}.
* @param hints the hints used for decoding. May be {@code null}.
*/
protected void testDecodeEmptyMessage(ResolvableType outputType, MimeType mimeType, Map<String, Object> hints) {
if (!this.decoder.canDecodeEmptyMessage()) {
return;
}
DataBuffer buffer = this.bufferFactory.allocateBuffer(0);
Object result = this.decoder.decode(buffer, outputType, mimeType, hints);
releaseDataBufferIfIdentical(buffer, result);
Assert.notNull(result, "result expected to be non null");
Assert.isAssignable(outputType.toClass(), result.getClass(), "result not of specified type");
}

// Mono

/**
Expand Down Expand Up @@ -289,6 +311,7 @@ protected <T> void testDecodeToMonoAll(Publisher<DataBuffer> input,
* <li>{@link #testDecodeToMonoError(Publisher, ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeToMonoCancel(Publisher, ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeToMonoEmpty(ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeToMonoEmptyMessage(ResolvableType, MimeType, Map)}</li>
* </ul>
*
* @param input the input to be provided to the decoder
Expand All @@ -306,6 +329,7 @@ protected <T> void testDecodeToMonoAll(Publisher<DataBuffer> input, ResolvableTy
testDecodeToMonoError(input, outputType, mimeType, hints);
testDecodeToMonoCancel(input, outputType, mimeType, hints);
testDecodeToMonoEmpty(outputType, mimeType, hints);
testDecodeToMonoEmptyMessage(outputType, mimeType, hints);
}

/**
Expand Down Expand Up @@ -419,6 +443,32 @@ protected void testDecodeToMonoEmpty(ResolvableType outputType, @Nullable MimeTy
StepVerifier.create(result).verifyComplete();
}

/**
* Test a {@link Decoder#decodeToMono decode} scenario where the input stream is an empty buffer.
* The output is expected to be filled when the decoder supports it.
*
* @param outputType the desired output type
* @param mimeType the mime type to use for decoding. May be {@code null}.
* @param hints the hints used for decoding. May be {@code null}.
*/
protected void testDecodeToMonoEmptyMessage(ResolvableType outputType, @Nullable MimeType mimeType,
@Nullable Map<String, Object> hints) {

if (!this.decoder.canDecodeEmptyMessage()) {
return;
}

Flux<DataBuffer> source = Flux.range(0, 2)
.map(i -> this.bufferFactory.allocateBuffer(0));

Mono<?> result = this.decoder.decodeToMono(source, outputType, mimeType, hints)
.doOnNext(this::releaseIfDataBuffer);

StepVerifier.create(result)
.expectNextMatches(next -> outputType.toClass().isInstance(next))
.verifyComplete();
}

/**
* Creates a deferred {@link DataBuffer} containing the given bytes.
* @param bytes the bytes that are to be stored in the buffer
Expand All @@ -432,6 +482,27 @@ protected Mono<DataBuffer> dataBuffer(byte[] bytes) {
});
}

/**
* If {@code value} is referentially identical to {@code buffer}, release it.
* @param buffer the {@link DataBuffer} that is compared
* @param value the {@link Object} that is compared
*/
private void releaseDataBufferIfIdentical(DataBuffer buffer, Object value) {
if (buffer == value) {
release(buffer);
}
}

/**
* If {@code value} is a {@link DataBuffer}, release it.
* @param value the {@link Object} that is checked
*/
private void releaseIfDataBuffer(Object value) {
if (value instanceof DataBuffer) {
release((DataBuffer) value);
}
}

/**
* Exception used in {@link #testDecodeError} and {@link #testDecodeToMonoError}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
if (decoder.canDecode(elementType, mimeType)) {
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
Expand All @@ -245,7 +245,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
else {
// Single-value (with or without reactive type wrapper)
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
Expand All @@ -263,7 +263,10 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
message, parameter, "Cannot decode to [" + targetType + "]" + message));
}

private boolean nonEmptyDataBuffer(DataBuffer buffer) {
private boolean nonEmptyDataBuffer(DataBuffer buffer, Decoder<?> decoder) {
if (decoder.canDecodeEmptyMessage()) {
return true;
}
if (buffer.readableByteCount() > 0) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void echoChannel() {
@Test // gh-26344
public void echoChannelWithEmptyInput() {
Flux<String> result = requester.route("echo-channel-empty").data(Flux.empty()).retrieveFlux(String.class);
StepVerifier.create(result).verifyComplete();
StepVerifier.create(result).expectNext(" echoed").verifyComplete();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ public void exceedMaxSize() {
testDecode(input, Msg.class, step -> step.verifyError(DecodingException.class));
}

@Test
public void decodeEmpty() {
testDecodeEmptyMessage(ResolvableType.forClass(Msg.class), null, null);
}

private Mono<DataBuffer> dataBuffer(Msg msg) {
return Mono.fromCallable(() -> {
byte[] bytes = msg.toByteArray();
Expand Down