0%

java | 自定义阻塞队列 - 拒绝策略阻塞队列

java | 自定义阻塞队列 - 超时阻塞 使用策略模式进行改进。

使用接口的方式,让用户自己传入执行方法。

阻塞队列

增加 tryPut

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// 阻塞队列
@Slf4j(topic = "c.BlockQuene")
class BlockQuene<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 满条件变量
private Condition fullWaitSet = lock.newCondition();
// 不满条件变量
private Condition noFullWaitSet = lock.newCondition();
// 容量
private int capcity;

public BlockQuene(int capcity) {
this.capcity = capcity;
}


// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
log.debug("队列为空");
try {
noFullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
log.debug("唤醒队列 ~ 进行添加");
fullWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}

// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("队列已满,等待...");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任务 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
} finally {
lock.unlock();
}
}

// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回的是剩余时间
if (nanos <= 0) {
return null;
}
nanos = noFullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

// 带超时时间的阻塞添加
// 防止队列满了,一直死等
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
if (nanos <= 0) {
log.debug("任务超时 {}", task);
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
return true;
} finally {
lock.unlock();
}
}

public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capcity) {
log.debug("队列已满,直接执行");
rejectPolicy.reject(this, task);
} else {
log.debug("加入队列 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
}
} finally {
lock.unlock();
}
}
}

增加函数接口

1
2
3
4
5
// 拒绝策略
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQuene<T> quene, T task);
}

用户编写策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 500, TimeUnit.MILLISECONDS, 2, ((quene, task) -> {
// 死等
// quene.put(task);
// 带超时的等待
// quene.offer(task, 500, TimeUnit.MILLISECONDS);
// 让调用者放弃任务执行
// log.debug("放弃");
// 调用者抛出异常
// throw new RuntimeException("任务失败" + task);
// 让调用者自己执行任务
task.run();
}));
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(() -> {
try {
if (j > 3) {
Thread.sleep(1000);
} else {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j);
});
}
}
}

用户可以自己编写策略,如上面

1
2
3
4
5
6
7
8
9
10
// 死等
// quene.put(task);
// 带超时的等待
// quene.offer(task, 500, TimeUnit.MILLISECONDS);
// 让调用者放弃任务执行
// log.debug("放弃");
// 调用者抛出异常
// throw new RuntimeException("任务失败" + task);
// 让调用者自己执行任务
task.run();

代码下载

下载 JAVA 文件

日志输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
22:13:55.407 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$2/858242339@3108bc
22:13:55.415 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$2/858242339@7d907bac
22:13:55.416 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@3108bc
22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@3a5ed7a6
22:13:55.416 [main] DEBUG c.BlockQuene - 加入队列 com.redisc.Run$$Lambda$2/858242339@3a5ed7a6
22:13:55.416 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@7d907bac
22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@6325a3ee
22:13:55.416 [main] DEBUG c.BlockQuene - 加入队列 com.redisc.Run$$Lambda$2/858242339@6325a3ee
22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@1d16f93d
22:13:55.416 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:13:56.417 [main] DEBUG c.Test - 4
22:13:56.417 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@67b92f0a
22:13:56.417 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:13:57.422 [main] DEBUG c.Test - 5
22:13:57.422 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@2b9627bc
22:13:57.422 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:13:58.427 [main] DEBUG c.Test - 6
22:13:58.427 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@65e2dbf3
22:13:58.427 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:13:59.428 [main] DEBUG c.Test - 7
22:13:59.428 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@4f970963
22:13:59.428 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:14:00.433 [main] DEBUG c.Test - 8
22:14:00.433 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@61f8bee4
22:14:00.434 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:14:01.439 [main] DEBUG c.Test - 9
22:14:05.421 [Thread-0] DEBUG c.Test - 0
22:14:05.421 [Thread-1] DEBUG c.Test - 1
22:14:05.421 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@3108bc
22:14:05.421 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@7d907bac
22:14:05.421 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@3a5ed7a6
22:14:05.422 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@6325a3ee
22:14:15.422 [Thread-0] DEBUG c.Test - 3
22:14:15.422 [Thread-1] DEBUG c.Test - 2
22:14:15.422 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@6325a3ee
22:14:15.422 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@3a5ed7a6
请我喝杯咖啡吧~