0%

java | 线程池 自定义阻塞队列 - 基本阻塞队列

自定义阻塞队列。

阻塞队列

阻塞队列通过 ReentrantLock 和 Condition 进行控制。

  • 阻塞队列是有容量的
  • 当任务队列满的时候,执行满队列条件
  • 当任务队列消耗后,执行有空队列条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 阻塞队列
@Slf4j(topic = "c.BlockQuene")
class BlockQuene<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 满条件变量
private Condition fullWaitSet = lock.newCondition();
// 不满条件变量
private Condition noFullWaitSet = lock.newCondition();
// 容量
private int capcity;

public BlockQuene(int capcity) {
this.capcity = capcity;
}


// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
log.debug("队列为空");
try {
noFullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
log.debug("唤醒队列 ~ 进行添加");
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("队列已满,等待...");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任务 {}", task);
queue.addLast(task);
noFullWaitSet.signal();
} finally {
lock.unlock();
}
}

public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}

任务队列

任务队列有以下的元素

  • 执行线程数,即最大有多少个线程可以同时被执行
  • 任务队列「使用阻塞队列」
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// 任务队列
private BlockQuene<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 执行线程数
private int coreSize;


// 执行任务
public void execute(Runnable task) {
// 任务数没有超过 coreSize 时,直接交给 worker 执行
// 如果任务超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增 worker {} {}", worker, task);
workers.add(worker);
worker.start();
} else {
log.debug("加入任务队列 {}", task);
taskQueue.put(task);
}
}
}

public ThreadPool(int coreSize, int queueCapcity) {
this.coreSize = coreSize;
taskQueue = new BlockQuene<>(queueCapcity);
}

class Worker extends Thread {
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
// 执行任务
// 当 task 不为空的时候,执行任务
// 当 task 为空的时候,执行任务队列的任务

while (task != null || (task = taskQueue.take()) != null) {// 没有超时
try {
log.debug("正在执行... {}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
log.debug("任务设置为空,{}", task);
task = null;
}
}

synchronized (workers) {
workers.remove(this);
}
}
}

}

这里面有很多细节

  • 添加任务的超量处理
  • 执行线程的线程状态

代码下载

下载 JAVA 文件

上面代码具备

  • 2 个执行线程,任务队列大小为 2
  • 添加 10 个任务

输出的日志如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
12:26:59.980 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$1/836514715@2286778
12:26:59.985 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$1/836514715@5a8e6209
12:26:59.985 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@2286778
12:26:59.985 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c
12:26:59.985 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@731a74c
12:26:59.985 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@5a8e6209
12:26:59.985 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2
12:26:59.985 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@369f73a2
12:26:59.985 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152
12:26:59.985 [main] DEBUG c.BlockQuene - 队列已满,等待...
12:27:00.987 [Thread-0] DEBUG c.Test - 0
12:27:00.987 [Thread-1] DEBUG c.Test - 1
12:27:00.988 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@5a8e6209
12:27:00.988 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@2286778
12:27:00.988 [Thread-1] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加
12:27:00.988 [Thread-0] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加
12:27:00.988 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@731a74c
12:27:00.988 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@369f73a2
12:27:00.988 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@1f28c152
12:27:00.988 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895
12:27:00.988 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@7791a895
12:27:00.988 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
12:27:00.988 [main] DEBUG c.BlockQuene - 队列已满,等待...
12:27:01.990 [Thread-1] DEBUG c.Test - 2
12:27:01.990 [Thread-0] DEBUG c.Test - 3
12:27:01.990 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@731a74c
12:27:01.990 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@369f73a2
12:27:01.990 [Thread-1] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加
12:27:01.990 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@1f28c152
12:27:01.990 [Thread-0] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加
12:27:01.991 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@7791a895
12:27:01.991 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
12:27:01.991 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee
12:27:01.991 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@6325a3ee
12:27:01.991 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d
12:27:01.991 [main] DEBUG c.BlockQuene - 队列已满,等待...
12:27:02.994 [Thread-0] DEBUG c.Test - 5
12:27:02.994 [Thread-1] DEBUG c.Test - 4
12:27:02.994 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@7791a895
12:27:02.995 [Thread-0] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加
12:27:02.995 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@1f28c152
12:27:02.995 [Thread-1] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加
12:27:02.995 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
12:27:02.995 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@6325a3ee
12:27:02.995 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@1d16f93d
12:27:02.995 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a
12:27:02.995 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@67b92f0a
12:27:04.000 [Thread-0] DEBUG c.Test - 6
12:27:04.000 [Thread-1] DEBUG c.Test - 7
12:27:04.000 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
12:27:04.001 [Thread-0] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加
12:27:04.001 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@6325a3ee
12:27:04.001 [Thread-1] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加
12:27:04.001 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@1d16f93d
12:27:04.001 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@67b92f0a
12:27:05.005 [Thread-0] DEBUG c.Test - 8
12:27:05.005 [Thread-1] DEBUG c.Test - 9
12:27:05.006 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@1d16f93d
12:27:05.006 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@67b92f0a
12:27:05.006 [Thread-0] DEBUG c.BlockQuene - 队列为空
12:27:05.006 [Thread-1] DEBUG c.BlockQuene - 队列为空

根据日志,我们得出两个细节

  • 执行线程一直是 thread-0thread-1,也就是线程的复用
  • 当队列满的时候,就不会添加任务了,当队列不满的时候会添加任务

下面就两个现象进行说明

执行线程复用

主要是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void run() {
// 执行任务
// 当 task 不为空的时候,执行任务
// 当 task 为空的时候,执行任务队列的任务

while (task != null || (task = taskQueue.take()) != null) {// 没有超时
try {
log.debug("正在执行... {}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
log.debug("任务设置为空,{}", task);
task = null;
}
}

synchronized (workers) {
workers.remove(this);
}
}

这个是 worker 线程自己执行的,由于是 while 循环,所以,一直是该线程拿到任务进行重复执行。

超量任务的添加

主要是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("队列已满,等待...");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任务 {}", task);
queue.addLast(task);
noFullWaitSet.signal();
} finally {
lock.unlock();
}
}

当队列满的时候,会进行

1
fullWaitSet.await();

等待,所以,主线程会进行等待,只有线程不满的时候,才会重新唤醒主线程。

不足之处

  • 无限等待
    • 即便是所有任务都添加完毕,程序也不会停止,因为,会陷入 take() 没有任务,然后进入等待
  • 任务超时
    • 如果一个任务执行的时间很长,那么会陷入一直等待
    • 如果队列已经满了,也会陷入长时间任务等待
请我喝杯咖啡吧~