- Notifications
You must be signed in to change notification settings - Fork 25.6k
Support per-project s3 clients #127631
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support per-project s3 clients #127631
Changes from all commits
33dc439 a8dc278 a318fc8 c3b36e7 fdc9c1d acf6da4 0f1bdd5 9bc5d09 d665303 55c697c 833e085 4d7bd69 6ce0828 616a770 39fa184 922327b f4537ef cf7cf7f 208a540 ab5f911 dc14ef7 ff631de 3962106 6cc05c6 f17e3e9 ef5631b 9066af3 4048064 38ee3b0 e1862dd 9ab977b d3c0dbb File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -37,17 +37,19 @@ | |
| import org.apache.http.conn.DnsResolver; | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.apache.lucene.store.AlreadyClosedException; | ||
| import org.elasticsearch.ElasticsearchException; | ||
| import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService; | ||
| import org.elasticsearch.cluster.metadata.ProjectId; | ||
| import org.elasticsearch.cluster.metadata.RepositoryMetadata; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.cluster.project.ProjectResolver; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.component.AbstractLifecycleComponent; | ||
| import org.elasticsearch.common.settings.Setting; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.util.Maps; | ||
| import org.elasticsearch.common.util.concurrent.RunOnce; | ||
| import org.elasticsearch.core.FixForMultiProject; | ||
| import org.elasticsearch.core.Nullable; | ||
| import org.elasticsearch.core.Releasable; | ||
| import org.elasticsearch.core.Releasables; | ||
| | @@ -72,7 +74,6 @@ | |
| import java.util.function.Consumer; | ||
| import java.util.function.Supplier; | ||
| | ||
| import static java.util.Collections.emptyMap; | ||
| import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ROLE_ARN; | ||
| import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ROLE_SESSION_NAME; | ||
| import static software.amazon.awssdk.core.SdkSystemSetting.AWS_WEB_IDENTITY_TOKEN_FILE; | ||
| | @@ -93,21 +94,6 @@ class S3Service extends AbstractLifecycleComponent { | |
| TimeValue.timeValueHours(24), | ||
| Setting.Property.NodeScope | ||
| ); | ||
| private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = emptyMap(); | ||
| | ||
| /** | ||
| * Client settings calculated from static configuration and settings in the keystore. | ||
| */ | ||
| private volatile Map<String, S3ClientSettings> staticClientSettings = Map.of( | ||
| "default", | ||
| S3ClientSettings.getClientSettings(Settings.EMPTY, "default") | ||
| ); | ||
| | ||
| /** | ||
| * Client settings derived from those in {@link #staticClientSettings} by combining them with settings | ||
| * in the {@link RepositoryMetadata}. | ||
| */ | ||
| private volatile Map<Settings, S3ClientSettings> derivedClientSettings = emptyMap(); | ||
| | ||
| private final Runnable defaultRegionSetter; | ||
| private volatile Region defaultRegion; | ||
| | @@ -124,13 +110,16 @@ class S3Service extends AbstractLifecycleComponent { | |
| final TimeValue compareAndExchangeTimeToLive; | ||
| final TimeValue compareAndExchangeAntiContentionDelay; | ||
| final boolean isStateless; | ||
| private final S3ClientsManager s3ClientsManager; | ||
| | ||
| S3Service( | ||
| Environment environment, | ||
| Settings nodeSettings, | ||
| ClusterService clusterService, | ||
| ProjectResolver projectResolver, | ||
| ResourceWatcherService resourceWatcherService, | ||
| Supplier<Region> defaultRegionSupplier | ||
| ) { | ||
| final Settings nodeSettings = clusterService.getSettings(); | ||
| webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider( | ||
| environment, | ||
| System::getenv, | ||
| | @@ -142,6 +131,20 @@ class S3Service extends AbstractLifecycleComponent { | |
| compareAndExchangeAntiContentionDelay = REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.get(nodeSettings); | ||
| isStateless = DiscoveryNode.isStateless(nodeSettings); | ||
| defaultRegionSetter = new RunOnce(() -> defaultRegion = defaultRegionSupplier.get()); | ||
| s3ClientsManager = new S3ClientsManager( | ||
| nodeSettings, | ||
| this::buildClientReference, | ||
| clusterService.threadPool().generic(), | ||
| projectResolver.supportsMultipleProjects() | ||
| ); | ||
| if (projectResolver.supportsMultipleProjects()) { | ||
| clusterService.addHighPriorityApplier(s3ClientsManager); | ||
| } | ||
| } | ||
| | ||
| /** | ||
| * Returns the {@link S3ClientsManager} that this service delegates client handling to. | ||
| * Package-private so that it is visible to tests. | ||
| */ | ||
| S3ClientsManager getS3ClientsManager() { | ||
| return this.s3ClientsManager; | ||
| } | ||
| | ||
| /** | ||
| | @@ -151,86 +154,55 @@ class S3Service extends AbstractLifecycleComponent { | |
| * of being returned to the cache. | ||
| */ | ||
| public synchronized void refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) { | ||
| // shutdown all unused clients | ||
| // others will shutdown on their respective release | ||
| releaseCachedClients(); | ||
| this.staticClientSettings = Maps.ofEntries(clientsSettings.entrySet()); | ||
| derivedClientSettings = emptyMap(); | ||
| assert this.staticClientSettings.containsKey("default") : "always at least have 'default'"; | ||
| /* clients are built lazily by {@link #client} */ | ||
| s3ClientsManager.refreshAndClearCacheForClusterClients(clientsSettings); | ||
| } | ||
| | ||
| /** | ||
| * Attempts to retrieve a client by its repository metadata and settings from the cache. | ||
| * If the client does not exist it will be created. | ||
| */ | ||
| @FixForMultiProject(description = "can be removed once blobstore is project aware") | ||
| public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) { | ||
| final S3ClientSettings clientSettings = settings(repositoryMetadata); | ||
| { | ||
| final AmazonS3Reference clientReference = clientsCache.get(clientSettings); | ||
| if (clientReference != null && clientReference.tryIncRef()) { | ||
| return clientReference; | ||
| } | ||
| } | ||
| synchronized (this) { | ||
| final AmazonS3Reference existing = clientsCache.get(clientSettings); | ||
| if (existing != null && existing.tryIncRef()) { | ||
| return existing; | ||
| } | ||
| | ||
| if (lifecycle.started() == false) { | ||
| // doClose() calls releaseCachedClients() which is also synchronized (this) so if we're STARTED here then the client we | ||
| // create will definitely not leak on close. | ||
| throw new AlreadyClosedException("S3Service is in state [" + lifecycle + "]"); | ||
| } | ||
| return client(ProjectId.DEFAULT, repositoryMetadata); | ||
| } | ||
| | ||
| final SdkHttpClient httpClient = buildHttpClient(clientSettings, getCustomDnsResolver()); | ||
| Releasable toRelease = httpClient::close; | ||
| try { | ||
| final AmazonS3Reference clientReference = new AmazonS3Reference(buildClient(clientSettings, httpClient), httpClient); | ||
| clientReference.mustIncRef(); | ||
| clientsCache = Maps.copyMapWithAddedEntry(clientsCache, clientSettings, clientReference); | ||
| toRelease = null; | ||
| return clientReference; | ||
| } finally { | ||
| Releasables.close(toRelease); | ||
| } | ||
| } | ||
| /** | ||
| * Looks up either a cluster-level or a project-level client through the clients manager. Throws if the | ||
| * project-id or the client name does not exist. The client may be initialized lazily. | ||
| * @param projectId The project associated with the client, or null if the client is cluster level | ||
| * @param repositoryMetadata The repository metadata handed to the clients manager for the lookup | ||
| */ | ||
| public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) { | ||
| // a null project-id is mapped to the default project before delegating | ||
| final ProjectId resolvedProjectId = effectiveProjectId(projectId); | ||
| return s3ClientsManager.client(resolvedProjectId, repositoryMetadata); | ||
| } | ||
| | ||
| /** | ||
| * Either fetches {@link S3ClientSettings} for a given {@link RepositoryMetadata} from cached settings or creates them | ||
| * by overriding static client settings from {@link #staticClientSettings} with settings found in the repository metadata. | ||
| * @param repositoryMetadata Repository Metadata | ||
| * @return S3ClientSettings | ||
| * We use the default project-id for cluster level clients. | ||
| */ | ||
| S3ClientSettings settings(RepositoryMetadata repositoryMetadata) { | ||
| final Settings settings = repositoryMetadata.settings(); | ||
| { | ||
| final S3ClientSettings existing = derivedClientSettings.get(settings); | ||
| if (existing != null) { | ||
| return existing; | ||
| } | ||
| } | ||
| final String clientName = S3Repository.CLIENT_NAME.get(settings); | ||
| final S3ClientSettings staticSettings = staticClientSettings.get(clientName); | ||
| if (staticSettings != null) { | ||
| synchronized (this) { | ||
| final S3ClientSettings existing = derivedClientSettings.get(settings); | ||
| if (existing != null) { | ||
| return existing; | ||
| } | ||
| final S3ClientSettings newSettings = staticSettings.refine(settings); | ||
| derivedClientSettings = Maps.copyMapWithAddedOrReplacedEntry(derivedClientSettings, settings, newSettings); | ||
| return newSettings; | ||
| } | ||
| ProjectId effectiveProjectId(@Nullable ProjectId projectId) { | ||
| if (projectId != null) { | ||
| return projectId; | ||
| } | ||
| // no project given: fall back to the default project-id | ||
| return ProjectId.DEFAULT; | ||
| } | ||
| | ||
| // TODO: consider moving client building into S3ClientsManager | ||
| private AmazonS3Reference buildClientReference(final S3ClientSettings clientSettings) { | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. AFAICT this method is only used as a method reference that's passed into `S3ClientsManager`. Member Author There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This method calls into many other methods directly and indirectly. If we move all of them across, we basically no longer have an `S3Service`. Also, since this change will not be backported, I'd prefer to keep the change set smaller so it does not cause trouble for any future backports. We can revisit the structure in the future, outside of this PR. I can add a TODO for it. Does that sound OK? Contributor There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see, ok | ||
| final SdkHttpClient httpClient = buildHttpClient(clientSettings, getCustomDnsResolver()); | ||
| Releasable toRelease = httpClient::close; | ||
| try { | ||
| final AmazonS3Reference clientReference = new AmazonS3Reference(buildClient(clientSettings, httpClient), httpClient); | ||
| clientReference.mustIncRef(); | ||
| toRelease = null; | ||
| return clientReference; | ||
| } finally { | ||
| Releasables.close(toRelease); | ||
| } | ||
| throw new IllegalArgumentException( | ||
| "Unknown s3 client name [" | ||
| + clientName | ||
| + "]. Existing client configs: " | ||
| + Strings.collectionToDelimitedString(staticClientSettings.keySet(), ",") | ||
| ); | ||
| } | ||
| | ||
| @FixForMultiProject(description = "can be removed once blobstore is project aware") | ||
| S3ClientSettings settings(RepositoryMetadata repositoryMetadata) { | ||
| return settings(ProjectId.DEFAULT, repositoryMetadata); | ||
| } | ||
| | ||
| S3ClientSettings settings(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) { | ||
| return s3ClientsManager.settingsForClient(effectiveProjectId(projectId), repositoryMetadata); | ||
| } | ||
| | ||
| // proxy for testing | ||
| | @@ -448,18 +420,17 @@ static AwsCredentialsProvider buildCredentials( | |
| } | ||
| } | ||
| | ||
| private synchronized void releaseCachedClients() { | ||
| // the clients will shut down once they are no longer in use | ||
| for (final AmazonS3Reference clientReference : clientsCache.values()) { | ||
| clientReference.decRef(); | ||
| } | ||
| // clear previously cached clients, they will be built lazily | ||
| clientsCache = emptyMap(); | ||
| derivedClientSettings = emptyMap(); | ||
| /** | ||
| * Releases the cached clients of the default project by delegating to {@link #onBlobStoreClose(ProjectId)}. | ||
| */ | ||
| @FixForMultiProject(description = "can be removed once blobstore is project aware") | ||
| public void onBlobStoreClose() { | ||
| final ProjectId defaultProjectId = ProjectId.DEFAULT; | ||
| onBlobStoreClose(defaultProjectId); | ||
| } | ||
| | ||
| public void onBlobStoreClose() { | ||
| releaseCachedClients(); | ||
| /** | ||
| * Asks the clients manager to release the cached clients of the specified project. | ||
| * @param projectId The project associated with the client, or null if the client is cluster level | ||
| */ | ||
| public void onBlobStoreClose(@Nullable ProjectId projectId) { | ||
| // a null project-id is mapped to the default project before delegating | ||
| final ProjectId resolvedProjectId = effectiveProjectId(projectId); | ||
| s3ClientsManager.releaseCachedClients(resolvedProjectId); | ||
| } | ||
| | ||
| @Override | ||
| | @@ -472,7 +443,7 @@ protected void doStop() {} | |
| | ||
| @Override | ||
| public void doClose() throws IOException { | ||
| releaseCachedClients(); | ||
| s3ClientsManager.close(); | ||
| webIdentityTokenCredentialsProvider.close(); | ||
| } | ||
| | ||
| | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 This lifecycle check now happens in `S3ClientsManager`.