- Notifications
You must be signed in to change notification settings - Fork 44
feature: optimize data receiving #181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
6fb7477
to 5709bc0
Compare 865a71e
to 24f806e
Compare buf.writeBytes(encoded); | ||
} | ||
| ||
public static void setMaxBytesPreRead(Channel channel, int length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PreRead or PerRea?
this.length = length; | ||
} | ||
| ||
// Use zero-copy transform from socket channel to file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use "/**" for java doc
// Skip ackId | ||
buf.skipBytes(Integer.BYTES); | ||
// Skip partition | ||
buf.skipBytes(Integer.BYTES); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
skipBytes(Integer.BYTES + Integer.BYTES);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems more clear to skip them separately
this.transportHandler().exceptionCaught(exception, connectionId); | ||
} | ||
| ||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't fail any more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at present the FailMessage
seems there is useless but lack of increased the complexity of the implementation
dataMessage.body()); | ||
| ||
this.serverSession.onHandledData(requestId); | ||
if (body instanceof FileRegionBuffer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can add two private methods for if-else branch?
if (this.fileRegionMode) { | ||
return DataMessage.parseWithFileRegion(msgType, in); | ||
} else { | ||
return DataMessage.parseFrom(msgType, in); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also rename parseFrom to parseWithFileRegion style
} | ||
| ||
@Override | ||
public String genOutputPath(MessageType messageType, int partition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to keep an extension point that can use tmpfs?
Function<String, EntryIterator> fileToInput; | ||
Function<String, KvEntryFileWriter> fileToWriter; | ||
| ||
if (this.useZeroCopy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can use a set of subclasses to implement zero-copy mode to replace all the if (this.useZeroCopy)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can use a set of subclasses to implement zero-copy mode to replace all the
if (this.useZeroCopy)
updated
* <pre> | ||
* +----------------------+ | ||
* | LocalBuffer | | ||
* | Local File | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there no local buffer mode any more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still have
b77a046
to da0a0b9
Compare if (this.threadNum(config) != 0) { | ||
this.sortExecutor = ExecutorUtil.newFixedThreadPool( | ||
this.threadNum(config), this.threadPrefix()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep final mark and set sortExecutor=null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
Boolean zeroCopyMode = | ||
config.get(ComputerOptions.TRANSPORT_ZERO_COPY_MODE); | ||
if (!zeroCopyMode) { | ||
this.recvBuffers = new MessageRecvBuffers(buffersLimit, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check other codes where used recvBuffers to avoid NPE
7a483f1
to 690a2a5
Compare 62195d1
to fa07a59
Compare Codecov Report
@@ Coverage Diff @@ ## master #181 +/- ## ============================================ - Coverage 87.33% 86.60% -0.73% - Complexity 3150 3190 +40 ============================================ Files 332 339 +7 Lines 11806 12046 +240 Branches 1053 1070 +17 ============================================ + Hits 10311 10433 +122 - Misses 989 1095 +106 - Partials 506 518 +12
Continue to review full report at Codecov.
|
c140833
to bc4e56e
Compare There are still some comments need to be addressed |
Due to the lack of activity, the current pr is marked as stale and will be closed after 180 days, any update will remove the stale label |
Due to the lack of activity, the current pr is marked as stale and will be closed after 180 days, any update will remove the stale label |
buf.writeBytes(encoded); | ||
} | ||
| ||
public static void setMaxBytesPreRead(Channel channel, int length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PreRead or PerRead?
waitSortTimeout); | ||
this.sortBuffers = new MessageRecvBuffers(buffersLimit, | ||
waitSortTimeout); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check other code used recvBuffers and sortBuffers, since recvBuffers/sortBuffers may be null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked there is no other reference when useFileRegion == true
can we try install hdfs through shell scrtipt: |
I will fix it in #184 |
299582a
to dfc33cb
Compare Codecov Report
@@ Coverage Diff @@ ## master #181 +/- ## ============================================ - Coverage 86.79% 86.03% -0.77% - Complexity 3169 3206 +37 ============================================ Files 334 341 +7 Lines 11962 12200 +238 Branches 1068 1085 +17 ============================================ + Hits 10383 10496 +113 - Misses 1067 1176 +109 - Partials 512 528 +16
Continue to review full report at Codecov.
|
dfc33cb
to fd037a4
Compare
implement #165