Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,42 @@ public abstract class BspBase {

private static final Logger LOG = Log.logger(BspBase.class);

private Config config;
private BspClient bspClient;
private int workerCount;
private long registerTimeout;
private long barrierOnMasterTimeout;
private long barrierOnWorkersTimeout;
private long logInterval;
private final Config config;

private final String jobId;
private final int workerCount;
private final long registerTimeout;
private final long barrierOnMasterTimeout;
private final long barrierOnWorkersTimeout;
private final long logInterval;

private final BspClient bspClient;

public BspBase(Config config) {
this.config = config;
}

/**
* Do initialization operation, like connect to etcd or ZooKeeper cluster.
*/
public void init() {
this.bspClient = this.createBspClient();
this.bspClient.init();
this.jobId = config.get(ComputerOptions.JOB_ID);
this.workerCount = this.config.get(ComputerOptions.JOB_WORKERS_COUNT);
this.registerTimeout = this.config.get(
ComputerOptions.BSP_REGISTER_TIMEOUT);
ComputerOptions.BSP_REGISTER_TIMEOUT);
this.barrierOnWorkersTimeout = this.config.get(
ComputerOptions.BSP_WAIT_WORKERS_TIMEOUT);
this.barrierOnMasterTimeout = this.config.get(
ComputerOptions.BSP_WAIT_MASTER_TIMEOUT);
this.logInterval = this.config.get(ComputerOptions.BSP_LOG_INTERVAL);
LOG.info("Connect to BSP server: {}", this.bspClient.endpoint());

this.bspClient = this.init();
}

/**
* Do initialization operation, like connect to etcd or ZooKeeper cluster.
*/
private BspClient init() {
BspClient bspClient = this.createBspClient();
bspClient.init(this.jobId);
LOG.info("Init {} BSP connection to '{}' for job '{}'",
bspClient.type(), bspClient.endpoint(), this.jobId);
return bspClient;
}

/**
Expand All @@ -64,11 +73,12 @@ public void init() {
*/
public void close() {
this.bspClient.close();
LOG.info("Closed the BSP connection: {}", this.bspClient.endpoint());
LOG.info("Closed {} BSP connection '{}' for job '{}'",
this.bspClient.type(), this.bspClient.endpoint(), this.jobId);
}

private BspClient createBspClient() {
// TODO: the type of bsp client can be get from config
// TODO: create from factory. the type of bsp can be get from config
return new EtcdBspClient(this.config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,33 @@
interface BspClient {

/**
* Do initialization operation, like connect to etcd cluster.
* Return bsp server type, like etcd or zookeeper.
*/
void init();
String type();

/**
* Contrary to init. Could not do any bsp operation after close is called.
* Get endpoint of the bsp server.
*/
void close();
String endpoint();

/**
* Clean the bsp data of the job.
* Do initialization operation, like connect to etcd server.
*/
void clean();
void init(String namespace);

/**
* Get endpoint of the bsp server.
* Close connection from bsp server.
* Could not do any bsp operation after close is called.
*/
String endpoint();
void close();

/**
* Clean the bsp data of the job.
*/
void clean();

/**
* Put KV pair to the bsp server.
* Put key & value to the bsp server.
*/
void put(String key, byte[] value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,27 @@

public final class EtcdBspClient implements BspClient {

private final Config config;
private final String endpoints;

private EtcdClient etcdClient;

public EtcdBspClient(Config config) {
this.config = config;
this.endpoints = config.get(ComputerOptions.BSP_ETCD_ENDPOINTS);
}

@Override
public String type() {
return "etcd";
}

@Override
public String endpoint() {
return this.config.get(ComputerOptions.BSP_ETCD_ENDPOINTS);
return this.endpoints;
}

@Override
public void init() {
String endpoints = this.endpoint();
String jobId = this.config.get(ComputerOptions.JOB_ID);
this.etcdClient = new EtcdClient(endpoints, jobId);
public void init(String namespace) {
this.etcdClient = new EtcdClient(this.endpoints, namespace);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void init(Config config) {
this.maxSuperStep = this.config.get(ComputerOptions.BSP_MAX_SUPER_STEP);

this.bsp4Master = new Bsp4Master(this.config);
this.bsp4Master.init();

InetSocketAddress rpcAddress = this.initManagers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public void init(Config config) {
this.workerInfo = new ContainerInfo(0, dataAddress.getHostName(),
0, dataAddress.getPort());
this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo);
this.bsp4Worker.init();
this.computation = this.config.createObject(
ComputerOptions.WORKER_COMPUTATION_CLASS);
this.computation.init(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,9 @@ public void setup() {
);

this.bsp4Master = new Bsp4Master(config);
this.bsp4Master.init();
this.masterInfo = new ContainerInfo(-1, "localhost", 8001, 8002);
this.workerInfo = new ContainerInfo(0, "localhost", 8003, 8004);
this.bsp4Worker = new Bsp4Worker(config, this.workerInfo);
this.bsp4Worker.init();
this.maxSuperStep = config.get(ComputerOptions.BSP_MAX_SUPER_STEP);
}

Expand Down