0%

java | 自定义阻塞队列 - 超时阻塞

java | 线程池 自定义阻塞队列 - 基本阻塞队列 完成了基本的阻塞队列,但是,其还有不足。

这一章节,将加上超时逻辑。

超时逻辑,分别是

  • 取超时
  • 放超时

主要针对的是阻塞队列。

超时阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// 阻塞队列
@Slf4j(topic = "c.BlockQuene")
class BlockQuene<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 满条件变量
private Condition fullWaitSet = lock.newCondition();
// 不满条件变量
private Condition noFullWaitSet = lock.newCondition();
// 容量
private int capcity;

public BlockQuene(int capcity) {
this.capcity = capcity;
}


// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
log.debug("队列为空");
try {
noFullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
log.debug("唤醒队列 ~ 进行添加");
fullWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}

// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("队列已满,等待...");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任务 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
} finally {
lock.unlock();
}
}

// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回的是剩余时间
if (nanos <= 0) {
return null;
}
nanos = noFullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

// 带超时时间的阻塞添加
// 防止队列满了,一直死等
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
if (nanos <= 0) {
log.debug("任务超时 {}", task);
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
return true;
} finally {
lock.unlock();
}
}

public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}

里面主要增加了逻辑

  • poll(long timeout, TimeUnit unit)
  • offer(T task, long timeout, TimeUnit unit)

代码下载

下载 JAVA 文件

日志输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
14:40:12.691 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$1/836514715@2286778
14:40:12.702 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:12.702 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@2286778
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:12.702 [main] DEBUG c.BlockQuene - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:12.702 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:12.702 [main] DEBUG c.BlockQuene - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152
14:40:12.702 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 ...
14:40:13.208 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 ...
14:40:13.208 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@1f28c152
14:40:13.208 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895
14:40:13.208 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 ...
14:40:13.709 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 ...
14:40:13.709 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@7791a895
14:40:13.709 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
14:40:13.709 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 ...
14:40:14.213 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 ...
14:40:14.213 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
14:40:14.213 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee
14:40:14.213 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee ...
14:40:14.719 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee ...
14:40:14.719 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@6325a3ee
14:40:14.719 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d
14:40:14.719 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d ...
14:40:15.221 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d ...
14:40:15.221 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@1d16f93d
14:40:15.221 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a
14:40:15.221 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a ...
14:40:15.723 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a ...
14:40:15.723 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@67b92f0a
14:40:22.702 [Thread-0] DEBUG c.Test - 0
14:40:22.702 [Thread-1] DEBUG c.Test - 1
14:40:22.703 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:22.703 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@2286778
14:40:22.703 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:22.703 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:32.706 [Thread-1] DEBUG c.Test - 2
14:40:32.706 [Thread-0] DEBUG c.Test - 3
14:40:32.706 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:32.706 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@731a74c

可以发现很明显的发现就执行了 4 个任务。

不足之处

虽然增加了超时逻辑,但是也丧失了灵活性。

比如,有的任务比较不重要,我希望超时放弃,有的任务很重要,我希望一直等待,显然,这套框架不满足灵活性。

请我喝杯咖啡吧~