Standardize bsp synchronization between master and worker #48
Codecov Report
@@             Coverage Diff              @@
##             master      #48      +/-   ##
============================================
+ Coverage     84.56%   84.76%   +0.19%
- Complexity     1368     1384      +16
============================================
  Files           150      150
  Lines          4530     4615      +85
  Branches        376      378       +2
============================================
+ Hits           3831     3912      +81
- Misses          471      474       +3
- Partials        228      229       +1
LOG.info("Master waited all workers registered, workers: {}",
LOG.info("Master waited all workers init-done, workers: {}",
containers);
this.assignContainerId(containers);
rename assignIdForWorkers
Done
* {@link #waitMasterAllInitDone()}.
*/
String path = this.constructPath(BspEvent.BSP_WORKER_INIT_DONE,
this.workerInfo.toString());
define tmpWorkerId = this.workerInfo.host+port
and comment why it's ok
use uniqueName
LOG.info("Worker({}) waited master all-registered, workers: {}",
String path = this.constructPath(BspEvent.BSP_MASTER_ALL_INIT_DONE);
byte[] serializedContainers = this.bspClient().get(
path,
align
Done
*/
public int waitMasterSuperstepResume() {
LOG.info("Worker({}) is waiting for master superstep-resume",
public int waitMasterResume() {
waitMasterResumeDone
Done
if (this.workerInfo.equalsExceptId(container)) {
this.workerInfo.id(container.id());
LOG.info("Worker({}) assigned id {} from master",
this.workerInfo, this.workerInfo.id());
this.workerInfo.uniqueName()
Done
// Note: The workerInfo in Bsp4Worker is the same object in WorkerService.
private void assignThisWorkerId(List<ContainerInfo> workersFromMaster) {
for (ContainerInfo container : workersFromMaster) {
if (this.workerInfo.equalsExceptId(container)) {
compare this.workerInfo.uniqueName()
Done
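For readers following the thread, the review suggestion above (match the worker against the master's list by `uniqueName()` rather than by `equalsExceptId`) might look roughly like the sketch below. `WorkerInfoSketch` and `AssignIdSketch` are hypothetical, trimmed-down stand-ins rather than the project's `ContainerInfo` and `Bsp4Worker`, and throwing when no entry matches is an assumption, not something the PR confirms.

```java
import java.util.List;

// Hypothetical stand-in for ContainerInfo: only the fields this sketch needs.
final class WorkerInfoSketch {
    static final int NO_ID = -1;

    private int id;
    private final String hostname;
    private final int dataPort;

    WorkerInfoSketch(int id, String hostname, int dataPort) {
        this.id = id;
        this.hostname = hostname;
        this.dataPort = dataPort;
    }

    int id() {
        return this.id;
    }

    void id(int id) {
        this.id = id;
    }

    // Identity that is stable before the master has assigned an id.
    String uniqueName() {
        return this.hostname + ":" + this.dataPort;
    }
}

final class AssignIdSketch {
    // Copies the id from the master's entry whose uniqueName matches this worker.
    static void assignThisWorkerId(WorkerInfoSketch self,
                                   List<WorkerInfoSketch> workersFromMaster) {
        for (WorkerInfoSketch container : workersFromMaster) {
            if (self.uniqueName().equals(container.uniqueName())) {
                self.id(container.id());
                return;
            }
        }
        // Assumption: a worker missing from the master's list is an error.
        throw new IllegalStateException(
                  "No id assigned for worker " + self.uniqueName());
    }
}
```

Comparing on `uniqueName()` keeps the match independent of the id field, which is still unset on the worker side at this point.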
this.bsp4Master.masterStepComputeDone(superstep);
List<WorkerStat> workerStats =
this.bsp4Master.waitWorkersSuperstepDone(superstep);
this.bsp4Master.waitWorkersStepDone(superstep);
align with workerStats
Done
}

public static <V extends Readable> List<V> fromBytes(byte[] bytes,
ObjectFactory<V> factory) {
use Supplier<V>
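As an illustration of this suggestion, a `fromBytes` that takes a `java.util.function.Supplier<V>` instead of a custom factory type could be sketched as follows. The `BytesReadable` interface, the `IntBox` element type, the `toBytes` helper, and the wire format (an int count followed by the elements) are all assumptions made for this example. They stand in for the project's `Readable` and its real serialization, which the diff does not show.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the project's Readable interface.
interface BytesReadable {
    void read(DataInputStream in) throws IOException;
}

// Hypothetical element type used only for this illustration.
final class IntBox implements BytesReadable {
    int value;

    @Override
    public void read(DataInputStream in) throws IOException {
        this.value = in.readInt();
    }
}

final class FromBytesSketch {
    // Assumed wire format: an int element count, then each element in turn.
    static <V extends BytesReadable> List<V> fromBytes(byte[] bytes,
                                                       Supplier<V> supplier) {
        try (DataInputStream in = new DataInputStream(
                                  new ByteArrayInputStream(bytes))) {
            int size = in.readInt();
            List<V> values = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                V value = supplier.get(); // fresh empty instance
                value.read(in);           // fill it from the stream
                values.add(value);
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo helper: encodes ints in the assumed format.
    static byte[] toBytes(int... values) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(values.length);
            for (int v : values) {
                out.writeInt(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }
}
```

With a `Supplier`, callers can pass a constructor reference such as `FromBytesSketch.fromBytes(bytes, IntBox::new)` and no dedicated factory interface is needed.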
BSP_WORKER_INIT_DONE(2, "/worker"),
BSP_MASTER_ALL_INIT_DONE(3, "/master/all_init_done"),
BSP_MASTER_RESUME(4, "/master/resume"),
BSP_MASTER_RESUME_DONE(4, "/master/resume"),
keep the same style: remove "_done" suffix
Done
this.rpcPort == other.rpcPort &&
this.dataPort == other.dataPort;
public String uniqueName() {
return String.format("%s:%s", this.hostname, this.dataPort);
use StringBuilder
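A minimal sketch of what this comment asks for is shown below. `UniqueNameSketch` is a hypothetical class holding only the two fields that `uniqueName()` reads in the diff. It is not the project's `ContainerInfo`.

```java
// Hypothetical holder for the two fields uniqueName() uses in the diff.
final class UniqueNameSketch {
    private final String hostname;
    private final int dataPort;

    UniqueNameSketch(String hostname, int dataPort) {
        this.hostname = hostname;
        this.dataPort = dataPort;
    }

    // Builds "hostname:dataPort" without going through String.format.
    String uniqueName() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.hostname).append(':').append(this.dataPort);
        return sb.toString();
    }
}
```

The result is the same string that `String.format("%s:%s", ...)` produces for these two fields, without the cost of parsing a format string on every call.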
…() instead of workerInfo.id()
unify BSP keys to the xxxDONE style
master sets the workers' info after all workers have initialized, and assigns worker ids
add a compute_done signal between master and workers for every superstep
worker sets the close_done signal after closing the managers
worker signals BSP event BSP_WORKER_INIT_DONE with workerInfo.toString() instead of workerInfo.id()
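The init handshake summarized above (each worker signals init-done under a name that does not depend on its id, the master waits for all of them, assigns ids, and publishes the list) can be approximated with the in-memory sketch below. Everything here is an assumption made for illustration: `InMemoryBspStore` replaces the real BSP client, the key layout (`/worker/<uniqueName>`, `/master/all_init_done`) only loosely follows the `BspEvent` paths in the diff, and assigning ids from 1 in sorted name order is a guess rather than the project's policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical in-memory stand-in for the shared BSP store.
final class InMemoryBspStore {
    private static final long TIMEOUT_MS = 5000L;
    private final TreeMap<String, String> data = new TreeMap<>();

    synchronized void put(String key, String value) {
        this.data.put(key, value);
        this.notifyAll(); // wake up any waiting master or worker
    }

    // Blocks until `count` keys start with `prefix`; values come in key order.
    synchronized List<String> waitForChildren(String prefix, int count) {
        long deadline = System.currentTimeMillis() + TIMEOUT_MS;
        while (true) {
            List<String> values = new ArrayList<>();
            for (Map.Entry<String, String> e : this.data.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    values.add(e.getValue());
                }
            }
            if (values.size() >= count) {
                return values;
            }
            this.pause(deadline, prefix);
        }
    }

    // Blocks until `key` exists, then returns its value.
    synchronized String waitFor(String key) {
        long deadline = System.currentTimeMillis() + TIMEOUT_MS;
        while (!this.data.containsKey(key)) {
            this.pause(deadline, key);
        }
        return this.data.get(key);
    }

    // Must be called while holding this object's monitor.
    private void pause(long deadline, String what) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
            throw new IllegalStateException("Timed out waiting for " + what);
        }
        try {
            this.wait(remaining);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted waiting for " + what, e);
        }
    }
}

final class BspInitSketch {
    static final String WORKER_INIT_DONE = "/worker/";
    static final String MASTER_ALL_INIT_DONE = "/master/all_init_done";

    // Master: wait for every worker, assign ids (assumed: 1..n), publish them.
    static void masterInit(InMemoryBspStore store, int workerCount) {
        List<String> names = store.waitForChildren(WORKER_INIT_DONE, workerCount);
        StringBuilder assigned = new StringBuilder();
        for (int i = 0; i < names.size(); i++) {
            if (i > 0) {
                assigned.append(',');
            }
            assigned.append(names.get(i)).append('=').append(i + 1);
        }
        store.put(MASTER_ALL_INIT_DONE, assigned.toString());
    }

    // Worker: signal init-done under its unique name, then read back its id.
    static int workerInit(InMemoryBspStore store, String uniqueName) {
        store.put(WORKER_INIT_DONE + uniqueName, uniqueName);
        String assigned = store.waitFor(MASTER_ALL_INIT_DONE);
        for (String entry : assigned.split(",")) {
            int sep = entry.lastIndexOf('=');
            if (entry.substring(0, sep).equals(uniqueName)) {
                return Integer.parseInt(entry.substring(sep + 1));
            }
        }
        throw new IllegalStateException("No id assigned for " + uniqueName);
    }

    // Runs one master and two workers concurrently; returns the workers' ids.
    static int[] demo() {
        InMemoryBspStore store = new InMemoryBspStore();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Void> master =
                CompletableFuture.runAsync(() -> masterInit(store, 2), pool);
            CompletableFuture<Integer> w1 = CompletableFuture.supplyAsync(
                () -> workerInit(store, "host1:8001"), pool);
            CompletableFuture<Integer> w2 = CompletableFuture.supplyAsync(
                () -> workerInit(store, "host2:8001"), pool);
            master.join();
            return new int[] {w1.join(), w2.join()};
        } finally {
            pool.shutdown();
        }
    }
}
```

The point of the sketch is the ordering constraint: a worker cannot register under its id because the id does not exist until the master has seen every worker, which is why the init-done key has to use an id-independent name.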