Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .dev/dev_arm64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'

kafka0:
image: confluentinc/cp-kafka:7.6.0.arm64
image: confluentinc/cp-kafka:7.8.0.arm64
user: "0:0"
hostname: kafka0
container_name: kafka0
Expand Down Expand Up @@ -60,7 +60,7 @@ services:
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

schema-registry0:
image: confluentinc/cp-schema-registry:7.6.0.arm64
image: confluentinc/cp-schema-registry:7.8.0.arm64
ports:
- 8085:8085
depends_on:
Expand All @@ -76,7 +76,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

kafka-connect0:
image: confluentinc/cp-kafka-connect:7.6.0.arm64
image: confluentinc/cp-kafka-connect:7.8.0.arm64
ports:
- 8083:8083
depends_on:
Expand All @@ -101,7 +101,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"

ksqldb0:
image: confluentinc/cp-ksqldb-server:7.6.0.arm64
image: confluentinc/cp-ksqldb-server:7.8.0.arm64
depends_on:
- kafka0
- kafka-connect0
Expand All @@ -119,7 +119,7 @@ services:
KSQL_CACHE_MAX_BYTES_BUFFERING: 0

kafka-init-topics:
image: confluentinc/cp-kafka:7.6.0.arm64
image: confluentinc/cp-kafka:7.8.0.arm64
volumes:
- ../documentation/compose/data/message.json:/data/message.json
depends_on:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ build/
*.tgz

/docker/*.override.yaml
/e2e-tests/allure-results/
5 changes: 4 additions & 1 deletion api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
<!-- ccs stands for Confluent Community Edition
See https://www.confluent.io/confluent-community-license-faq/
-->
<version>${confluent.version}-ccs</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestClientException;
Expand Down Expand Up @@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
(WebClientResponseException.Conflict) signal.failure()));
}

private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
private static @NotNull Retry retryOnRebalance() {
return Retry.fixedDelay(MAX_RETRIES, RETRIES_DELAY).filter(e -> {

if (e instanceof WebClientResponseException.InternalServerError exception) {
final var errorMessage = getMessage(exception);
return StringUtils.equals(errorMessage,
// From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
"Request cannot be completed because a rebalance is expected");
}
return false;
});
}

private static <T> Mono<T> withRetryOnConflictOrRebalance(Mono<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflictOrRebalance(Flux<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
private static <T> Mono<T> withRetryOnRebalance(Mono<T> publisher) {
return publisher.retryWhen(retryOnRebalance());
}


private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class,
Expand All @@ -73,197 +96,200 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}

private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
return Mono.error(new ValidationException(getMessage(parseException)));
}

private static String getMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
return Mono.error(new ValidationException(
Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message()));
return Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message();
}

@Override
public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
return withBadRequestErrorHandling(
super.createConnector(newConnector)
withRetryOnRebalance(super.createConnector(newConnector))
);
}

@Override
public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
throws RestClientException {
return withBadRequestErrorHandling(
super.setConnectorConfig(connectorName, requestBody)
withRetryOnRebalance(super.setConnectorConfig(connectorName, requestBody))
);
}

@Override
public Mono<ResponseEntity<Connector>> createConnectorWithHttpInfo(NewConnector newConnector)
throws WebClientResponseException {
return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
return withRetryOnConflictOrRebalance(super.createConnectorWithHttpInfo(newConnector));
}

@Override
public Mono<Void> deleteConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.deleteConnector(connectorName));
return withRetryOnConflictOrRebalance(super.deleteConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.deleteConnectorWithHttpInfo(connectorName));
}


@Override
public Mono<Connector> getConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnector(connectorName));
return withRetryOnConflictOrRebalance(super.getConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Connector>> getConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Map<String, Object>> getConnectorConfig(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorConfig(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorConfig(connectorName));
}

@Override
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfigWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorConfigWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorConfigWithHttpInfo(connectorName));
}

@Override
public Flux<ConnectorPlugin> getConnectorPlugins() throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorPlugins());
return withRetryOnConflictOrRebalance(super.getConnectorPlugins());
}

@Override
public Mono<ResponseEntity<List<ConnectorPlugin>>> getConnectorPluginsWithHttpInfo()
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorPluginsWithHttpInfo());
return withRetryOnConflictOrRebalance(super.getConnectorPluginsWithHttpInfo());
}

@Override
public Mono<ConnectorStatus> getConnectorStatus(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorStatus(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorStatus(connectorName));
}

@Override
public Mono<ResponseEntity<ConnectorStatus>> getConnectorStatusWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorStatusWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorStatusWithHttpInfo(connectorName));
}

@Override
public Mono<TaskStatus> getConnectorTaskStatus(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTaskStatus(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatus(connectorName, taskId));
}

@Override
public Mono<ResponseEntity<TaskStatus>> getConnectorTaskStatusWithHttpInfo(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
}

@Override
public Flux<ConnectorTask> getConnectorTasks(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTasks(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTasks(connectorName));
}

@Override
public Mono<ResponseEntity<List<ConnectorTask>>> getConnectorTasksWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTasksWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTasksWithHttpInfo(connectorName));
}

@Override
public Mono<Map<String, ConnectorTopics>> getConnectorTopics(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTopics(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTopics(connectorName));
}

@Override
public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTopicsWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTopicsWithHttpInfo(connectorName));
}

@Override
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectors(search));
return withRetryOnConflictOrRebalance(super.getConnectors(search));
}

@Override
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorsWithHttpInfo(search));
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
}

@Override
public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnector(connectorName));
return withRetryOnConflictOrRebalance(super.pauseConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
throws WebClientResponseException {
return withRetryOnConflict(super.restartConnector(connectorName, includeTasks, onlyFailed));
return withRetryOnConflictOrRebalance(super.restartConnector(connectorName, includeTasks, onlyFailed));
}

@Override
public Mono<ResponseEntity<Void>> restartConnectorWithHttpInfo(String connectorName, Boolean includeTasks,
Boolean onlyFailed) throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
return withRetryOnConflictOrRebalance(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
}

@Override
public Mono<Void> restartConnectorTask(String connectorName, Integer taskId) throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorTask(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.restartConnectorTask(connectorName, taskId));
}

@Override
public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}

@Override
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
return super.resumeConnector(connectorName);
return withRetryOnRebalance(super.resumeConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.resumeConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.resumeConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
Map<String, Object> requestBody)
throws WebClientResponseException {
return withRetryOnConflict(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
return withRetryOnConflictOrRebalance(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
}

@Override
public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String pluginName,
Map<String, Object> requestBody)
throws WebClientResponseException {
return withRetryOnConflict(super.validateConnectorPluginConfig(pluginName, requestBody));
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfig(pluginName, requestBody));
}

@Override
public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfigWithHttpInfo(
String pluginName, Map<String, Object> requestBody) throws WebClientResponseException {
return withRetryOnConflict(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
}

private static class RetryingApiClient extends ApiClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class AbstractIntegrationTest {
private static final boolean IS_ARM =
System.getProperty("os.arch").contains("arm") || System.getProperty("os.arch").contains("aarch64");

private static final String CONFLUENT_PLATFORM_VERSION = IS_ARM ? "7.2.1.arm64" : "7.2.1";
private static final String CONFLUENT_PLATFORM_VERSION = IS_ARM ? "7.8.0.arm64" : "7.8.0";

public static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION))
Expand Down
Loading
Loading