Skip to content

Conversation

javeme
Copy link
Contributor

@javeme javeme commented May 8, 2021

No description provided.

@codecov
Copy link

codecov bot commented May 8, 2021

Codecov Report

Merging #46 (6c41731) into master (b1408eb) will increase coverage by 0.21%.
The diff coverage is 89.89%.

Impacted file tree graph

@@ Coverage Diff @@ ## master #46 +/- ## ============================================ + Coverage 85.54% 85.76% +0.21%  - Complexity 1429 1490 +61  ============================================ Files 153 156 +3 Lines 4671 4918 +247 Branches 381 405 +24 ============================================ + Hits 3996 4218 +222  - Misses 458 465 +7  - Partials 217 235 +18 
Impacted Files Coverage Δ Complexity Δ
...hugegraph/computer/core/aggregator/Aggregator.java 0.00% <ø> (ø) 0.00 <0.00> (ø)
...ugegraph/computer/core/config/ComputerOptions.java 99.36% <ø> (-0.02%) 2.00 <0.00> (ø)
...ugegraph/computer/core/aggregator/Aggregators.java 79.16% <79.16%> (ø) 8.00 <8.00> (?)
...ph/computer/core/aggregator/DefaultAggregator.java 82.85% <82.85%> (ø) 22.00 <22.00> (?)
.../hugegraph/computer/core/master/MasterService.java 91.30% <87.17%> (+1.92%) 16.00 <2.00> (+1.00)
.../computer/core/aggregator/RegisterAggregators.java 90.90% <90.90%> (ø) 10.00 <10.00> (?)
...ph/computer/core/aggregator/MasterAggrManager.java 95.65% <95.55%> (-4.35%) 11.00 <10.00> (+8.00) ⬇️
...ph/computer/core/aggregator/WorkerAggrManager.java 95.65% <95.55%> (-4.35%) 13.00 <12.00> (+10.00) ⬇️
.../hugegraph/computer/core/worker/WorkerService.java 91.26% <96.66%> (+3.08%) 14.00 <2.00> (+1.00)
.../baidu/hugegraph/computer/core/bsp/Bsp4Master.java 100.00% <100.00%> (ø) 22.00 <0.00> (ø)
... and 15 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b1408eb...6c41731. Read the comment docs.

* master. Can be called in each superstep.
* Throws ComputerException if master does not register the aggregator
* with specified name.
* @param value The value to be aggregated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this line

/**
* Set aggregate value after superstep. The value will be sent to
* Create aggregator by name in worker, the aggregator is registered by
* master. Can be called in each superstep.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is master and master-computation interchangeable?

Copy link
Contributor Author

@javeme javeme May 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean aslo add createAggregator() to Aggregator4Master?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the aggregator is registered by master-computation.


@Override
public BooleanValue copy() {
return new BooleanValue(this.value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better if to create new object.
return this.value ? TRUE : FALSE;

@Override
public <V extends Value<?>> Aggregator<V> getAggregator(String name) {
Aggregator<?> aggr = this.aggregators.get(name, null);
assert aggr != null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throw exception to the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregators.get() would throw if not found

this.aggrFloat.aggregatedValue());

this.aggrFloat.aggregateValue(1f);
Assert.assertEquals(new FloatValue(1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1F

Assert.assertEquals(new FloatValue(1),
this.aggrFloat.aggregatedValue());

this.aggrFloat.aggregateValue(new FloatValue(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1F

Assert.assertEquals(new LongValue(8L), context.aggregatedValue(
MockMasterComputation.AGGR_TEST_LONG_MAX));

Assert.assertEquals(new DoubleValue(20.8), context.aggregatedValue(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

20.8D and ditto in the following lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double value 20.8 is ok

Assert.assertEquals(new LongValue(8L), context.aggregatedValue(
MockMasterComputation.AGGR_TEST_LONG_MAX));

Assert.assertEquals(new DoubleValue(10.4), context.aggregatedValue(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10.4D and ditto in following lines

Assert.assertEquals(new LongValue(8L), context.aggregatedValue(
MockMasterComputation2.AGGR_TEST_LONG_MAX));

Assert.assertEquals(new DoubleValue(20.8), context.aggregatedValue(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

20.8D

houzhizhen
houzhizhen previously approved these changes May 10, 2021
@javeme
Copy link
Contributor Author

javeme commented May 10, 2021

wip: add test case

TimeUnit.SECONDS.toMillis(30L)
);

public static final ConfigOption<Integer> WORKER_ID =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let master assign id for workers

* get aggregators from master.
*/
MasterAggrManager manager = this.managers.get(MasterAggrManager.NAME);
manager.applyAggregators();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add manager.inited() event

Assert.assertEquals(new LongValue(8L), context.aggregatedValue(
MockMasterComputation.AGGR_TEST_LONG_MAX));

