自定义阻塞队列。
阻塞队列
阻塞队列通过 ReentrantLock 和 Condition 进行控制。
- 阻塞队列是有容量的
- 当任务队列满的时候,执行满队列条件
- 当任务队列消耗后,执行有空队列条件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| @Slf4j(topic = "c.BlockQuene") class BlockQuene<T> { private Deque<T> queue = new ArrayDeque<>(); private ReentrantLock lock = new ReentrantLock(); private Condition fullWaitSet = lock.newCondition(); private Condition noFullWaitSet = lock.newCondition(); private int capcity;
public BlockQuene(int capcity) { this.capcity = capcity; }
public T take() { lock.lock(); try { while (queue.isEmpty()) { log.debug("队列为空"); try { noFullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); log.debug("唤醒队列 ~ 进行添加"); fullWaitSet.signal(); return t; } finally { lock.unlock(); } }
public void put(T task) { lock.lock(); try { while (queue.size() == capcity) { try { log.debug("队列已满,等待..."); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("添加任务 {}", task); queue.addLast(task); noFullWaitSet.signal(); } finally { lock.unlock(); } }
public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } }
|
任务队列
任务队列有以下的元素
- 执行线程数,即最大有多少个线程可以同时被执行
- 任务队列「使用阻塞队列」
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Slf4j(topic = "c.ThreadPool") class ThreadPool { private BlockQuene<Runnable> taskQueue; private HashSet<Worker> workers = new HashSet<>(); private int coreSize;
public void execute(Runnable task) { synchronized (workers) { if (workers.size() < coreSize) { Worker worker = new Worker(task); log.debug("新增 worker {} {}", worker, task); workers.add(worker); worker.start(); } else { log.debug("加入任务队列 {}", task); taskQueue.put(task); } } }
public ThreadPool(int coreSize, int queueCapcity) { this.coreSize = coreSize; taskQueue = new BlockQuene<>(queueCapcity); }
class Worker extends Thread { private Runnable task;
public Worker(Runnable task) { this.task = task; }
@Override public void run() {
while (task != null || (task = taskQueue.take()) != null) { try { log.debug("正在执行... {}", task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { log.debug("任务设置为空,{}", task); task = null; } }
synchronized (workers) { workers.remove(this); } } }
}
|
这里面有很多细节
代码下载
下载 JAVA 文件
上面代码具备
2
个执行线程,任务队列大小为 2
- 添加
10
个任务
输出的日志如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| 12:26:59.980 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$1/836514715@2286778 12:26:59.985 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$1/836514715@5a8e6209 12:26:59.985 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@2286778 12:26:59.985 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c 12:26:59.985 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@731a74c 12:26:59.985 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@5a8e6209 12:26:59.985 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2 12:26:59.985 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@369f73a2 12:26:59.985 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 12:26:59.985 [main] DEBUG c.BlockQuene - 队列已满,等待... 12:27:00.987 [Thread-0] DEBUG c.Test - 0 12:27:00.987 [Thread-1] DEBUG c.Test - 1 12:27:00.988 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@5a8e6209 12:27:00.988 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@2286778 12:27:00.988 [Thread-1] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加 12:27:00.988 [Thread-0] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加 12:27:00.988 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@731a74c 12:27:00.988 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@369f73a2 12:27:00.988 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@1f28c152 12:27:00.988 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 12:27:00.988 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@7791a895 12:27:00.988 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 12:27:00.988 [main] DEBUG c.BlockQuene - 队列已满,等待... 12:27:01.990 [Thread-1] DEBUG c.Test - 2 12:27:01.990 [Thread-0] DEBUG c.Test - 3 12:27:01.990 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@731a74c 12:27:01.990 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@369f73a2 12:27:01.990 [Thread-1] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加 12:27:01.990 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@1f28c152 12:27:01.990 [Thread-0] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加 12:27:01.991 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@7791a895 12:27:01.991 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 12:27:01.991 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee 12:27:01.991 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@6325a3ee 12:27:01.991 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d 12:27:01.991 [main] DEBUG c.BlockQuene - 队列已满,等待... 12:27:02.994 [Thread-0] DEBUG c.Test - 5 12:27:02.994 [Thread-1] DEBUG c.Test - 4 12:27:02.994 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@7791a895 12:27:02.995 [Thread-0] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加 12:27:02.995 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@1f28c152 12:27:02.995 [Thread-1] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加 12:27:02.995 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 12:27:02.995 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@6325a3ee 12:27:02.995 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@1d16f93d 12:27:02.995 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a 12:27:02.995 [main] DEBUG c.BlockQuene - 添加任务 com.redisc.Run$$Lambda$1/836514715@67b92f0a 12:27:04.000 [Thread-0] DEBUG c.Test - 6 12:27:04.000 [Thread-1] DEBUG c.Test - 7 12:27:04.000 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 12:27:04.001 [Thread-0] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加 12:27:04.001 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@6325a3ee 12:27:04.001 [Thread-1] DEBUG c.BlockQuene - 唤醒队列 ~ 进行添加 12:27:04.001 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@1d16f93d 12:27:04.001 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@67b92f0a 12:27:05.005 [Thread-0] DEBUG c.Test - 8 12:27:05.005 [Thread-1] DEBUG c.Test - 9 12:27:05.006 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@1d16f93d 12:27:05.006 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@67b92f0a 12:27:05.006 [Thread-0] DEBUG c.BlockQuene - 队列为空 12:27:05.006 [Thread-1] DEBUG c.BlockQuene - 队列为空
|
根据日志,我们得出两个细节
- 执行线程一直是
thread-0
、thread-1
,也就是线程的复用
- 当队列满的时候,就不会添加任务了,当队列不满的时候会添加任务
下面就两个现象进行说明
执行线程复用
主要是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Override public void run() { // 执行任务 // 当 task 不为空的时候,执行任务 // 当 task 为空的时候,执行任务队列的任务
while (task != null || (task = taskQueue.take()) != null) {// 没有超时 try { log.debug("正在执行... {}", task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { log.debug("任务设置为空,{}", task); task = null; } }
synchronized (workers) { workers.remove(this); } }
|
这个是 worker
线程自己执行的,由于是 while
循环,所以,一直是该线程拿到任务进行重复执行。
超量任务的添加
主要是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public void put(T task) { lock.lock(); try { while (queue.size() == capcity) { try { log.debug("队列已满,等待..."); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("添加任务 {}", task); queue.addLast(task); noFullWaitSet.signal(); } finally { lock.unlock(); } }
|
当队列满的时候,会进行
等待,所以,主线程会进行等待,只有线程不满的时候,才会重新唤醒主线程。
不足之处
- 无限等待
- 即便是所有任务都添加完毕,程序也不会停止,因为,会陷入 take() 没有任务,然后进入等待
- 任务超时
- 如果一个任务执行的时间很长,那么会陷入一直等待
- 如果队列已经满了,也会陷入长时间任务等待