Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.

Commit 222c2d2

Browse files
committed
Part 2: Introduced TaskQueue API
1 parent fdfc415 commit 222c2d2

File tree

13 files changed

+144
-32
lines changed

13 files changed

+144
-32
lines changed

shell/common/platform_view.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,8 @@ class PlatformView {
810810
virtual std::unique_ptr<SnapshotSurfaceProducer>
811811
CreateSnapshotSurfaceProducer();
812812

813+
// This is temporary until all embedders start handling thread dispatching to
814+
// allow messages to be handled on background threads.
813815
virtual bool DoesHandlePlatformMessagesOnPlatformThread() const {
814816
return true;
815817
}

shell/platform/android/io/flutter/embedding/engine/dart/DartExecutor.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public DartExecutor(@NonNull FlutterJNI flutterJNI, @NonNull AssetManager assetM
5959
this.flutterJNI = flutterJNI;
6060
this.assetManager = assetManager;
6161
this.dartMessenger = new DartMessenger(flutterJNI);
62-
dartMessenger.setMessageHandler("flutter/isolate", isolateChannelMessageHandler);
62+
dartMessenger.setMessageHandler("flutter/isolate", isolateChannelMessageHandler, null);
6363
this.binaryMessenger = new DefaultBinaryMessenger(dartMessenger);
6464
// The JNI might already be attached if coming from a spawned engine. If so, correctly report
6565
// that this DartExecutor is already running.
@@ -168,6 +168,14 @@ public BinaryMessenger getBinaryMessenger() {
168168
}
169169

170170
// ------ START BinaryMessenger (Deprecated: use getBinaryMessenger() instead) -----
171+
/** @deprecated Use {@link #getBinaryMessenger()} instead. */
172+
@Deprecated
173+
@UiThread
174+
@Override
175+
public TaskQueue makeBackgroundTaskQueue() {
176+
return binaryMessenger.makeBackgroundTaskQueue();
177+
}
178+
171179
/** @deprecated Use {@link #getBinaryMessenger()} instead. */
172180
@Deprecated
173181
@Override
@@ -192,8 +200,10 @@ public void send(
192200
@Override
193201
@UiThread
194202
public void setMessageHandler(
195-
@NonNull String channel, @Nullable BinaryMessenger.BinaryMessageHandler handler) {
196-
binaryMessenger.setMessageHandler(channel, handler);
203+
@NonNull String channel,
204+
@Nullable BinaryMessenger.BinaryMessageHandler handler,
205+
@Nullable TaskQueue taskQueue) {
206+
binaryMessenger.setMessageHandler(channel, handler, taskQueue);
197207
}
198208
// ------ END BinaryMessenger -----
199209

@@ -371,6 +381,10 @@ private DefaultBinaryMessenger(@NonNull DartMessenger messenger) {
371381
this.messenger = messenger;
372382
}
373383

384+
public TaskQueue makeBackgroundTaskQueue() {
385+
return messenger.makeBackgroundTaskQueue();
386+
}
387+
374388
/**
375389
* Sends the given {@code message} from Android to Dart over the given {@code channel}.
376390
*
@@ -413,8 +427,10 @@ public void send(
413427
@Override
414428
@UiThread
415429
public void setMessageHandler(
416-
@NonNull String channel, @Nullable BinaryMessenger.BinaryMessageHandler handler) {
417-
messenger.setMessageHandler(channel, handler);
430+
@NonNull String channel,
431+
@Nullable BinaryMessenger.BinaryMessageHandler handler,
432+
@Nullable TaskQueue taskQueue) {
433+
messenger.setMessageHandler(channel, handler, taskQueue);
418434
}
419435
}
420436
}

shell/platform/android/io/flutter/embedding/engine/dart/DartMessenger.java

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import java.nio.ByteBuffer;
1616
import java.util.HashMap;
1717
import java.util.Map;
18+
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.Executors;
1821
import java.util.concurrent.atomic.AtomicBoolean;
1922

2023
/**
@@ -28,25 +31,60 @@ class DartMessenger implements BinaryMessenger, PlatformMessageHandler {
2831
private static final String TAG = "DartMessenger";
2932

3033
@NonNull private final FlutterJNI flutterJNI;
31-
@NonNull private final Map<String, BinaryMessenger.BinaryMessageHandler> messageHandlers;
34+
35+
@NonNull private final ConcurrentHashMap<String, HandlerInfo> messageHandlers;
36+
3237
@NonNull private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies;
3338
private int nextReplyId = 1;
3439

3540
DartMessenger(@NonNull FlutterJNI flutterJNI) {
3641
this.flutterJNI = flutterJNI;
37-
this.messageHandlers = new HashMap<>();
42+
this.messageHandlers = new ConcurrentHashMap<>();
3843
this.pendingReplies = new HashMap<>();
3944
}
4045

46+
private static class HandlerInfo {
47+
@NonNull public final BinaryMessenger.BinaryMessageHandler handler;
48+
@Nullable public final TaskQueue taskQueue;
49+
50+
HandlerInfo(
51+
@NonNull BinaryMessenger.BinaryMessageHandler handler, @Nullable TaskQueue taskQueue) {
52+
this.handler = handler;
53+
this.taskQueue = taskQueue;
54+
}
55+
}
56+
57+
private static class DefaultTaskQueue implements TaskQueue {
58+
@NonNull private final ExecutorService executor;
59+
60+
DefaultTaskQueue() {
61+
// TODO(gaaclarke): Use a shared thread pool with serial queues instead of
62+
// making a thread for each TaskQueue.
63+
this.executor = Executors.newSingleThreadExecutor();
64+
}
65+
66+
@Override
67+
public void dispatch(@NonNull Runnable runnable) {
68+
executor.submit(runnable);
69+
}
70+
}
71+
72+
@Override
73+
public TaskQueue makeBackgroundTaskQueue() {
74+
return new DefaultTaskQueue();
75+
}
76+
4177
@Override
4278
public void setMessageHandler(
43-
@NonNull String channel, @Nullable BinaryMessenger.BinaryMessageHandler handler) {
79+
@NonNull String channel,
80+
@Nullable BinaryMessenger.BinaryMessageHandler handler,
81+
@Nullable TaskQueue taskQueue) {
4482
if (handler == null) {
4583
Log.v(TAG, "Removing handler for channel '" + channel + "'");
4684
messageHandlers.remove(channel);
4785
} else {
4886
Log.v(TAG, "Setting handler for channel '" + channel + "'");
49-
messageHandlers.put(channel, handler);
87+
messageHandlers.put(channel, new HandlerInfo(handler, taskQueue));
5088
}
5189
}
5290

@@ -74,14 +112,13 @@ public void send(
74112
}
75113
}
76114

77-
private void handleMessageFromDartPlatformThread(
78-
@NonNull final String channel, @Nullable ByteBuffer message, final int replyId) {
79-
Log.v(TAG, "Received message from Dart over channel '" + channel + "'");
80-
BinaryMessenger.BinaryMessageHandler handler = messageHandlers.get(channel);
81-
if (handler != null) {
115+
private void invokeHandler(
116+
@Nullable HandlerInfo handlerInfo, @Nullable ByteBuffer message, final int replyId) {
117+
// Called from any thread.
118+
if (handlerInfo != null) {
82119
try {
83120
Log.v(TAG, "Deferring to registered handler to process message.");
84-
handler.onMessage(message, new Reply(flutterJNI, replyId));
121+
handlerInfo.handler.onMessage(message, new Reply(flutterJNI, replyId));
85122
if (message != null && message.isDirect()) {
86123
// This ensures that if a user retains an instance to the ByteBuffer and it happens to
87124
// be direct they will get a deterministic error.
@@ -105,18 +142,27 @@ public void handleMessageFromDart(
105142
@Nullable ByteBuffer message,
106143
final int replyId,
107144
long messageData) {
108-
// TODO(gaaclarke): Dispatch to a TaskQueue other than the platform channel if specified by the
109-
// handler.
110-
Handler mainHandler = new Handler(Looper.getMainLooper());
145+
// Called from the ui thread.
146+
Log.v(TAG, "Received message from Dart over channel '" + channel + "'");
147+
@Nullable HandlerInfo handlerInfo = messageHandlers.get(channel);
148+
@Nullable TaskQueue taskQueue = null;
149+
if (handlerInfo != null) {
150+
taskQueue = handlerInfo.taskQueue;
151+
}
111152
Runnable myRunnable =
112153
() -> {
113154
try {
114-
handleMessageFromDartPlatformThread(channel, message, replyId);
155+
invokeHandler(handlerInfo, message, replyId);
115156
} finally {
116157
flutterJNI.cleanupMessageData(messageData);
117158
}
118159
};
119-
mainHandler.post(myRunnable);
160+
if (taskQueue == null) {
161+
Handler mainHandler = new Handler(Looper.getMainLooper());
162+
mainHandler.post(myRunnable);
163+
} else {
164+
taskQueue.dispatch(myRunnable);
165+
}
120166
}
121167

122168
@Override

shell/platform/android/io/flutter/plugin/common/BasicMessageChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ public void send(@Nullable T message, @Nullable final Reply<T> callback) {
101101
*/
102102
@UiThread
103103
public void setMessageHandler(@Nullable final MessageHandler<T> handler) {
104-
messenger.setMessageHandler(name, handler == null ? null : new IncomingMessageHandler(handler));
104+
messenger.setMessageHandler(
105+
name, handler == null ? null : new IncomingMessageHandler(handler), null);
105106
}
106107

107108
/**

shell/platform/android/io/flutter/plugin/common/BinaryMessenger.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626
* @see EventChannel , which supports communication using event streams.
2727
*/
2828
public interface BinaryMessenger {
29+
public interface TaskQueue {
30+
void dispatch(@NonNull Runnable runnable);
31+
}
32+
33+
@UiThread
34+
TaskQueue makeBackgroundTaskQueue();
35+
2936
/**
3037
* Sends a binary message to the Flutter application.
3138
*
@@ -64,7 +71,10 @@ public interface BinaryMessenger {
6471
* @param handler a {@link BinaryMessageHandler} to be invoked on incoming messages, or null.
6572
*/
6673
@UiThread
67-
void setMessageHandler(@NonNull String channel, @Nullable BinaryMessageHandler handler);
74+
void setMessageHandler(
75+
@NonNull String channel,
76+
@Nullable BinaryMessageHandler handler,
77+
@Nullable TaskQueue taskQueue);
6878

6979
/** Handler for incoming binary messages from Flutter. */
7080
interface BinaryMessageHandler {

shell/platform/android/io/flutter/plugin/common/EventChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public EventChannel(BinaryMessenger messenger, String name, MethodCodec codec) {
8484
@UiThread
8585
public void setStreamHandler(final StreamHandler handler) {
8686
messenger.setMessageHandler(
87-
name, handler == null ? null : new IncomingStreamRequestHandler(handler));
87+
name, handler == null ? null : new IncomingStreamRequestHandler(handler), null);
8888
}
8989

9090
/**

shell/platform/android/io/flutter/plugin/common/MethodChannel.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class MethodChannel {
3535
private final BinaryMessenger messenger;
3636
private final String name;
3737
private final MethodCodec codec;
38+
private final BinaryMessenger.TaskQueue taskQueue;
3839

3940
/**
4041
* Creates a new channel associated with the specified {@link BinaryMessenger} and with the
@@ -56,6 +57,14 @@ public MethodChannel(BinaryMessenger messenger, String name) {
5657
* @param codec a {@link MessageCodec}.
5758
*/
5859
public MethodChannel(BinaryMessenger messenger, String name, MethodCodec codec) {
60+
this(messenger, name, codec, null);
61+
}
62+
63+
public MethodChannel(
64+
BinaryMessenger messenger,
65+
String name,
66+
MethodCodec codec,
67+
@Nullable BinaryMessenger.TaskQueue taskQueue) {
5968
if (BuildConfig.DEBUG) {
6069
if (messenger == null) {
6170
Log.e(TAG, "Parameter messenger must not be null.");
@@ -70,6 +79,7 @@ public MethodChannel(BinaryMessenger messenger, String name, MethodCodec codec)
7079
this.messenger = messenger;
7180
this.name = name;
7281
this.codec = codec;
82+
this.taskQueue = taskQueue;
7383
}
7484

7585
/**
@@ -117,7 +127,7 @@ public void invokeMethod(String method, @Nullable Object arguments, @Nullable Re
117127
@UiThread
118128
public void setMethodCallHandler(final @Nullable MethodCallHandler handler) {
119129
messenger.setMessageHandler(
120-
name, handler == null ? null : new IncomingMethodCallHandler(handler));
130+
name, handler == null ? null : new IncomingMethodCallHandler(handler), taskQueue);
121131
}
122132

123133
/**

shell/platform/android/io/flutter/view/FlutterNativeView.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ public static String getObservatoryUri() {
124124
return FlutterJNI.getObservatoryUri();
125125
}
126126

127+
@Override
128+
@UiThread
129+
public TaskQueue makeBackgroundTaskQueue() {
130+
return dartExecutor.getBinaryMessenger().makeBackgroundTaskQueue();
131+
}
132+
127133
@Override
128134
@UiThread
129135
public void send(String channel, ByteBuffer message) {
@@ -143,8 +149,8 @@ public void send(String channel, ByteBuffer message, BinaryReply callback) {
143149

144150
@Override
145151
@UiThread
146-
public void setMessageHandler(String channel, BinaryMessageHandler handler) {
147-
dartExecutor.getBinaryMessenger().setMessageHandler(channel, handler);
152+
public void setMessageHandler(String channel, BinaryMessageHandler handler, TaskQueue taskQueue) {
153+
dartExecutor.getBinaryMessenger().setMessageHandler(channel, handler, taskQueue);
148154
}
149155

150156
/*package*/ FlutterJNI getFlutterJNI() {

shell/platform/android/io/flutter/view/FlutterView.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,12 @@ public PointerIcon getSystemPointerIcon(int type) {
822822
return PointerIcon.getSystemIcon(getContext(), type);
823823
}
824824

825+
@Override
826+
@UiThread
827+
public TaskQueue makeBackgroundTaskQueue() {
828+
return null;
829+
}
830+
825831
@Override
826832
@UiThread
827833
public void send(String channel, ByteBuffer message) {
@@ -840,8 +846,8 @@ public void send(String channel, ByteBuffer message, BinaryReply callback) {
840846

841847
@Override
842848
@UiThread
843-
public void setMessageHandler(String channel, BinaryMessageHandler handler) {
844-
mNativeView.setMessageHandler(channel, handler);
849+
public void setMessageHandler(String channel, BinaryMessageHandler handler, TaskQueue taskQueue) {
850+
mNativeView.setMessageHandler(channel, handler, taskQueue);
845851
}
846852

847853
/** Listener will be called on the Android UI thread once when Flutter renders the first frame. */

shell/platform/android/platform_view_android.cc

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,13 @@ void PlatformViewAndroid::InvokePlatformMessageResponseCallback(
195195
jint response_id,
196196
jobject java_response_data,
197197
jint java_response_position) {
198-
if (!response_id)
198+
// Called from any thread.
199+
if (!response_id) {
199200
return;
201+
}
202+
// TODO(gaaclarke): Move the jump to the ui thread here from
203+
// PlatformMessageResponseDart so we won't need to use a mutex anymore.
204+
std::unique_lock lock(pending_responses_mutex_);
200205
auto it = pending_responses_.find(response_id);
201206
if (it == pending_responses_.end())
202207
return;
@@ -207,28 +212,35 @@ void PlatformViewAndroid::InvokePlatformMessageResponseCallback(
207212
response_data, response_data + java_response_position);
208213
auto message_response = std::move(it->second);
209214
pending_responses_.erase(it);
215+
lock.unlock();
210216
message_response->Complete(
211217
std::make_unique<fml::DataMapping>(std::move(response)));
212218
}
213219

214220
void PlatformViewAndroid::InvokePlatformMessageEmptyResponseCallback(
215221
JNIEnv* env,
216222
jint response_id) {
217-
if (!response_id)
223+
// Called from any thread.
224+
if (!response_id) {
218225
return;
226+
}
227+
std::unique_lock lock(pending_responses_mutex_);
219228
auto it = pending_responses_.find(response_id);
220229
if (it == pending_responses_.end())
221230
return;
222231
auto message_response = std::move(it->second);
223232
pending_responses_.erase(it);
233+
lock.unlock();
224234
message_response->CompleteEmpty();
225235
}
226236

227237
// |PlatformView|
228238
void PlatformViewAndroid::HandlePlatformMessage(
229239
std::unique_ptr<flutter::PlatformMessage> message) {
240+
// Called from the ui thread.
230241
int response_id = next_response_id_++;
231242
if (auto response = message->response()) {
243+
std::lock_guard lock(pending_responses_mutex_);
232244
pending_responses_[response_id] = response;
233245
}
234246
// This call can re-enter in InvokePlatformMessageXxxResponseCallback.

0 commit comments

Comments
 (0)