Skip to content

Commit 22f513a

Browse files
committed
回顾总结多线程-观察者模式在多线程中的使用
1 parent a775bbf commit 22f513a

File tree

14 files changed

+260
-1
lines changed

14 files changed

+260
-1
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
多线程架构设计
2+
---
3+
* [监控任务的生命周期](/observer/doc.md)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
plugins {
2+
java
3+
}
4+
5+
group = "com.xdq.observer"
6+
version = "0.0.1"
7+
java.sourceCompatibility = JavaVersion.VERSION_1_8
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
监控任务的生命周期
2+
---
3+
### 场景
4+
在使用多线程时,我们执行完start后其实不会等待线程的执行结果,
5+
有时我们会用同步等待的方式去获取线程的执行结果,这样其实是不好的。
6+
所以我们可以不去使用主动的方式获取执行结果,而是在执行结束后被告知结果,
7+
这样及可以准时的获取执行结果反馈,也不需要阻塞式的获取线程执行结果。
8+
### 模型
9+
>使用观察者模型
10+
11+
![任务周期模型](http://assets.processon.com/chart_image/608adcd7e0b34d47a7146064.png)
12+
13+
### 实践
14+
1. 非Spring容器下使用,见[代码示例](/src/main/java/com/xdq/observer/Main.java)
15+
2. 如何在Spring容器中使用
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.xdq.observer;
2+
3+
/**
4+
* @author Seven.Xu
5+
* @version 2021/4/30
6+
**/
7+
public enum Cycle {
8+
STARTED, RUNNING, DONE, ERROR
9+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.xdq.observer;
2+
3+
/**
4+
* @author Seven.Xu
5+
* @version 2021/4/30
6+
**/
7+
public class EmptyLifecycle<T> implements TaskLifecycle<T> {
8+
@Override
9+
public void onStart(Thread thread) {
10+
11+
}
12+
13+
@Override
14+
public void onRunning(Thread thread) {
15+
16+
}
17+
18+
@Override
19+
public void onFinish(Thread thread, T result) {
20+
21+
}
22+
23+
@Override
24+
public void onError(Thread thread, Exception e) {
25+
26+
}
27+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.xdq.observer;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
/**
6+
* @author Seven.Xu
7+
* @version 2021/4/30
8+
**/
9+
public class Main {
10+
public static void main(String[] args) {
11+
TaskLifecycle<String> lifecycle = new TaskLifecycle<String>() {
12+
@Override
13+
public void onStart(Thread thread) {
14+
System.out.println("任务start");
15+
}
16+
17+
@Override
18+
public void onRunning(Thread thread) {
19+
System.out.println("任务running");
20+
}
21+
22+
@Override
23+
public void onFinish(Thread thread, String result) {
24+
System.out.println("任务finish,result:" + result);
25+
}
26+
27+
@Override
28+
public void onError(Thread thread, Exception e) {
29+
System.out.println("任务error,exception:" + e);
30+
}
31+
};
32+
33+
Observable observable = new ObservableThread<>(lifecycle, () -> {
34+
TimeUnit.SECONDS.sleep(2);
35+
System.out.println("任务内容");
36+
return "任务结果";
37+
});
38+
observable.start();
39+
}
40+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.xdq.observer;
2+
3+
/**
4+
* 需要Thread其他方法可以在这个接口里扩展
5+
* @author Seven.Xu
6+
* @version 2021/4/30
7+
**/
8+
public interface Observable {
9+
/**
10+
* 获取当前任务的生命周期
11+
* @return
12+
*/
13+
Cycle getCycle();
14+
15+
/**
16+
* 定义启动线程的方法,主要作用为了屏蔽Thread的其他方法
17+
*/
18+
void start();
19+
20+
/**
21+
* 定义打断线程的方法,也是为了屏蔽Thread的方法
22+
*/
23+
void interrupt();
24+
25+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.xdq.observer;
2+
3+
import java.util.Objects;
4+
5+
/**
6+
* @author Seven.Xu
7+
* @version 2021/4/30
8+
**/
9+
public class ObservableThread<R> extends Thread implements Observable {
10+
11+
private final TaskLifecycle<R> lifecycle;
12+
private final Task<R> task;
13+
private Cycle cycle;
14+
15+
/**
16+
* 指定Task实现,默认执行EmptyLifecycle
17+
*
18+
* @param task
19+
*/
20+
public ObservableThread(Task<R> task) {
21+
this(new TaskLifecycle.EmptyLifecycle<R>(), task);
22+
}
23+
24+
public ObservableThread(TaskLifecycle<R> lifecycle, Task<R> task) {
25+
super();//初始化Thread
26+
Objects.requireNonNull(task,"task cannot be null");
27+
this.lifecycle = lifecycle;
28+
this.task = task;
29+
}
30+
31+
@Override
32+
public Cycle getCycle() {
33+
return this.cycle;
34+
}
35+
36+
/**
37+
* final防止重写
38+
*/
39+
@Override
40+
public final void run() {
41+
this.update(Cycle.STARTED, null, null);
42+
try {
43+
this.update(Cycle.RUNNING, null, null);
44+
R result = this.task.call();
45+
this.update(Cycle.DONE, result, null);
46+
} catch (Exception e) {
47+
this.update(Cycle.ERROR, null, e);
48+
}
49+
}
50+
51+
private void update(Cycle cycle, R result, Exception e) {
52+
this.cycle = cycle;
53+
if (lifecycle == null) {
54+
return;
55+
}
56+
try {
57+
switch (this.cycle) {
58+
case STARTED:
59+
this.lifecycle.onStart(currentThread());
60+
break;
61+
case RUNNING:
62+
this.lifecycle.onRunning(currentThread());
63+
break;
64+
case DONE:
65+
this.lifecycle.onFinish(currentThread(), result);
66+
break;
67+
case ERROR:
68+
this.lifecycle.onError(currentThread(), e);
69+
}
70+
} catch (Exception ex) {
71+
//不能屏蔽任务执行的异常
72+
if (cycle == Cycle.ERROR) {
73+
throw ex;
74+
}
75+
//通知后出现异常需要catch,防止影响任务的正常执行
76+
}
77+
78+
}
79+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.xdq.observer;
2+
3+
/**
4+
* @author Seven.Xu
5+
* @version 2021/4/30
6+
**/
7+
@FunctionalInterface
8+
public interface Task<R> {
9+
10+
/**
11+
* 任务执行接口
12+
* @return
13+
*/
14+
R call() throws InterruptedException;
15+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.xdq.observer;
2+
3+
/**
4+
* @author Seven.Xu
5+
* @version 2021/4/30
6+
**/
7+
public interface TaskLifecycle<T> {
8+
void onStart(Thread thread);
9+
10+
void onRunning(Thread thread);
11+
12+
void onFinish(Thread thread, T result);
13+
14+
void onError(Thread thread,Exception e);
15+
16+
class EmptyLifecycle<T> implements TaskLifecycle<T>{
17+
18+
@Override
19+
public void onStart(Thread thread) {
20+
21+
}
22+
23+
@Override
24+
public void onRunning(Thread thread) {
25+
26+
}
27+
28+
@Override
29+
public void onFinish(Thread thread, T result) {
30+
31+
}
32+
33+
@Override
34+
public void onError(Thread thread, Exception e) {
35+
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)