Skip to content

Commit b56578e

Browse files
make exclusive containers first class citizens
1 parent 5727127 commit b56578e

File tree

11 files changed

+93
-89
lines changed

11 files changed

+93
-89
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ MavenLocal debugging steps:
166166

167167
| Version | Date | Pull Request | Subject |
168168
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
169+
| 0.16.5 | 2024-02-05 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. |
169170
| 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest |
170171
| 0.16.3 | 2024-01-30 | [\#34669](https://github.com/airbytehq/airbyte/pull/34669) | Fix org.apache.logging.log4j:log4j-slf4j-impl version conflicts. |
171172
| 0.16.2 | 2024-01-29 | [\#34630](https://github.com/airbytehq/airbyte/pull/34630) | expose NamingTransformer to sub-classes in destinations JdbcSqlGenerator. |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.16.4
1+
version=0.16.5

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java

Lines changed: 80 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
import java.lang.reflect.Method;
99
import java.util.ArrayList;
1010
import java.util.Arrays;
11+
import java.util.Collections;
1112
import java.util.List;
1213
import java.util.concurrent.ConcurrentHashMap;
13-
import java.util.stream.Collectors;
14-
import java.util.stream.Stream;
14+
import java.util.concurrent.ConcurrentMap;
15+
import java.util.function.Supplier;
1516
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
18+
import org.testcontainers.containers.GenericContainer;
1719
import org.testcontainers.containers.JdbcDatabaseContainer;
1820
import org.testcontainers.containers.output.Slf4jLogConsumer;
1921
import org.testcontainers.utility.DockerImageName;
@@ -22,97 +24,106 @@
2224
* ContainerFactory is the companion interface to {@link TestDatabase} for providing it with
2325
* suitable testcontainer instances.
2426
*/
25-
public interface ContainerFactory<C extends JdbcDatabaseContainer<?>> {
27+
public abstract class ContainerFactory<C extends JdbcDatabaseContainer<?>> {
2628

27-
/**
28-
* Creates a new, unshared testcontainer instance. This usually wraps the default constructor for
29-
* the testcontainer type.
30-
*/
31-
C createNewContainer(DockerImageName imageName);
29+
static private final Logger LOGGER = LoggerFactory.getLogger(ContainerFactory.class);
3230

33-
/**
34-
* Returns the class object of the testcontainer.
35-
*/
36-
Class<?> getContainerClass();
31+
private record ContainerKey(Class<? extends ContainerFactory> clazz, DockerImageName imageName, List<String> methods) {};
3732

38-
/**
39-
* Returns a shared instance of the testcontainer.
40-
*/
41-
default C shared(String imageName, String... methods) {
42-
final String mapKey = Stream.concat(
43-
Stream.of(imageName, this.getClass().getCanonicalName()),
44-
Stream.of(methods))
45-
.collect(Collectors.joining("+"));
46-
return Singleton.getOrCreate(mapKey, this);
47-
}
33+
private static class ContainerOrException {
4834

49-
/**
50-
* This class is exclusively used by {@link #shared(String, String...)}. It wraps a specific shared
51-
* testcontainer instance, which is created exactly once.
52-
*/
53-
class Singleton<C extends JdbcDatabaseContainer<?>> {
54-
55-
static private final Logger LOGGER = LoggerFactory.getLogger(Singleton.class);
56-
static private final ConcurrentHashMap<String, Singleton<?>> LAZY = new ConcurrentHashMap<>();
35+
private final Supplier<GenericContainer<?>> containerSupplier;
36+
private volatile RuntimeException _exception = null;
37+
private volatile GenericContainer<?> _container = null;
5738

58-
@SuppressWarnings("unchecked")
59-
static private <C extends JdbcDatabaseContainer<?>> C getOrCreate(String mapKey, ContainerFactory<C> factory) {
60-
final Singleton<?> singleton = LAZY.computeIfAbsent(mapKey, Singleton<C>::new);
61-
return ((Singleton<C>) singleton).getOrCreate(factory);
39+
ContainerOrException(Supplier<GenericContainer<?>> containerSupplier) {
40+
this.containerSupplier = containerSupplier;
6241
}
6342

64-
final private String imageName;
65-
final private List<String> methodNames;
66-
67-
private C sharedContainer;
68-
private RuntimeException containerCreationError;
69-
70-
private Singleton(String imageNamePlusMethods) {
71-
final String[] parts = imageNamePlusMethods.split("\\+");
72-
this.imageName = parts[0];
73-
this.methodNames = Arrays.stream(parts).skip(2).toList();
43+
private void populate() {
44+
synchronized (this) {
45+
if (_container == null && _exception == null) {
46+
try {
47+
_container = containerSupplier.get();
48+
} catch (RuntimeException e) {
49+
_exception = e;
50+
}
51+
}
52+
}
7453
}
7554

76-
private synchronized C getOrCreate(ContainerFactory<C> factory) {
77-
if (sharedContainer == null && containerCreationError == null) {
78-
try {
79-
create(imageName, factory, methodNames);
80-
} catch (RuntimeException e) {
81-
sharedContainer = null;
82-
containerCreationError = e;
83-
}
55+
RuntimeException exception() {
56+
if (_exception == null && _container == null) {
57+
populate();
8458
}
85-
if (containerCreationError != null) {
86-
throw new RuntimeException(
87-
"Error during container creation for imageName=" + imageName
88-
+ ", factory=" + factory.getClass().getName()
89-
+ ", methods=" + methodNames,
90-
containerCreationError);
59+
return _exception;
60+
}
61+
62+
GenericContainer<?> container() {
63+
if (_exception == null && _container == null) {
64+
populate();
9165
}
92-
return sharedContainer;
66+
return _container;
9367
}
9468

95-
private void create(String imageName, ContainerFactory<C> factory, List<String> methodNames) {
69+
}
70+
71+
private static final ConcurrentMap<ContainerKey, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();
72+
73+
/**
74+
* Creates a new, unshared testcontainer instance. This usually wraps the default constructor for
75+
* the testcontainer type.
76+
*/
77+
protected abstract C createNewContainer(DockerImageName imageName);
78+
79+
/**
80+
* Returns a shared instance of the testcontainer.
81+
*/
82+
public final C shared(String imageName, String... methods) {
83+
List<String> methodList = methods == null ? Collections.emptyList() : Arrays.asList(methods);
84+
DockerImageName dockerImageName = DockerImageName.parse(imageName);
85+
final ContainerKey containerKey = new ContainerKey(getClass(), dockerImageName, methodList);
86+
ContainerOrException containerOrError = SHARED_CONTAINERS.computeIfAbsent(containerKey, this::createContainerOrError);
87+
if (containerOrError.exception() != null) {
88+
throw containerOrError.exception();
89+
}
90+
return (C) containerOrError.container();
91+
}
92+
93+
public final C exclusive(String imageName, String... methods) {
94+
DockerImageName dockerImageName = DockerImageName.parse(imageName);
95+
List<String> methodList = methods == null ? Collections.emptyList() : Arrays.asList(methods);
96+
return (C) createContainerSupplier(dockerImageName, methodList).get();
97+
}
98+
99+
private ContainerOrException createContainerOrError(ContainerKey containerKey) {
100+
DockerImageName imageName = containerKey.imageName();
101+
List<String> methodNames = containerKey.methods();
102+
return new ContainerOrException(createContainerSupplier(imageName, methodNames));
103+
}
104+
105+
private Supplier<GenericContainer<?>> createContainerSupplier(DockerImageName imageName, List<String> methodNames) {
106+
return () -> {
96107
LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames);
97108
try {
98-
final var parsed = DockerImageName.parse(imageName);
109+
GenericContainer container = createNewContainer(imageName);
110+
99111
final var methods = new ArrayList<Method>();
100112
for (String methodName : methodNames) {
101-
methods.add(factory.getClass().getMethod(methodName, factory.getContainerClass()));
113+
methods.add(getClass().getMethod(methodName, container.getClass()));
102114
}
103-
sharedContainer = factory.createNewContainer(parsed);
104-
sharedContainer.withLogConsumer(new Slf4jLogConsumer(LOGGER));
115+
container.withLogConsumer(new Slf4jLogConsumer(LOGGER));
105116
for (Method method : methods) {
106117
LOGGER.info("Calling {} in {} on new shared container based on {}.",
107-
method.getName(), factory.getClass().getName(), imageName);
108-
method.invoke(factory, sharedContainer);
118+
method.getName(), getClass().getName(), imageName);
119+
method.invoke(this, container);
109120
}
110-
sharedContainer.start();
121+
container.start();
122+
return container;
111123
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
112124
throw new RuntimeException(e);
113125
}
114-
}
115-
126+
};
116127
}
117128

118129
}

airbyte-integrations/connectors/source-mssql/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.16.0'
7+
cdkVersionRequired = '0.16.5'
88
features = ['db-sources']
9-
useLocalCdk = false
9+
useLocalCdk = true
1010
}
1111

1212
configurations.all {

airbyte-integrations/connectors/source-mssql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 3.6.1
12+
dockerImageTag: 3.6.2
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLContainerFactory.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.testcontainers.containers.Network;
1111
import org.testcontainers.utility.DockerImageName;
1212

13-
public class MsSQLContainerFactory implements ContainerFactory<MSSQLServerContainer<?>> {
13+
public class MsSQLContainerFactory extends ContainerFactory<MSSQLServerContainer<?>> {
1414

1515
@Override
1616
public MSSQLServerContainer<?> createNewContainer(DockerImageName imageName) {
@@ -20,11 +20,6 @@ public MSSQLServerContainer<?> createNewContainer(DockerImageName imageName) {
2020
return container;
2121
}
2222

23-
@Override
24-
public Class<?> getContainerClass() {
25-
return MSSQLServerContainer.class;
26-
}
27-
2823
/**
2924
* Create a new network and bind it to the container.
3025
*/

airbyte-integrations/connectors/source-postgres/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ java {
1313
}
1414

1515
airbyteJavaConnector {
16-
cdkVersionRequired = '0.16.3'
16+
cdkVersionRequired = '0.16.5'
1717
features = ['db-sources']
18-
useLocalCdk = false
18+
useLocalCdk = true
1919
}
2020

2121

airbyte-integrations/connectors/source-postgres/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.3.4
12+
dockerImageTag: 3.3.5
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresContainerFactory.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,14 @@
1212
import org.testcontainers.utility.DockerImageName;
1313
import org.testcontainers.utility.MountableFile;
1414

15-
public class PostgresContainerFactory implements ContainerFactory<PostgreSQLContainer<?>> {
15+
public class PostgresContainerFactory extends ContainerFactory<PostgreSQLContainer<?>> {
1616

1717
@Override
18-
public PostgreSQLContainer<?> createNewContainer(DockerImageName imageName) {
18+
protected PostgreSQLContainer<?> createNewContainer(DockerImageName imageName) {
1919
return new PostgreSQLContainer<>(imageName.asCompatibleSubstituteFor("postgres"));
2020

2121
}
2222

23-
@Override
24-
public Class<?> getContainerClass() {
25-
return PostgreSQLContainer.class;
26-
}
27-
2823
/**
2924
* Apply the postgresql.conf file that we've packaged as a resource.
3025
*/

docs/integrations/sources/mssql.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
342342

343343
| Version | Date | Pull Request | Subject |
344344
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
345+
| 3.6.2 | 2024-02-06 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.5. |
345346
| 3.6.1 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. |
346347
| 3.6.0 | 2024-01-10 | [33700](https://github.com/airbytehq/airbyte/pull/33700) | Remove CDC config options for data_to_sync and snapshot isolation. |
347348
| 3.5.1 | 2024-01-05 | [33510](https://github.com/airbytehq/airbyte/pull/33510) | Test-only changes. |

0 commit comments

Comments
 (0)