- Notifications
You must be signed in to change notification settings - Fork 44
add startAsync and finishAsync on client side, add onStarted and onFinished on server side #53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
coderzc commented May 20, 2021
- add startAsync and finishAsync on client side
- add onStarted and onFinished on server side
61b9724
to a65aeef
Compare Codecov Report
@@ Coverage Diff @@ ## master #53 +/- ## ========================================= Coverage 85.75% 85.76% - Complexity 1490 1494 +4 ========================================= Files 157 157 Lines 4914 4952 +38 Branches 405 411 +6 ========================================= + Hits 4214 4247 +33 + Misses 465 463 -2 - Partials 235 242 +7
Continue to review full report at Codecov.
|
void startSession() throws TransportException; | ||
| ||
/** | ||
* This method is asynchronous startSession. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is the asynchronous version of #startSession().
void finishSession() throws TransportException; | ||
| ||
/** | ||
* This method is asynchronous finishSession. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
/** | ||
* This method is asynchronous startSession. | ||
*/ | ||
Future<Boolean> startSessionAsync() throws TransportException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return Future
"at start()", this.state); | ||
| ||
SettableFuture<Boolean> startedFuture = SettableFuture.create(); | ||
this.startedFutureRef.set(startedFuture); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call compareAndSet(null, startedFuture), throw exception if the previous call is not completed
SettableFuture<Boolean> settableFuture = this.startedFutureRef.get(); | ||
if (settableFuture != null && !settableFuture.isCancelled()) { | ||
settableFuture.set(true); | ||
this.startedFutureRef.compareAndSet(settableFuture, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (settableFuture != null) { if (!settableFuture.isCancelled()) settableFuture.set(true); this.startedFutureRef.compareAndSet(settableFuture, null); }
| ||
this.finishedBarrier.signalAll(); | ||
SettableFuture<Boolean> finishedFuture = this.finishedFutureRef.get(); | ||
if (finishedFuture != null && !finishedFuture.isCancelled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
SettableFuture<Void> startedFuture = SettableFuture.create(); | ||
boolean success = this.startedFutureRef.compareAndSet(null, | ||
startedFuture); | ||
E.checkArgument(success, "The startedFutureRef must be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add error context
SettableFuture<Void> finishedFuture = SettableFuture.create(); | ||
boolean success = this.finishedFutureRef.compareAndSet(null, | ||
finishedFuture); | ||
E.checkArgument(success, "The finishedFutureRef must be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
private volatile boolean flowBlocking; | ||
private final BarrierEvent startedBarrier; | ||
private final BarrierEvent finishedBarrier; | ||
private final AtomicReference<SettableFuture<Void>> startedFutureRef; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use Future<Void>?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to call set()
void handle(MessageType messageType, int partition, ManagedBuffer buffer); | ||
| ||
/** | ||
* Notice start session completed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Notice -> Notify
/** | ||
* This method is the asynchronous version of {@link #startSession}. | ||
*/ | ||
Future<Void> startSessionAsync() throws TransportException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return CompletableFuture
private volatile boolean flowBlocking; | ||
private final BarrierEvent startedBarrier; | ||
private final BarrierEvent finishedBarrier; | ||
private final AtomicReference<SettableFuture<Void>> startedFutureRef; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use CompletableFuture
SettableFuture<Void> settableFuture = this.startedFutureRef.get(); | ||
if (settableFuture != null) { | ||
if (!settableFuture.isCancelled()) { | ||
settableFuture.set(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompletableFuture.complete(null)
a46ab33
to a87a91a
Compare public synchronized void start(long timeout) throws TransportException, | ||
InterruptedException { | ||
public synchronized void start(long timeout) throws TransportException { | ||
CompletableFuture<Void> startFuture = startAsync(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.
| ||
public synchronized void finish(long timeout) throws TransportException { | ||
Future<Void> finishFuture = finishAsync(); | ||
CompletableFuture<Void> finishFuture = finishAsync(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.
void startSession() throws TransportException; | ||
| ||
/** | ||
* This method is the asynchronous version of {@link #startSession}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
startSession()
void finishSession() throws TransportException; | ||
| ||
/** | ||
* This method is the asynchronous version of {@link #finishSession}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
finishSession()
void handle(MessageType messageType, int partition, ManagedBuffer buffer); | ||
| ||
/** | ||
* Notify start session completed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Notify start-session completed on server-side.
void onStarted(ConnectionId connectionId); | ||
| ||
/** | ||
* Notify finish session completed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
} | ||
| ||
@Override | ||
public void channelActive(ConnectionId connectionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also renamed to onChannelActive
ctx.writeAndFlush(startAck).addListener(this.listenerOnWrite); | ||
this.serverSession.completeStateStart(); | ||
| ||
Channel channel = ctx.channel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.handler.onStarted() should be called in #processStartMessage() instead of #ackStartMessage(),
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to notify when receive StartMessage?
boolean success = this.startedFutureRef.compareAndSet(null, | ||
startedFuture); | ||
E.checkArgument(success, | ||
"The startedFutureRef value must be null at start()"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start() -> #startAsync() ?
boolean success = this.finishedFutureRef.compareAndSet(null, | ||
finishedFuture); | ||
E.checkArgument(success, | ||
"The finishedFutureRef value must be null at finish()"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#finishAsync() ?
* add onStarted and onFinished on server side
812b846
to d51ab34
Compare d51ab34
to 61d5404
Compare