Skip to content

Commit 3c837f6

Browse files
committed
Merge remote-tracking branch 'origin/master'
2 parents 9dbec47 + 9e9a463 commit 3c837f6

File tree

1 file changed

+99
-3
lines changed
  • spring-demo/spring-security/src/main/java/com/janloong/springsecurity/jdk/concurrent

1 file changed

+99
-3
lines changed
Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package com.janloong.springsecurity.jdk.concurrent;
22

33

4+
import java.util.concurrent.BlockingQueue;
45
import java.util.concurrent.DelayQueue;
56
import java.util.concurrent.Delayed;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.AtomicInteger;
69

710
/**
811
* 延迟队列测试
12+
* <p/>
13+
* See <a href="https://blog.csdn.net/dkfajsldfsdfsd/article/details/88966814">参考blog</a>
914
*
1015
* @author <a href ="https://blog.janloong.com">Janloong Doo</a>
1116
* @version 1.0.0
@@ -15,10 +20,101 @@
1520
public class DelayQueueTest {
1621

1722

18-
public void sendWithQueue() {
19-
DelayQueue<Delayed> delayeds = new DelayQueue<>();
23+
public static void main(String[] args) {
24+
BlockingQueue<DelayTask> queue = new DelayQueue<DelayTask>();
25+
//添加10个延迟任务,每个任务执行耗时2秒
26+
for (int i = 0; i < 10; i++) {
27+
try {
28+
queue.put(new DelayTask("work " + i, 2000));
29+
} catch (InterruptedException e) {
30+
e.printStackTrace();
31+
}
32+
}
33+
ThreadGroup g = new ThreadGroup("Consumers");
34+
//初始化线程数量i,开始延迟任务的执行
35+
for (int i = 0; i < 1; i++) {
36+
new Thread(g, new DelayTaskComsumer(queue)).start();
37+
}
38+
while (DelayTask.taskCount.get() > 0) {
39+
try {
40+
Thread.sleep(50);
41+
} catch (InterruptedException e) {
42+
e.printStackTrace();
43+
}
44+
}
45+
g.interrupt();
46+
System.out.println("Main thread finished");
47+
}
48+
}
49+
50+
/**
51+
* 延迟任务
52+
*
53+
* @author <a href ="https://blog.janloong.com">Janloong Doo</a>
54+
* @since 2020/5/12 0012 23:26
55+
**/
56+
class DelayTask implements Delayed {
57+
private long currentTime = System.currentTimeMillis();
58+
protected final String taskName; //任务名称
59+
protected final int timeCost; //任务执行耗时
60+
protected final long scheduleTime;
61+
protected static final AtomicInteger taskCount = new AtomicInteger(0);
62+
63+
public DelayTask(String taskName, int timeCost) {
64+
this.taskName = taskName;
65+
this.timeCost = timeCost;
66+
taskCount.incrementAndGet();
67+
currentTime += 1000 + (long) (Math.random() * 1000);
68+
scheduleTime = currentTime;
69+
}
70+
71+
@Override
72+
public long getDelay(TimeUnit unit) {
73+
long expirationTime = scheduleTime - System.currentTimeMillis();
74+
return unit.convert(expirationTime, TimeUnit.MILLISECONDS);
75+
}
2076

77+
@Override
78+
public int compareTo(Delayed o) {
79+
return (int) (this.scheduleTime - ((DelayTask) o).scheduleTime);
2180
}
2281

23-
//class
82+
83+
public void execTask() {
84+
long startTime = System.currentTimeMillis();
85+
System.out.println("Task " + taskName + ": schedule_start_time=" + scheduleTime + ",real start time="
86+
+ startTime + ",delay=" + (startTime - scheduleTime));
87+
try {
88+
Thread.sleep(timeCost);
89+
} catch (InterruptedException e) {
90+
e.printStackTrace();
91+
}
92+
}
93+
}
94+
95+
96+
class DelayTaskComsumer extends Thread {
97+
private final BlockingQueue<DelayTask> queue;
98+
99+
public DelayTaskComsumer(BlockingQueue<DelayTask> queue) {
100+
this.queue = queue;
101+
}
102+
103+
@Override
104+
public void run() {
105+
DelayTask task = null;
106+
try {
107+
//在当前线程持续获取队列中的任务进行执行
108+
while (true) {
109+
//获取延迟队列中的延迟任务,当未到延迟时间时会出于阻塞状态无法获取到任务
110+
task = queue.take();
111+
task.execTask();
112+
//任务执行完成后通过让计数器抛出异常终止延迟任务的执行
113+
DelayTask.taskCount.decrementAndGet();
114+
}
115+
} catch (InterruptedException e) {
116+
System.out.println(getName() + " finished");
117+
}
118+
}
24119
}
120+

0 commit comments

Comments
 (0)