java | 线程池 自定义阻塞队列 - 基本阻塞队列 完成了基本的阻塞队列,但是,其还有不足。
这一章节,将加上超时逻辑。
超时逻辑,分别是
主要针对的是阻塞队列。
超时阻塞队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| @Slf4j(topic = "c.BlockQuene") class BlockQuene<T> { private Deque<T> queue = new ArrayDeque<>(); private ReentrantLock lock = new ReentrantLock(); private Condition fullWaitSet = lock.newCondition(); private Condition noFullWaitSet = lock.newCondition(); private int capcity;
public BlockQuene(int capcity) { this.capcity = capcity; }
public T take() { lock.lock(); try { while (queue.isEmpty()) { log.debug("队列为空"); try { noFullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); log.debug("唤醒队列 ~ 进行添加"); fullWaitSet.signalAll(); return t; } finally { lock.unlock(); } }
public void put(T task) { lock.lock(); try { while (queue.size() == capcity) { try { log.debug("队列已满,等待..."); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("添加任务 {}", task); queue.addLast(task); noFullWaitSet.signalAll(); } finally { lock.unlock(); } }
public T poll(long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0) { return null; } nanos = noFullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } }
public boolean offer(T task, long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.size() == capcity) { try { log.debug("等待加入任务队列 {} ...", task); if (nanos <= 0) { log.debug("任务超时 {}", task); return false; } nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}", task); queue.addLast(task); noFullWaitSet.signalAll(); return true; } finally { lock.unlock(); } }
public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } }
|
里面主要增加了逻辑
poll(long timeout, TimeUnit unit)
offer(T task, long timeout, TimeUnit unit)
代码下载
下载 JAVA 文件
日志输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| 14:40:12.691 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$1/836514715@2286778 14:40:12.702 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$1/836514715@5a8e6209 14:40:12.702 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@2286778 14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c 14:40:12.702 [main] DEBUG c.BlockQuene - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c 14:40:12.702 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@5a8e6209 14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2 14:40:12.702 [main] DEBUG c.BlockQuene - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2 14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 14:40:12.702 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 ... 14:40:13.208 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 ... 14:40:13.208 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@1f28c152 14:40:13.208 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 14:40:13.208 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 ... 14:40:13.709 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 ... 14:40:13.709 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@7791a895 14:40:13.709 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 14:40:13.709 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 ... 14:40:14.213 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 ... 14:40:14.213 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 14:40:14.213 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee 14:40:14.213 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee ... 14:40:14.719 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee ... 14:40:14.719 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@6325a3ee 14:40:14.719 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d 14:40:14.719 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d ... 14:40:15.221 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d ... 14:40:15.221 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@1d16f93d 14:40:15.221 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a 14:40:15.221 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a ... 14:40:15.723 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a ... 14:40:15.723 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@67b92f0a 14:40:22.702 [Thread-0] DEBUG c.Test - 0 14:40:22.702 [Thread-1] DEBUG c.Test - 1 14:40:22.703 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@5a8e6209 14:40:22.703 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@2286778 14:40:22.703 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@731a74c 14:40:22.703 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@369f73a2 14:40:32.706 [Thread-1] DEBUG c.Test - 2 14:40:32.706 [Thread-0] DEBUG c.Test - 3 14:40:32.706 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@369f73a2 14:40:32.706 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@731a74c
|
可以发现很明显的发现就执行了 4
个任务。
不足之处
虽然增加了超时逻辑,但是也丧失了灵活性。
比如,有的任务比较不重要,我希望超时放弃,有的任务很重要,我希望一直等待,显然,这套框架不满足灵活性。