- Notifications
You must be signed in to change notification settings - Fork 44
implement aggregator module #46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@ ## master #46 +/- ## ============================================ + Coverage 85.54% 85.76% +0.21% - Complexity 1429 1490 +61 ============================================ Files 153 156 +3 Lines 4671 4918 +247 Branches 381 405 +24 ============================================ + Hits 3996 4218 +222 - Misses 458 465 +7 - Partials 217 235 +18 Continue to review full report at Codecov.
|
* master. Can be called in each superstep. | ||
* Throws ComputerException if master does not register the aggregator | ||
* with specified name. | ||
* @param value The value to be aggregated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this line
/** | ||
* Set aggregate value after superstep. The value will be sent to | ||
* Create aggregator by name in worker, the aggregator is registered by | ||
* master. Can be called in each superstep. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is master and master-computation interchangeable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mean aslo add createAggregator() to Aggregator4Master?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the aggregator is registered by master-computation.
| ||
@Override | ||
public BooleanValue copy() { | ||
return new BooleanValue(this.value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better if to create new object.
return this.value ? TRUE : FALSE;
@Override | ||
public <V extends Value<?>> Aggregator<V> getAggregator(String name) { | ||
Aggregator<?> aggr = this.aggregators.get(name, null); | ||
assert aggr != null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throw exception to the client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aggregators.get() would throw if not found
computer-core/src/main/java/com/baidu/hugegraph/computer/core/aggregator/WorkerAggrManager.java Outdated Show resolved Hide resolved
this.aggrFloat.aggregatedValue()); | ||
| ||
this.aggrFloat.aggregateValue(1f); | ||
Assert.assertEquals(new FloatValue(1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1F
Assert.assertEquals(new FloatValue(1), | ||
this.aggrFloat.aggregatedValue()); | ||
| ||
this.aggrFloat.aggregateValue(new FloatValue(1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1F
Assert.assertEquals(new LongValue(8L), context.aggregatedValue( | ||
MockMasterComputation.AGGR_TEST_LONG_MAX)); | ||
| ||
Assert.assertEquals(new DoubleValue(20.8), context.aggregatedValue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
20.8D and ditto in the following lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
double value 20.8 is ok
Assert.assertEquals(new LongValue(8L), context.aggregatedValue( | ||
MockMasterComputation.AGGR_TEST_LONG_MAX)); | ||
| ||
Assert.assertEquals(new DoubleValue(10.4), context.aggregatedValue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10.4D and ditto in following lines
Assert.assertEquals(new LongValue(8L), context.aggregatedValue( | ||
MockMasterComputation2.AGGR_TEST_LONG_MAX)); | ||
| ||
Assert.assertEquals(new DoubleValue(20.8), context.aggregatedValue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
20.8D
wip: add test case |
TimeUnit.SECONDS.toMillis(30L) | ||
); | ||
| ||
public static final ConfigOption<Integer> WORKER_ID = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let master assign id for workers
* get aggregators from master. | ||
*/ | ||
MasterAggrManager manager = this.managers.get(MasterAggrManager.NAME); | ||
manager.applyAggregators(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add manager.inited() event
Assert.assertEquals(new LongValue(8L), context.aggregatedValue( | ||
MockMasterComputation.AGGR_TEST_LONG_MAX)); | ||
| ||
Assert.assertEquals(new DoubleValue(20.8), context.aggregatedValue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
double value 20.8 is ok
MasterService masterService = new MasterService(); | ||
WorkerService workerService = new MockWorkerService(); | ||
| ||
// Config configm = UnitTestBase.updateWithRequiredOptions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused line and ditto in the following lines
CountDownLatch countDownLatch = new CountDownLatch(3); | ||
Throwable[] exceptions = new Throwable[3]; | ||
| ||
// Config configm = UnitTestBase.updateWithRequiredOptions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused line and ditto in the following lines
6d2fc14
to 0245bd2
Compare also add two-tier structure to store current-step and previous-step aggregators
0245bd2
to 5493b60
Compare this.aggregators.put(name, aggregator); | ||
} | ||
| ||
public Aggregator<? extends Value<?>> get(String name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to copy to consist with #copyAll
// Create managers | ||
WorkerRpcManager rpcManager = new WorkerRpcManager(); | ||
this.managers.add(rpcManager); | ||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set master's rpc address to config, and remove the comments.
config.hugeConfig().setProperty(RpcOptions.RPC_REMOTE_URL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's not the best way
* config.hugeConfig().setProperty(this.masterInfo.rpcPort()) | ||
* NOTE: this init() method will be called twice, ignore the 2nd call | ||
*/ | ||
rpcManager.init(this.config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems rpcManager.init need't be called twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it must init before calling rpcManager.inputSplitService()
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; | ||
import com.baidu.hugegraph.testutil.Assert; | ||
| ||
public class MockComputation implements Computation<DoubleValue> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to MockAggregatorComputation? We be need different type of computation test.
import com.baidu.hugegraph.computer.core.graph.value.LongValue; | ||
import com.baidu.hugegraph.testutil.Assert; | ||
| ||
public class MockComputation2 extends MockComputation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MockAggregatorComputation2 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is still other test cases
| ||
pool.submit(() -> { | ||
Config config = UnitTestBase.updateWithRequiredOptions( | ||
RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can remove this line if get hostname and port from masterInfo
| ||
pool.submit(() -> { | ||
Config config = UnitTestBase.updateWithRequiredOptions( | ||
RpcOptions.RPC_SERVER_HOST, "localhost", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add config ' RpcOptions.RPC_REMOTE_URL, "127.0.0.1:0" ' to listen on random port
import com.baidu.hugegraph.computer.core.master.MasterContext; | ||
import com.baidu.hugegraph.testutil.Assert; | ||
| ||
public class MockMasterComputation extends DefaultMasterComputation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename MockMasterAggregatorComputation ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is still other test cases
1f9cc6d
to 6e49914
Compare this.combineAndSwapIfNeeded(localValue, this.value); | ||
} | ||
| ||
private void combineAndSwapIfNeeded(V localValue, V value2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What mean? doesn't need value2?
public <V extends Value<?>> Aggregator<V> createAggregator(String name) { | ||
/* | ||
* Create aggregator for the current superstep, | ||
* Generally called when computation.beforeSuperstep() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this method called every time when computation.beforeSuperstep()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, this method would be called once per superstep for each aggregator
public <V extends Value<?>> void aggregateValue(String name, V value) { | ||
/* | ||
* Update aggregator for the current superstep, | ||
* Generally called when computation.afterSuperstep() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
"The server bind port, if it is zero " + | ||
"will let the system pick up an ephemeral port.", | ||
"The server port to listen on to transfer data. " + | ||
"The system will assign a random port if server_port=0.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
server_port -> transport.server_port
| ||
/** | ||
* Used to add the resources the computation needed. Be called only one | ||
* Used to add the resources needed by the computation. Be called only one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Be called ... move to line 39
public WorkerAggrManager(ComputerContext context) { | ||
this.context = context; | ||
this.service = null; | ||
this.registerAggregators = new RegisterAggregators(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems no need to new RegisterAggregators(); it will be assigned at init()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to avoid the potential NPE
// Registered aggregators from master | ||
private RegisterAggregators registerAggregators; | ||
// Cache the aggregators of the previous superstep | ||
private Map<String, Value<?>> lastAggregators; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the function of lastAggregator values? I found that it was not involved in the calculation, just get the value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Registered aggregators from master | ||
private RegisterAggregators registerAggregators; | ||
// Cache the aggregators of the previous superstep | ||
private Map<String, Value<?>> lastAggregators; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Cache the aggregators of the previous superstep | ||
private Map<String, Value<?>> lastAggregators; | ||
// Cache the aggregators of the current superstep | ||
private Aggregators currentAggregators; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
d6e000d
to 6c41731
Compare
No description provided.