Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import com.baidu.hugegraph.computer.core.common.ContainerInfo;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.SuperstepStat;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.util.SerializeUtil;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.util.Log;

public class Bsp4Master extends BspBase {
Expand All @@ -47,14 +47,14 @@ public Bsp4Master(Config config) {
public void registerMaster(ContainerInfo masterInfo) {
String path = this.constructPath(BspEvent.BSP_MASTER_REGISTERED);
this.bspClient().put(path, SerializeUtil.toBytes(masterInfo));
LOG.info("Master registered, masterInfo: {}", masterInfo);
LOG.info("Master is registered, master info: {}", masterInfo);
}

/**
* Wait workers registered.
*/
public List<ContainerInfo> waitWorkersRegistered() {
LOG.info("Master is waiting workers registered");
LOG.info("Master is waiting for workers registered");
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED);
List<byte[]> serializedContainers = this.waitOnWorkersEvent(
path, this.registerTimeout());
Expand All @@ -64,7 +64,8 @@ public List<ContainerInfo> waitWorkersRegistered() {
SerializeUtil.fromBytes(serializedContainer, container);
containers.add(container);
}
LOG.info("All workers registered, workers: {}", containers);
LOG.info("Master waited all workers registered, workers: {}",
containers);
return containers;
}

Expand All @@ -75,27 +76,28 @@ public void masterSuperstepResume(int superstep) {
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_RESUME);
IntValue superstepWritable = new IntValue(superstep);
this.bspClient().put(path, SerializeUtil.toBytes(superstepWritable));
LOG.info("Master resume superstep {}", superstep);
LOG.info("Master set superstep-resume({})", superstep);
}

/**
* Wait all workers read input splits, and send all vertices and
* edges to correspond workers. After this, master call masterInputDone.
*/
public void waitWorkersInputDone() {
LOG.info("Master is waiting workers input done");
LOG.info("Master is waiting for workers input-done");
String path = this.constructPath(BspEvent.BSP_WORKER_INPUT_DONE);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers input-done");
}

/**
* The master signal workers the master input done, the workers can merge
* vertices and edges after receive this signal.
*/
public void masterInputDone() {
LOG.info("Master set input-done");
String path = this.constructPath(BspEvent.BSP_MASTER_INPUT_DONE);
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Master input done");
}

