Skip to content

Conversation

coderzc
Copy link
Member

@coderzc coderzc commented May 20, 2021

  • add startAsync and finishAsync on client side
  • add onStarted and onFinished on server side
@coderzc coderzc force-pushed the transport-extended branch from 61b9724 to a65aeef Compare May 20, 2021 10:39
@codecov
Copy link

codecov bot commented May 20, 2021

Codecov Report

Merging #53 (61d5404) into master (aaeec64) will increase coverage by 0.00%.
The diff coverage is 80.64%.

Impacted file tree graph

@@ Coverage Diff @@ ## master #53 +/- ## ========================================= Coverage 85.75% 85.76% - Complexity 1490 1494 +4  ========================================= Files 157 157 Lines 4914 4952 +38 Branches 405 411 +6 ========================================= + Hits 4214 4247 +33  + Misses 465 463 -2  - Partials 235 242 +7 
Impacted Files Coverage Δ
...h/computer/core/network/session/ClientSession.java 84.42% <76.00%> (-3.67%) ⬇️
...omputer/core/network/netty/NettyClientHandler.java 72.97% <100.00%> (ø)
...omputer/core/network/netty/NettyServerHandler.java 87.34% <100.00%> (+0.67%) ⬆️
...puter/core/network/netty/NettyTransportClient.java 98.11% <100.00%> (+6.88%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update aaeec64...61d5404. Read the comment docs.

void startSession() throws TransportException;

/**
* This method is asynchronous startSession.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is the asynchronous version of #startSession().

void finishSession() throws TransportException;

/**
* This method is asynchronous finishSession.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

/**
* This method is asynchronous startSession.
*/
Future<Boolean> startSessionAsync() throws TransportException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Future

"at start()", this.state);

SettableFuture<Boolean> startedFuture = SettableFuture.create();
this.startedFutureRef.set(startedFuture);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call compareAndSet(null, startedFuture), throw exception if the previous call is not completed

SettableFuture<Boolean> settableFuture = this.startedFutureRef.get();
if (settableFuture != null && !settableFuture.isCancelled()) {
settableFuture.set(true);
this.startedFutureRef.compareAndSet(settableFuture, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 if (settableFuture != null) { if (!settableFuture.isCancelled()) settableFuture.set(true); this.startedFutureRef.compareAndSet(settableFuture, null); }

this.finishedBarrier.signalAll();
SettableFuture<Boolean> finishedFuture = this.finishedFutureRef.get();
if (finishedFuture != null && !finishedFuture.isCancelled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

SettableFuture<Void> startedFuture = SettableFuture.create();
boolean success = this.startedFutureRef.compareAndSet(null,
startedFuture);
E.checkArgument(success, "The startedFutureRef must be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add error context

SettableFuture<Void> finishedFuture = SettableFuture.create();
boolean success = this.finishedFutureRef.compareAndSet(null,
finishedFuture);
E.checkArgument(success, "The finishedFutureRef must be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

private volatile boolean flowBlocking;
private final BarrierEvent startedBarrier;
private final BarrierEvent finishedBarrier;
private final AtomicReference<SettableFuture<Void>> startedFutureRef;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use Future<Void>?

Copy link
Member Author

@coderzc coderzc May 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to call set()

javeme
javeme previously approved these changes May 27, 2021
@coderzc coderzc requested review from Linary and houzhizhen May 28, 2021 05:45
void handle(MessageType messageType, int partition, ManagedBuffer buffer);

/**
* Notice start session completed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice -> Notify

/**
* This method is the asynchronous version of {@link #startSession}.
*/
Future<Void> startSessionAsync() throws TransportException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return CompletableFuture

private volatile boolean flowBlocking;
private final BarrierEvent startedBarrier;
private final BarrierEvent finishedBarrier;
private final AtomicReference<SettableFuture<Void>> startedFutureRef;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use CompletableFuture

SettableFuture<Void> settableFuture = this.startedFutureRef.get();
if (settableFuture != null) {
if (!settableFuture.isCancelled()) {
settableFuture.set(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompletableFuture.complete(null)

public synchronized void start(long timeout) throws TransportException,
InterruptedException {
public synchronized void start(long timeout) throws TransportException {
CompletableFuture<Void> startFuture = startAsync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.


public synchronized void finish(long timeout) throws TransportException {
Future<Void> finishFuture = finishAsync();
CompletableFuture<Void> finishFuture = finishAsync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.

void startSession() throws TransportException;

/**
* This method is the asynchronous version of {@link #startSession}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startSession()

void finishSession() throws TransportException;

/**
* This method is the asynchronous version of {@link #finishSession}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finishSession()

void handle(MessageType messageType, int partition, ManagedBuffer buffer);

/**
* Notify start session completed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notify start-session completed on server-side.

void onStarted(ConnectionId connectionId);

/**
* Notify finish session completed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

@Override
public void channelActive(ConnectionId connectionId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also renamed to onChannelActive

ctx.writeAndFlush(startAck).addListener(this.listenerOnWrite);
this.serverSession.completeStateStart();

Channel channel = ctx.channel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.handler.onStarted() should be called in #processStartMessage() instead of #ackStartMessage(),

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to notify when receive StartMessage?

boolean success = this.startedFutureRef.compareAndSet(null,
startedFuture);
E.checkArgument(success,
"The startedFutureRef value must be null at start()");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start() -> #startAsync() ?

boolean success = this.finishedFutureRef.compareAndSet(null,
finishedFuture);
E.checkArgument(success,
"The finishedFutureRef value must be null at finish()");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#finishAsync() ?

javeme
javeme previously approved these changes May 31, 2021
@coderzc coderzc force-pushed the transport-extended branch 2 times, most recently from 812b846 to d51ab34 Compare May 31, 2021 05:55
@coderzc coderzc force-pushed the transport-extended branch from d51ab34 to 61d5404 Compare May 31, 2021 06:13
@houzhizhen houzhizhen merged commit bfb8826 into master May 31, 2021
@houzhizhen houzhizhen deleted the transport-extended branch June 15, 2021 02:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

4 participants