Conversation

@jiangmichaellll (Contributor):

This implements the major classes for Spark continuous streaming.

@jiangmichaellll jiangmichaellll requested review from a team as code owners December 3, 2020 04:48
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Dec 3, 2020
@product-auto-label product-auto-label bot added the api: pubsublite Issues related to the googleapis/java-pubsublite API. label Dec 3, 2020
Base automatically changed from jiangmichael-spark-utils to master December 3, 2020 23:34
private final MultiPartitionCommitter committer;
private PslSourceOffset startOffset;

public PslContinuousReader(PslDataSourceOptions options) {
Collaborator:

this doesn't belong here, this is wiring code. Remove this constructor.

Contributor Author:

I would like to have this so I could keep code in PslDataSource super simple.

AdminServiceClient adminServiceClient,
CursorServiceClient cursorServiceClient,
MultiPartitionCommitter committer) {
this.options = options;
Collaborator:

don't take the options, take the things you need from it.

Contributor Author (@jiangmichaellll, Dec 4, 2020):

This is needed at a lower level (PslContinuousInputPartition) for many of the params in the options (subscription path, flow control, credentials); I'd prefer not to disassemble it early.
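The trade-off being discussed can be sketched as follows. This is a hypothetical, simplified illustration (the class and field names are stand-ins, not the actual library types): passing a single options object down the stack keeps constructor signatures short when a lower-level component consumes several of its fields.

```java
// Hypothetical stand-in for an options holder like PslDataSourceOptions.
final class SourceOptions {
    final String subscriptionPath;
    final long flowControlBytes;
    final String credentialsKey;

    SourceOptions(String subscriptionPath, long flowControlBytes, String credentialsKey) {
        this.subscriptionPath = subscriptionPath;
        this.flowControlBytes = flowControlBytes;
        this.credentialsKey = credentialsKey;
    }
}

// A lower-level component (analogous to PslContinuousInputPartition) that
// needs multiple fields. Taking the whole object avoids threading three
// separate constructor parameters through every intermediate layer.
final class InputPartitionSketch {
    private final SourceOptions options;

    InputPartitionSketch(SourceOptions options) {
        this.options = options;
    }

    String describe() {
        return options.subscriptionPath + "@" + options.flowControlBytes;
    }
}
```

The cost, as the reviewer notes, is a wider dependency surface: the partition class now depends on every field of the options object, not just the ones it uses.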

assert PslSourceOffset.class.isAssignableFrom(start.get().getClass())
: "start offset is not assignable to PslSourceOffset.";
startOffset = (PslSourceOffset) start.get();
return;
Collaborator:

Do these methods need to be thread safe?

Contributor Author (@jiangmichaellll, Dec 4, 2020):

No, they are called serially, in the order: deserializeOffset (if the write-ahead log has offsets from a previous query), setStartOffset, getStartOffset, planInputPartitions, ... work happens ..., mergeOffsets, commit.
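The serial lifecycle described above can be sketched with a small call recorder. This is a hypothetical illustration (not Spark's actual reader interface) of why single-threaded, ordered invocation removes the need for internal synchronization:

```java
import java.util.ArrayList;
import java.util.List;

// Records the order in which the engine invokes reader lifecycle methods.
// Because the driver calls these one after another on a single thread,
// plain (unsynchronized) fields are safe.
final class ReaderLifecycleSketch {
    private final List<String> calls = new ArrayList<>();

    void deserializeOffset()   { calls.add("deserializeOffset"); }
    void setStartOffset()      { calls.add("setStartOffset"); }
    void getStartOffset()      { calls.add("getStartOffset"); }
    void planInputPartitions() { calls.add("planInputPartitions"); }
    void mergeOffsets()        { calls.add("mergeOffsets"); }
    void commit()              { calls.add("commit"); }

    List<String> calls() { return calls; }
}
```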

@jiangmichaellll jiangmichaellll changed the base branch from master to jiangmichael-spark-offsets December 4, 2020 07:04
@jiangmichaellll (Contributor Author) commented Dec 7, 2020:

There is an additional issue in this PR: the buffering pull subscriber maintains an unbounded message cache internally, and thus does not respect flow control. Please ignore this for this PR; I have another PR to address it and make the cache bounded.

EDIT: #408 is the PR that makes it bounded, but let's finish this one before I assign that.
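As a rough illustration of what "making it bounded" could look like (a hypothetical sketch, not the actual #408 change), a capacity-limited queue rejects new messages once full instead of caching without limit:

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical bounded message buffer. ArrayBlockingQueue has a fixed
// capacity, so the subscriber sees back-pressure (offer() returning false)
// rather than unbounded memory growth. Capacity and names are illustrative.
final class BoundedBufferSketch {
    private final ArrayBlockingQueue<String> messages;

    BoundedBufferSketch(int capacity) {
        this.messages = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false when the buffer is full, letting the caller pause the
    // upstream pull instead of buffering indefinitely.
    boolean tryAdd(String message) {
        return messages.offer(message);
    }

    int size() {
        return messages.size();
    }
}
```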

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder credentialsKey(String credentialsKey);
Collaborator:

Nit: these methods should be named with a "set" prefix.

Contributor Author:

Done
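For reference, AutoValue recognizes builder setters either as bare property names or with a `set` prefix, and the review asked for the latter. A hand-rolled sketch of the convention (illustrative names, no AutoValue annotation processing involved):

```java
// Hypothetical value class mirroring the AutoValue setter-naming fix:
// builder methods use the "set" prefix and return the builder for chaining.
final class OptionsSketch {
    private final String credentialsKey;

    private OptionsSketch(String credentialsKey) {
        this.credentialsKey = credentialsKey;
    }

    String credentialsKey() {
        return credentialsKey;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String credentialsKey;

        // "setCredentialsKey" rather than "credentialsKey", per the review nit.
        Builder setCredentialsKey(String credentialsKey) {
            this.credentialsKey = credentialsKey;
            return this;
        }

        OptionsSketch build() {
            return new OptionsSketch(credentialsKey);
        }
    }
}
```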

@dpcollins-google (Collaborator) left a comment:

Rebase this, fix the few comments, and then ping me back for approval

Base automatically changed from jiangmichael-spark-offsets to master December 9, 2020 06:22
codecov bot commented Dec 10, 2020:

Codecov Report

Merging #396 (fabdebe) into master (63c34f0) will decrease coverage by 0.46%.
The diff coverage is 37.96%.


@@             Coverage Diff              @@
##             master     #396      +/-   ##
============================================
- Coverage     72.10%   71.63%    -0.47%
- Complexity      845      871       +26
============================================
  Files           158      163        +5
  Lines          4399     4573      +174
  Branches        222      226        +4
============================================
+ Hits           3172     3276      +104
- Misses         1104     1168       +64
- Partials        123      129        +6
Impacted Files Coverage Δ Complexity Δ
.../pubsublite/spark/PslContinuousInputPartition.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...m/google/cloud/pubsublite/spark/PslDataSource.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...e/cloud/pubsublite/spark/PslDataSourceOptions.java 11.11% <6.45%> (+11.11%) 2.00 <0.00> (+2.00)
...le/cloud/pubsublite/spark/PslContinuousReader.java 42.42% <42.42%> (ø) 10.00 <10.00> (?)
...blite/spark/PslContinuousInputPartitionReader.java 65.62% <65.62%> (ø) 6.00 <6.00> (?)
.../pubsublite/spark/MultiPartitionCommitterImpl.java 80.00% <80.00%> (ø) 6.00 <6.00> (?)
... and 4 more


Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 63c34f0...2950e08.

@jiangmichaellll jiangmichaellll merged commit 0c0d928 into master Dec 10, 2020
@jiangmichaellll jiangmichaellll deleted the jiangmichael-spark-continuous-processing branch December 10, 2020 22:39

Labels

api: pubsublite Issues related to the googleapis/java-pubsublite API. cla: yes This human has signed the Contributor License Agreement.

3 participants