/**
Expand All @@ -105,7 +107,7 @@ public void masterInputDone() {
* synchronize superstep result.
*/
public List<WorkerStat> waitWorkersSuperstepDone(int superstep) {
LOG.info("Master is waiting workers superstep {} done", superstep);
LOG.info("Master is waiting for workers superstep-done({})", superstep);
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_DONE,
superstep);
List<byte[]> list = this.waitOnWorkersEvent(path,
Expand All @@ -116,7 +118,7 @@ public List<WorkerStat> waitWorkersSuperstepDone(int superstep) {
SerializeUtil.fromBytes(bytes, workerStat);
result.add(workerStat);
}
LOG.info("Workers superstep {} done, workers stat: {}",
LOG.info("Master waited workers superstep-done({}), workers stat: {}",
superstep, result);
return result;
}
Expand All @@ -127,21 +129,22 @@ public List<WorkerStat> waitWorkersSuperstepDone(int superstep) {
* prepared done.
*/
public void waitWorkersSuperstepPrepared(int superstep) {
LOG.info("Master is waiting workers prepare superstep {} done",
LOG.info("Master is waiting for workers superstep-prepared({})",
superstep);
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_PREPARED,
superstep);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers superstep-prepared");
}

/**
* Master signals the workers that the master superstep prepared.
*/
public void masterSuperstepPrepared(int superstep) {
LOG.info("Master set superstep-prepared({})", superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_PREPARED,
superstep);
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Master prepare superstep {} done", superstep);
}

/**
Expand All @@ -153,22 +156,23 @@ public void masterSuperstepDone(int superstep,
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_DONE,
superstep);
this.bspClient().put(path, SerializeUtil.toBytes(superstepStat));
LOG.info("Master superstep {} done, graph stat: {}",
LOG.info("Master set superstep-done({}), graph stat: {}",
superstep, superstepStat);
}

/**
* Wait workers output the vertices.
*/
public void waitWorkersOutputDone() {
LOG.info("Master is waiting workers output done");
LOG.info("Master is waiting for workers output-done");
String path = this.constructPath(BspEvent.BSP_WORKER_OUTPUT_DONE);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers output-done");
}

public void clean() {
this.bspClient().clean();
LOG.info("Clean bsp data done");
LOG.info("Cleaned up the BSP data");
}

private List<byte[]> waitOnWorkersEvent(String prefix, long timeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import com.baidu.hugegraph.computer.core.common.ContainerInfo;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.SuperstepStat;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.util.SerializeUtil;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.util.Log;

public class Bsp4Worker extends BspBase {
Expand All @@ -51,20 +51,23 @@ public void registerWorker() {
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED,
this.workerInfo.id());
this.bspClient().put(path, SerializeUtil.toBytes(this.workerInfo));
LOG.info("Worker {} registered", this.workerInfo);
LOG.info("Worker is registered: {}", this.workerInfo);
}

/**
* Wait master registered, get master's information includes hostname
* and port.
*/
public ContainerInfo waitMasterRegistered() {
LOG.info("Worker({}) is waiting for master registered",
this.workerInfo.id());
String path = this.constructPath(BspEvent.BSP_MASTER_REGISTERED);
byte[] bytes = this.bspClient().get(path, this.registerTimeout(),
this.logInterval());
ContainerInfo masterInfo = new ContainerInfo();
SerializeUtil.fromBytes(bytes, masterInfo);
LOG.info("Master {} registered", masterInfo);
LOG.info("Worker({}) waited master registered: {}",
this.workerInfo.id(), masterInfo);
return masterInfo;
}

Expand All @@ -73,6 +76,9 @@ public ContainerInfo waitMasterRegistered() {
* listen on.
*/
public List<ContainerInfo> waitWorkersRegistered() {
LOG.info("Worker({}) is waiting for master all-registered",
this.workerInfo.id());
// TODO: change to wait BSP_MASTER_ALL_REGISTERED
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED);
List<byte[]> serializedContainers = this.bspClient().getChildren(
path, this.workerCount(),
Expand All @@ -84,7 +90,8 @@ public List<ContainerInfo> waitWorkersRegistered() {
SerializeUtil.fromBytes(serializedContainer, container);
containers.add(container);
}
LOG.info("All workers registered, workers: {}", containers);
LOG.info("Worker({}) waited master all-registered, workers: {}",
this.workerInfo.id(), containers);
return containers;
}

Expand All @@ -93,13 +100,15 @@ public List<ContainerInfo> waitWorkersRegistered() {
* start with.
*/
public int waitMasterSuperstepResume() {
LOG.info("Worker({}) is waiting for master superstep-resume",
this.workerInfo.id());
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_RESUME);
byte[] bytes = this.bspClient().get(path,
this.barrierOnMasterTimeout(),
byte[] bytes = this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
IntValue superstep = new IntValue();
SerializeUtil.fromBytes(bytes, superstep);
LOG.info("Resume from superstep {}", superstep.value());
LOG.info("Worker({}) waited superstep-resume({})",
this.workerInfo.id(), superstep.value());
return superstep.value();
}

Expand All @@ -111,18 +120,20 @@ public void workerInputDone() {
String path = this.constructPath(BspEvent.BSP_WORKER_INPUT_DONE,
this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker {} input done", this.workerInfo.id());
LOG.info("Worker({}) set input-done", this.workerInfo.id());
}

/**
* Wait master signal that all workers input done. After this, worker
* can merge the vertices and edges.
*/
public void waitMasterInputDone() {
LOG.info("Worker({}) is waiting for master input-done",
this.workerInfo.id());
String path = this.constructPath(BspEvent.BSP_MASTER_INPUT_DONE);
this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
LOG.info("Master input done");
LOG.info("Worker({}) waited master input-done", this.workerInfo.id());
}

/**
Expand All @@ -133,7 +144,7 @@ public void workerSuperstepPrepared(int superstep) {
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_PREPARED,
superstep, this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker {} prepared superstep {} done",
LOG.info("Worker({}) set superstep-prepared({})",
this.workerInfo.id(), superstep);
}

Expand All @@ -142,11 +153,14 @@ public void workerSuperstepPrepared(int superstep) {
* to other workers.
*/
public void waitMasterSuperstepPrepared(int superstep) {
LOG.info("Waiting master prepared superstep {} done", superstep);
LOG.info("Worker({}) is waiting for master superstep-prepared({})",
this.workerInfo.id(), superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_PREPARED,
superstep);
this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
LOG.info("Worker({}) waited master superstep-prepared({})",
this.workerInfo.id(), superstep);
}

/**
Expand All @@ -157,8 +171,8 @@ public void workerSuperstepDone(int superstep, WorkerStat workerStat) {
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_DONE,
superstep, this.workerInfo.id());
this.bspClient().put(path, SerializeUtil.toBytes(workerStat));
LOG.info("Worker superstep {} done, worker stat: {}",
superstep, workerStat);
LOG.info("Worker({}) set superstep-done({}), worker stat: {}",
this.workerInfo.id(), superstep, workerStat);
}

/**
Expand All @@ -167,15 +181,16 @@ public void workerSuperstepDone(int superstep, WorkerStat workerStat) {
* works.
*/
public SuperstepStat waitMasterSuperstepDone(int superstep) {
LOG.info("Worker({}) is waiting for master superstep-done({})",
this.workerInfo.id(), superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_DONE,
superstep);
byte[] bytes = this.bspClient().get(path,
this.barrierOnMasterTimeout(),
byte[] bytes = this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
SuperstepStat superstepStat = new SuperstepStat();
SerializeUtil.fromBytes(bytes, superstepStat);
LOG.info("Master superstep {} done, graph stat: {}",
superstep, superstepStat);
LOG.info("Worker({}) waited master superstep-done({}), graph stat: {}",
this.workerInfo.id(), superstep, superstepStat);
return superstepStat;
}

Expand All @@ -187,6 +202,6 @@ public void workerOutputDone() {
String path = this.constructPath(BspEvent.BSP_WORKER_OUTPUT_DONE,
this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker {} output done", this.workerInfo.id());
LOG.info("Worker({}) set output-done", this.workerInfo.id());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void init() {
this.barrierOnMasterTimeout = this.config.get(
ComputerOptions.BSP_WAIT_MASTER_TIMEOUT);
this.logInterval = this.config.get(ComputerOptions.BSP_LOG_INTERVAL);
LOG.info("Connect to BSP server: {}", this.bspClient.endpoint());
}

/**
Expand All @@ -63,7 +64,7 @@ public void init() {
*/
public void close() {
this.bspClient.close();
LOG.info("closed");
LOG.info("Closed the BSP connection: {}", this.bspClient.endpoint());
}

private BspClient createBspClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,24 @@ interface BspClient {
*/
void clean();

/**
* Get endpoint of the bsp server.
*/
String endpoint();

/**
* Put KV pair to the bsp server.
*/
void put(String key, byte[] value);

/**
* Get value by key from the bsp server.
*/
byte[] get(String key);

/**
* Get value by key from the bsp server with timout.
*/
byte[] get(String key, long timeout, long logInterval);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ public EtcdBspClient(Config config) {
this.config = config;
}

@Override
public String endpoint() {
return this.config.get(ComputerOptions.BSP_ETCD_ENDPOINTS);
}

@Override
public void init() {
String endpoints = this.config.get(ComputerOptions.BSP_ETCD_ENDPOINTS);
String endpoints = this.endpoint();
String jobId = this.config.get(ComputerOptions.JOB_ID);
this.etcdClient = new EtcdClient(endpoints, jobId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ private byte[] waitAndGetFromPutEvent(ByteSequence keySeq, long revision,
try (Watch.Watcher watcher = this.watch.watch(keySeq, watchOption,
consumer)) {
return barrierEvent.await(timeout, logInterval, () -> {
LOG.info("Wait for key '{}'", keySeq.toString(ENCODING));
LOG.info("Wait for key '{}' with timeout {}ms",
keySeq.toString(ENCODING), timeout);
});
}
}
Expand Down Expand Up @@ -373,8 +374,10 @@ private List<byte[]> waitAndPrefixGetFromPutEvent(
watchOption,
consumer)) {
return barrierEvent.await(timeout, logInterval, () -> {
LOG.info("Wait for keys with prefix '{}', expect {} actual {}",
prefixSeq.toString(ENCODING), count, keyValues.size());
LOG.info("Wait for keys with prefix '{}' and timeout {}ms, " +
"expect {} keys but actual got {} keys",
prefixSeq.toString(ENCODING),
timeout, count, keyValues.size());
});
}
}
Expand Down
Loading