- Notifications
You must be signed in to change notification settings - Fork 298
Improving the ThreadManager and TokenRefresher APIs #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
a2ee38c 1cf4c3a 8234ca1 a8fb84e 913fda5 c218f82 a803a4e 458d0c2 b63b17a 050bfa4 f06b9b3 846c93c d3ea8e9 e06ae1d dfca1e7 b310b0a ae32b93 fc009ff a151749 65ef119 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -33,11 +33,14 @@ | |
| import com.google.common.collect.ImmutableList; | ||
| import com.google.common.io.BaseEncoding; | ||
| import com.google.firebase.internal.FirebaseAppStore; | ||
| import com.google.firebase.internal.FirebaseExecutors; | ||
| import com.google.firebase.internal.FirebaseService; | ||
| import com.google.firebase.internal.GaeThreadFactory; | ||
| import com.google.firebase.internal.NonNull; | ||
| import com.google.firebase.internal.Nullable; | ||
| | ||
| import com.google.firebase.internal.RevivingScheduledExecutor; | ||
| import com.google.firebase.tasks.Task; | ||
| import com.google.firebase.tasks.Tasks; | ||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| | @@ -47,9 +50,14 @@ | |
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| | ||
| | @@ -84,10 +92,14 @@ public class FirebaseApp { | |
| private final String name; | ||
| private final FirebaseOptions options; | ||
| private final TokenRefresher tokenRefresher; | ||
| private final ThreadManager threadManager; | ||
| private final ThreadManager.FirebaseExecutor executor; | ||
| | ||
| private final AtomicBoolean deleted = new AtomicBoolean(); | ||
| private final Map<String, FirebaseService> services = new HashMap<>(); | ||
| | ||
| private volatile ScheduledExecutorService scheduledExecutor; | ||
| | ||
| /** | ||
| * Per application lock for synchronizing all internal FirebaseApp state changes. | ||
| */ | ||
| | @@ -99,6 +111,8 @@ private FirebaseApp(String name, FirebaseOptions options, TokenRefresher.Factory | |
| this.name = name; | ||
| this.options = checkNotNull(options); | ||
| this.tokenRefresher = checkNotNull(factory).create(this); | ||
| this.threadManager = options.getThreadManager(); | ||
| this.executor = this.threadManager.getFirebaseExecutor(this); | ||
| } | ||
| | ||
| /** Returns a list of all FirebaseApps. */ | ||
| | @@ -242,7 +256,6 @@ private static String normalize(@NonNull String name) { | |
| /** Returns the unique name of this app. */ | ||
| @NonNull | ||
| public String getName() { | ||
| checkNotDeleted(); | ||
| return name; | ||
| } | ||
| | ||
| | @@ -315,7 +328,14 @@ public void delete() { | |
| service.destroy(); | ||
| } | ||
| services.clear(); | ||
| tokenRefresher.cleanup(); | ||
| tokenRefresher.stop(); | ||
| | ||
| // Clean up and terminate the thread pools | ||
| threadManager.releaseFirebaseExecutor(this, executor); | ||
| if (scheduledExecutor != null) { | ||
| scheduledExecutor.shutdownNow(); | ||
| scheduledExecutor = null; | ||
| } | ||
| } | ||
| | ||
| synchronized (appsLock) { | ||
| | @@ -332,6 +352,47 @@ private void checkNotDeleted() { | |
| checkState(!deleted.get(), "FirebaseApp was deleted %s", this); | ||
| } | ||
| | ||
| private ScheduledExecutorService ensureScheduledExecutorService() { | ||
| if (scheduledExecutor == null) { | ||
| synchronized (lock) { | ||
| checkNotDeleted(); | ||
| if (scheduledExecutor == null) { | ||
| scheduledExecutor = new RevivingScheduledExecutor(threadManager.getThreadFactory(), | ||
| "firebase-scheduled-worker", GaeThreadFactory.isAvailable()); | ||
| } | ||
| } | ||
| } | ||
| return scheduledExecutor; | ||
| } | ||
| | ||
| ThreadFactory getThreadFactory() { | ||
| return threadManager.getThreadFactory(); | ||
| } | ||
| | ||
| // TODO: Return an ApiFuture once Task API is fully removed. | ||
| <T> Task<T> submit(Callable<T> command) { | ||
| checkNotNull(command); | ||
| return Tasks.call(executor.getListeningExecutor(), command); | ||
| } | ||
| | ||
| <T> ScheduledFuture<T> schedule(Callable<T> command, long delayMillis) { | ||
| checkNotNull(command); | ||
| try { | ||
| return ensureScheduledExecutorService().schedule(command, delayMillis, TimeUnit.MILLISECONDS); | ||
| } catch (Exception e) { | ||
| // This may fail if the underlying ThreadFactory does not support long-lived threads. | ||
| throw new UnsupportedOperationException("Scheduled tasks not supported", e); | ||
| } | ||
| } | ||
| | ||
| void startTokenRefresher() { | ||
| synchronized (lock) { | ||
| checkNotDeleted(); | ||
| // TODO: Provide an option to disable this altogether. | ||
| tokenRefresher.start(); | ||
| } | ||
| } | ||
| | ||
| boolean isDefaultApp() { | ||
| return DEFAULT_APP_NAME.equals(getName()); | ||
| } | ||
| | @@ -362,24 +423,30 @@ FirebaseService getService(String id) { | |
| */ | ||
| static class TokenRefresher implements CredentialsChangedListener { | ||
| | ||
| private static final int STATE_READY = 0; | ||
| ||
| private static final int STATE_STARTED = 1; | ||
| private static final int STATE_STOPPED = 2; | ||
| | ||
| private final FirebaseApp firebaseApp; | ||
| private final GoogleCredentials credentials; | ||
| private ScheduledFuture<Void> future; | ||
| private boolean closed; | ||
| private final AtomicInteger state; | ||
| | ||
| private Future<Void> future; | ||
| | ||
| TokenRefresher(FirebaseApp app) { | ||
| this.credentials = app.getOptions().getCredentials(); | ||
| this.credentials.addChangeListener(this); | ||
| TokenRefresher(FirebaseApp firebaseApp) { | ||
| this.firebaseApp = checkNotNull(firebaseApp); | ||
| this.credentials = firebaseApp.getOptions().getCredentials(); | ||
| this.state = new AtomicInteger(STATE_READY); | ||
| } | ||
| | ||
| @Override | ||
| public final synchronized void onChanged(OAuth2Credentials credentials) throws IOException { | ||
| if (closed) { | ||
| if (state.get() != STATE_STARTED) { | ||
| return; | ||
| } | ||
| | ||
| AccessToken accessToken = credentials.getAccessToken(); | ||
| long refreshDelay = accessToken.getExpirationTime().getTime() | ||
| - System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5); | ||
| long refreshDelay = getRefreshDelay(accessToken); | ||
| if (refreshDelay > 0) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't work for intervals <= 5 minutes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is correct. However, in practice this doesn't really happen. New tokens minted by Google auth have a 1 hour TTL. If it does happen (due to some issue in the remote token server), we don't want to schedule refresh events aggressively, and cause a feedback loop. So we simply log a warning and let things run course. This is what we have done in the past releases too. | ||
| scheduleRefresh(refreshDelay); | ||
| } else { | ||
| | @@ -388,26 +455,6 @@ public final synchronized void onChanged(OAuth2Credentials credentials) throws I | |
| } | ||
| } | ||
| | ||
| /** | ||
| * Schedule a forced token refresh to be executed after a specified duration. | ||
| * | ||
| * @param delayMillis Duration in milliseconds, after which the token should be forcibly | ||
| * refreshed. | ||
| */ | ||
| private void scheduleRefresh(final long delayMillis) { | ||
| cancelPrevious(); | ||
| scheduleNext( | ||
| new Callable<Void>() { | ||
| @Override | ||
| public Void call() throws Exception { | ||
| logger.debug("Refreshing OAuth2 credential"); | ||
| credentials.refresh(); | ||
| return null; | ||
| } | ||
| }, | ||
| delayMillis); | ||
| } | ||
| | ||
| protected void cancelPrevious() { | ||
| if (future != null) { | ||
| future.cancel(true); | ||
| | @@ -417,17 +464,71 @@ protected void cancelPrevious() { | |
| protected void scheduleNext(Callable<Void> task, long delayMillis) { | ||
| logger.debug("Scheduling next token refresh in {} milliseconds", delayMillis); | ||
| try { | ||
| future = | ||
| FirebaseExecutors.DEFAULT_SCHEDULED_EXECUTOR.schedule( | ||
| task, delayMillis, TimeUnit.MILLISECONDS); | ||
| } catch (UnsupportedOperationException ignored) { | ||
| future = firebaseApp.schedule(task, delayMillis); | ||
| } catch (UnsupportedOperationException e) { | ||
| // Cannot support task scheduling in the current runtime. | ||
| logger.debug("Failed to schedule token refresh event", e); | ||
| } | ||
| } | ||
| | ||
| protected synchronized void cleanup() { | ||
| /** | ||
| * Starts the TokenRefresher if not already started. Starts listening to credentials changed | ||
| * events, and schedules refresh events every time the OAuth2 token changes. If no active | ||
| * token is present, or if the available token is set to expire soon, this will also schedule | ||
| * a refresh event to be executed immediately. | ||
| * | ||
| * <p>This operation is idempotent. Calling it multiple times, or calling it after the | ||
| * refresher has been stopped has no effect. | ||
| */ | ||
| final synchronized void start() { | ||
| // Allow starting only from the ready state. | ||
| if (!state.compareAndSet(STATE_READY, STATE_STARTED)) { | ||
| return; | ||
| } | ||
| | ||
| logger.debug("Starting the proactive token refresher"); | ||
| credentials.addChangeListener(this); | ||
| AccessToken accessToken = credentials.getAccessToken(); | ||
| long refreshDelay = 0L; | ||
| if (accessToken != null) { | ||
| refreshDelay = Math.max(getRefreshDelay(accessToken), refreshDelay); | ||
| } | ||
| // If the access token is null, or is about to expire (i.e. expires in less than 5 minutes), | ||
| // schedule a refresh event with 0 delay. Otherwise schedule a refresh event at the token | ||
| // expiry time, minus 5 minutes. | ||
| scheduleRefresh(refreshDelay); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Subtract the 5 minutes here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This happens in the call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you substract the 5 minutes here, then getRefreshDelay() will return exactly the delay that is specified in the token and then "refreshDelay > 0" check will work for all delays. | ||
| } | ||
| | ||
| final synchronized void stop() { | ||
| // Allow stopping from any state. | ||
| int previous = state.getAndSet(STATE_STOPPED); | ||
| if (previous == STATE_STARTED) { | ||
| cancelPrevious(); | ||
| logger.debug("Stopped the proactive token refresher"); | ||
| } | ||
| } | ||
| | ||
| /** | ||
| * Schedule a forced token refresh to be executed after a specified duration. | ||
| * | ||
| * @param delayMillis Duration in milliseconds, after which the token should be forcibly | ||
| * refreshed. | ||
| */ | ||
| private void scheduleRefresh(final long delayMillis) { | ||
| cancelPrevious(); | ||
| closed = true; | ||
| scheduleNext(new Callable<Void>() { | ||
| @Override | ||
| public Void call() throws Exception { | ||
| logger.debug("Refreshing OAuth2 credential"); | ||
| credentials.refresh(); | ||
| return null; | ||
| } | ||
| }, delayMillis); | ||
| } | ||
| | ||
| private long getRefreshDelay(AccessToken accessToken) { | ||
| return accessToken.getExpirationTime().getTime() - System.currentTimeMillis() | ||
| - TimeUnit.MINUTES.toMillis(5); | ||
| } | ||
| | ||
| static class Factory { | ||
| | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like 'ScheduledExecutorService' doesn't spawn its threads until it gets used for the first time. If you were to initialize it here, the code is this class could be drastically simplified.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will initialize the
ThreadFactorythough (i.eThreadManager.getThreadFactory()will get called). On App Engine this attempts to start a no-op background thread to see if the background thread support is available. Probably not a big deal, but just wanted to get your opinion on it before I make the change.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went through great lengths to make sure that we don't kick off random threads in the first place, so we should probably keep this as is. Thanks!