Skip to content

Commit 5fdc98c

Browse files
count mode for parallel
1 parent d8fed4e commit 5fdc98c

File tree

10 files changed

+123
-55
lines changed

10 files changed

+123
-55
lines changed

src/main/java/ch/sebastianhaeni/pancake/IterativeSolver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
public final class IterativeSolver {
1313

1414
// Change mode here!
15-
private static final Mode CURRENT_MODE = Mode.COUNT;
15+
private static final Mode CURRENT_MODE = Mode.SOLVE;
1616

1717
// Start with alternating sequence
18-
private static final int[] INITIAL_STATE = Generator.alternate(16);
18+
private static final int[] INITIAL_STATE = Generator.alternate(14);
1919

2020
// Start with random sequence
2121
// private static final int[] INITIAL_STATE = Generator.random(25));

src/main/java/ch/sebastianhaeni/pancake/ParallelSolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public final class ParallelSolver {
1111
private static final Mode CURRENT_MODE = Mode.COUNT;
1212

1313
// Start with alternating sequence
14-
private static final int[] INITIAL_STATE = Generator.alternate(10);
14+
private static final int[] INITIAL_STATE = Generator.alternate(16);
1515

1616
// Start with random sequence
1717
// private static final int[] INITIAL_STATE = Generator.random(25));

src/main/java/ch/sebastianhaeni/pancake/dto/Tags.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public enum Tags {
2424
/**
2525
* The controller commands the worker to split it's work and send back half of it.
2626
*/
27-
SPLIT;
27+
SPLIT, GATHER;
2828

2929
/**
3030
* Create an integer for MPI.

src/main/java/ch/sebastianhaeni/pancake/processor/Controller.java

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import ch.sebastianhaeni.pancake.dto.Tags;
44
import ch.sebastianhaeni.pancake.dto.WorkPacket;
55
import ch.sebastianhaeni.pancake.model.Node;
6-
import ch.sebastianhaeni.pancake.util.IntListener;
76
import ch.sebastianhaeni.pancake.util.Partition;
87
import ch.sebastianhaeni.pancake.util.Status;
98
import mpi.MPI;
@@ -17,16 +16,16 @@ public abstract class Controller implements IProcessor {
1716

1817
private static final int INITIAL_WORK_DEPTH = 1000;
1918

20-
private final LinkedBlockingQueue<Integer> idleWorkers = new LinkedBlockingQueue<>(MPI.COMM_WORLD.Size() - 1);
19+
final LinkedBlockingQueue<Integer> idleWorkers = new LinkedBlockingQueue<>(MPI.COMM_WORLD.Size() - 1);
2120
final Stack<Node> stack = new Stack<>();
22-
private final int[] workers;
23-
private final int workerCount;
21+
final int[] workers;
22+
final int workerCount;
2423
final int[] initialState;
2524
final Status status = new Status();
2625

2726
private int candidateBound;
28-
private int bound = -1;
29-
private int lastIncrease = -1;
27+
int bound = -1;
28+
int lastIncrease = -1;
3029

3130
Controller(int[] initialState, int workerCount) {
3231
this.initialState = initialState;
@@ -46,33 +45,9 @@ public void run() {
4645

4746
abstract void work();
4847

49-
private void initializeListeners() {
50-
for (int worker : workers) {
51-
(new Thread(new IntListener(Tags.IDLE, this::handleIdle, status, worker))).start();
52-
}
53-
}
54-
55-
private void handleIdle(int source, int result) {
56-
if (idleWorkers.contains(source)) {
57-
return;
58-
}
59-
idleWorkers.add(source);
60-
61-
if (result > bound) {
62-
bound = result;
63-
}
64-
65-
if (idleWorkers.size() == workerCount) {
66-
idleWorkers.clear();
67-
68-
if (lastIncrease == bound) {
69-
return;
70-
}
71-
lastIncrease = bound;
72-
(new Thread(this::solve)).start();
73-
}
74-
}
48+
abstract void handleIdle(int source, int[] result);
7549

50+
abstract void initializeListeners();
7651

7752
void clearListeners() {
7853
Object[] packetBuf = new Object[]{new WorkPacket(0, 0)};

src/main/java/ch/sebastianhaeni/pancake/processor/CountController.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package ch.sebastianhaeni.pancake.processor;
22

3+
import ch.sebastianhaeni.pancake.dto.Tags;
4+
import ch.sebastianhaeni.pancake.util.IntListener;
35
import mpi.MPI;
46

57
import static ch.sebastianhaeni.pancake.ParallelSolver.CONTROLLER_RANK;
8+
import static ch.sebastianhaeni.pancake.ParallelSolver.EMPTY_BUFFER;
69
import static ch.sebastianhaeni.pancake.util.Output.showCount;
710

811
public class CountController extends Controller {
912

13+
private boolean foundSolution;
14+
1015
public CountController(int[] initialState, int workerCount) {
1116
super(initialState, workerCount);
1217
}
@@ -34,6 +39,57 @@ void work() {
3439
finishCount(countBuf[0], end - start);
3540
}
3641

42+
@Override
43+
void initializeListeners() {
44+
for (int worker : workers) {
45+
(new Thread(new IntListener(Tags.IDLE, this::handleIdle, status, worker, 2))).start();
46+
}
47+
}
48+
49+
@Override
50+
void handleIdle(int source, int[] result) {
51+
52+
if (result[1] > 0 && !foundSolution) {
53+
// found a solution, we won't increase the bound anymore
54+
foundSolution = true;
55+
System.out.println("Found the first solution. Not increasing bound anymore.");
56+
}
57+
58+
if (idleWorkers.contains(source)) {
59+
return;
60+
}
61+
idleWorkers.add(source);
62+
63+
if (result[0] > bound) {
64+
bound = result[0];
65+
}
66+
67+
if (idleWorkers.size() == workerCount) {
68+
idleWorkers.clear();
69+
70+
if (foundSolution) {
71+
try {
72+
Thread.sleep(1000);
73+
} catch (InterruptedException e) {
74+
e.printStackTrace();
75+
}
76+
if (idleWorkers.size() != workerCount) {
77+
return;
78+
}
79+
for (int worker : workers) {
80+
MPI.COMM_WORLD.Isend(EMPTY_BUFFER, 0, 0, MPI.INT, worker, Tags.GATHER.tag());
81+
}
82+
return;
83+
}
84+
85+
if (lastIncrease == bound) {
86+
return;
87+
}
88+
lastIncrease = bound;
89+
(new Thread(this::solve)).start();
90+
}
91+
}
92+
3793
private void finishCount(int count, long millis) {
3894
showCount(initialState, count, millis);
3995
clearListeners();

src/main/java/ch/sebastianhaeni/pancake/processor/CountWorker.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package ch.sebastianhaeni.pancake.processor;
22

3+
import ch.sebastianhaeni.pancake.dto.Tags;
4+
import ch.sebastianhaeni.pancake.util.IntListener;
35
import mpi.MPI;
46

57
import static ch.sebastianhaeni.pancake.ParallelSolver.CONTROLLER_RANK;
@@ -10,6 +12,13 @@ public class CountWorker extends Worker {
1012

1113
@Override
1214
void work() {
15+
(new Thread(new IntListener(Tags.GATHER, (source, result) -> {
16+
status.done();
17+
int[] countResult = new int[1];
18+
countResult[0] = count;
19+
MPI.COMM_WORLD.Reduce(countResult, 0, new int[1], 0, 1, MPI.INT, MPI.SUM, CONTROLLER_RANK);
20+
}, status, CONTROLLER_RANK, 1))).start();
21+
1322
while (!stack.isEmpty()) {
1423
count();
1524
}
@@ -37,7 +46,7 @@ private void count() {
3746
stack.pop();
3847
} else if (stack.peek().getChildren().empty()) {
3948
if (stack.peek().getDepth() == 0) {
40-
requestWork();
49+
requestWork(count);
4150
} else {
4251
stack.pop();
4352
}
@@ -54,7 +63,7 @@ private void count() {
5463
}
5564

5665
if (stack.isEmpty() && !status.isDone()) {
57-
requestWork();
66+
requestWork(count);
5867
}
5968
}
6069
}

src/main/java/ch/sebastianhaeni/pancake/processor/SolveController.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import ch.sebastianhaeni.pancake.dto.Tags;
44
import ch.sebastianhaeni.pancake.model.Node;
5+
import ch.sebastianhaeni.pancake.util.IntListener;
56
import mpi.MPI;
67

78
import java.util.Stack;
@@ -37,6 +38,35 @@ void work() {
3738
finishSolve(solution[0], end - start);
3839
}
3940

41+
@Override
42+
void initializeListeners() {
43+
for (int worker : workers) {
44+
(new Thread(new IntListener(Tags.IDLE, this::handleIdle, status, worker, 1))).start();
45+
}
46+
}
47+
48+
@Override
49+
void handleIdle(int source, int[] result) {
50+
if (idleWorkers.contains(source)) {
51+
return;
52+
}
53+
idleWorkers.add(source);
54+
55+
if (result[0] > bound) {
56+
bound = result[0];
57+
}
58+
59+
if (idleWorkers.size() == workerCount) {
60+
idleWorkers.clear();
61+
62+
if (lastIncrease == bound) {
63+
return;
64+
}
65+
lastIncrease = bound;
66+
(new Thread(this::solve)).start();
67+
}
68+
}
69+
4070
private void finishSolve(Stack<Node> solution, long millis) {
4171
showSolution(solution, millis);
4272
clearListeners();

src/main/java/ch/sebastianhaeni/pancake/processor/SolveWorker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ private void solve() {
3636
stack.pop();
3737
} else if (stack.peek().getChildren().empty()) {
3838
if (stack.peek().getDepth() == 0) {
39-
requestWork();
39+
requestWork(0);
4040
} else {
4141
stack.pop();
4242
}
@@ -53,7 +53,7 @@ private void solve() {
5353
}
5454

5555
if (stack.isEmpty() && !status.isDone()) {
56-
requestWork();
56+
requestWork(0);
5757
}
5858
}
5959

src/main/java/ch/sebastianhaeni/pancake/processor/Worker.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,19 @@ private void listenToKill() {
4747
(new Thread(new IntListener(Tags.KILL, (source, result) -> {
4848
status.done();
4949
MPI.COMM_WORLD.Send(EMPTY_BUFFER, 0, 0, MPI.INT, CONTROLLER_RANK, Tags.IDLE.tag());
50-
}, status))).start();
50+
}, status, MPI.ANY_SOURCE, 1))).start();
5151
}
5252

53-
5453
void listenToSplit() {
5554
splitCommand = MPI.COMM_WORLD.Irecv(EMPTY_BUFFER, 0, 0, MPI.INT, MPI.ANY_SOURCE, Tags.SPLIT.tag());
5655
}
5756

58-
void requestWork() {
59-
int[] boundBuf = new int[1];
57+
void requestWork(int data) {
58+
int[] boundBuf = new int[2];
6059
boundBuf[0] = candidateBound;
60+
boundBuf[1] = data;
6161

62-
MPI.COMM_WORLD.Isend(boundBuf, 0, 1, MPI.INT, CONTROLLER_RANK, Tags.IDLE.tag());
62+
MPI.COMM_WORLD.Isend(boundBuf, 0, 2, MPI.INT, CONTROLLER_RANK, Tags.IDLE.tag());
6363
MPI.COMM_WORLD.Isend(EMPTY_BUFFER, 0, 0, MPI.INT, splitDestination, Tags.SPLIT.tag());
6464

6565
waitForWork();

src/main/java/ch/sebastianhaeni/pancake/util/IntListener.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,32 @@
77

88
public class IntListener implements Runnable {
99
private final Tags tag;
10-
private final BiConsumer<Integer, Integer> consumer;
10+
private final BiConsumer<Integer, int[]> consumer;
1111
private final Status status;
1212
private final int source;
13+
private final int dataCount;
1314

14-
public IntListener(Tags tag, BiConsumer<Integer, Integer> consumer, Status status) {
15-
this(tag, consumer, status, MPI.ANY_SOURCE);
16-
}
17-
18-
public IntListener(Tags tag, BiConsumer<Integer, Integer> consumer, Status status, int source) {
15+
public IntListener(Tags tag, BiConsumer<Integer, int[]> consumer, Status status, int source, int dataCount) {
1916
this.tag = tag;
2017
this.consumer = consumer;
2118
this.status = status;
2219
this.source = source;
20+
this.dataCount = dataCount;
2321
}
2422

2523
@Override
2624
public void run() {
27-
int[] result = new int[1];
28-
mpi.Status response = MPI.COMM_WORLD.Recv(result, 0, 1, MPI.INT, source, tag.tag());
25+
int[] result = new int[dataCount];
26+
mpi.Status response = MPI.COMM_WORLD.Recv(result, 0, dataCount, MPI.INT, source, tag.tag());
2927

3028
if (status.isDone()) {
3129
return;
3230
}
3331

34-
consumer.accept(response.source, result[0]);
32+
consumer.accept(response.source, result);
3533

3634
if (!status.isDone()) {
37-
(new Thread(new IntListener(tag, consumer, status, source))).start();
35+
(new Thread(new IntListener(tag, consumer, status, source, dataCount))).start();
3836
}
3937
}
4038

0 commit comments

Comments
 (0)