Assert.assertEquals(new DoubleValue(20.8), context.aggregatedValue(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double value 20.8 is ok

MasterService masterService = new MasterService();
WorkerService workerService = new MockWorkerService();

// Config configm = UnitTestBase.updateWithRequiredOptions(
Copy link
Contributor

@houzhizhen houzhizhen May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unused line and ditto in the following lines

CountDownLatch countDownLatch = new CountDownLatch(3);
Throwable[] exceptions = new Throwable[3];

// Config configm = UnitTestBase.updateWithRequiredOptions(
Copy link
Contributor

@houzhizhen houzhizhen May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unused line and ditto in the following lines

@javeme javeme force-pushed the implement-aggregator branch 2 times, most recently from 6d2fc14 to 0245bd2 Compare May 13, 2021 15:32
@javeme javeme force-pushed the implement-aggregator branch from 0245bd2 to 5493b60 Compare May 19, 2021 12:13
this.aggregators.put(name, aggregator);
}

public Aggregator<? extends Value<?>> get(String name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to copy to consist with #copyAll

// Create managers
WorkerRpcManager rpcManager = new WorkerRpcManager();
this.managers.add(rpcManager);
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set master's rpc address to config, and remove the comments.
config.hugeConfig().setProperty(RpcOptions.RPC_REMOTE_URL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not the best way

* config.hugeConfig().setProperty(this.masterInfo.rpcPort())
* NOTE: this init() method will be called twice, ignore the 2nd call
*/
rpcManager.init(this.config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems rpcManager.init need't be called twice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it must init before calling rpcManager.inputSplitService()

import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.testutil.Assert;

public class MockComputation implements Computation<DoubleValue> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to MockAggregatorComputation? We be need different type of computation test.

import com.baidu.hugegraph.computer.core.graph.value.LongValue;
import com.baidu.hugegraph.testutil.Assert;

public class MockComputation2 extends MockComputation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MockAggregatorComputation2 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is still other test cases


pool.submit(() -> {
Config config = UnitTestBase.updateWithRequiredOptions(
RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove this line if get hostname and port from masterInfo


pool.submit(() -> {
Config config = UnitTestBase.updateWithRequiredOptions(
RpcOptions.RPC_SERVER_HOST, "localhost",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add config ' RpcOptions.RPC_REMOTE_URL, "127.0.0.1:0" ' to listen on random port

import com.baidu.hugegraph.computer.core.master.MasterContext;
import com.baidu.hugegraph.testutil.Assert;

public class MockMasterComputation extends DefaultMasterComputation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename MockMasterAggregatorComputation ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is still other test cases

@javeme javeme force-pushed the implement-aggregator branch from 1f9cc6d to 6e49914 Compare May 20, 2021 06:13
houzhizhen
houzhizhen previously approved these changes May 20, 2021
this.combineAndSwapIfNeeded(localValue, this.value);
}

private void combineAndSwapIfNeeded(V localValue, V value2) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What mean? doesn't need value2?

public <V extends Value<?>> Aggregator<V> createAggregator(String name) {
/*
* Create aggregator for the current superstep,
* Generally called when computation.beforeSuperstep()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method called every time when computation.beforeSuperstep()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this method would be called once per superstep for each aggregator

public <V extends Value<?>> void aggregateValue(String name, V value) {
/*
* Update aggregator for the current superstep,
* Generally called when computation.afterSuperstep()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

"The server bind port, if it is zero " +
"will let the system pick up an ephemeral port.",
"The server port to listen on to transfer data. " +
"The system will assign a random port if server_port=0.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

server_port -> transport.server_port


/**
* Used to add the resources the computation needed. Be called only one
* Used to add the resources needed by the computation. Be called only one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be called ... move to line 39

public WorkerAggrManager(ComputerContext context) {
this.context = context;
this.service = null;
this.registerAggregators = new RegisterAggregators();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems no need to new RegisterAggregators(); it will be assigned at init()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to avoid the potential NPE

// Registered aggregators from master
private RegisterAggregators registerAggregators;
// Cache the aggregators of the previous superstep
private Map<String, Value<?>> lastAggregators;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the function of lastAggregator values? I found that it was not involved in the calculation, just get the value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Registered aggregators from master
private RegisterAggregators registerAggregators;
// Cache the aggregators of the previous superstep
private Map<String, Value<?>> lastAggregators;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Cache the aggregators of the previous superstep
private Map<String, Value<?>> lastAggregators;
// Cache the aggregators of the current superstep
private Aggregators currentAggregators;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Linary
Linary previously approved these changes May 21, 2021
@javeme javeme force-pushed the implement-aggregator branch from d6e000d to 6c41731 Compare May 21, 2021 07:41
@houzhizhen houzhizhen merged commit a087d27 into master May 21, 2021
@houzhizhen houzhizhen deleted the implement-aggregator branch May 21, 2021 07:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

3 participants