java | 自定义阻塞队列 - 超时阻塞 使用策略模式进行改进。
使用接口的方式,让用户自己传入执行方法。
阻塞队列
增加 tryPut
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| // 阻塞队列 @Slf4j(topic = "c.BlockQuene") class BlockQuene<T> { // 任务队列 private Deque<T> queue = new ArrayDeque<>(); // 锁 private ReentrantLock lock = new ReentrantLock(); // 满条件变量 private Condition fullWaitSet = lock.newCondition(); // 不满条件变量 private Condition noFullWaitSet = lock.newCondition(); // 容量 private int capcity;
public BlockQuene(int capcity) { this.capcity = capcity; }
// 阻塞获取 public T take() { lock.lock(); try { while (queue.isEmpty()) { log.debug("队列为空"); try { noFullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); log.debug("唤醒队列 ~ 进行添加"); fullWaitSet.signalAll(); return t; } finally { lock.unlock(); } }
// 阻塞添加 public void put(T task) { lock.lock(); try { while (queue.size() == capcity) { try { log.debug("队列已满,等待..."); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("添加任务 {}", task); queue.addLast(task); noFullWaitSet.signalAll(); } finally { lock.unlock(); } }
// 带超时的阻塞获取 public T poll(long timeout, TimeUnit unit) { lock.lock(); try { // 将 timeout 统一转化为纳秒 long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { // 返回的是剩余时间 if (nanos <= 0) { return null; } nanos = noFullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } }
// 带超时时间的阻塞添加 // 防止队列满了,一直死等 public boolean offer(T task, long timeout, TimeUnit unit) { lock.lock(); try { // 将 timeout 统一转化为纳秒 long nanos = unit.toNanos(timeout); while (queue.size() == capcity) { try { log.debug("等待加入任务队列 {} ...", task); if (nanos <= 0) { log.debug("任务超时 {}", task); return false; } nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}", task); queue.addLast(task); noFullWaitSet.signalAll(); return true; } finally { lock.unlock(); } }
public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } }
public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capcity) { log.debug("队列已满,直接执行"); rejectPolicy.reject(this, task); } else { log.debug("加入队列 {}", task); queue.addLast(task); noFullWaitSet.signalAll(); } } finally { lock.unlock(); } } }
|
增加函数接口
1 2 3 4 5
| @FunctionalInterface interface RejectPolicy<T> { void reject(BlockQuene<T> quene, T task); }
|
用户编写策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Slf4j(topic = "c.Test") public class Run {
public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2, 500, TimeUnit.MILLISECONDS, 2, ((quene, task) -> {
task.run(); })); for (int i = 0; i < 10; i++) { int j = i; threadPool.execute(() -> { try { if (j > 3) { Thread.sleep(1000); } else { Thread.sleep(10000); } } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } }
|
用户可以自己编写策略,如上面
1 2 3 4 5 6 7 8 9 10
| // 死等 // quene.put(task); // 带超时的等待 // quene.offer(task, 500, TimeUnit.MILLISECONDS); // 让调用者放弃任务执行 // log.debug("放弃"); // 调用者抛出异常 // throw new RuntimeException("任务失败" + task); // 让调用者自己执行任务 task.run();
|
代码下载
下载 JAVA 文件
日志输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| 22:13:55.407 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$2/858242339@3108bc 22:13:55.415 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$2/858242339@7d907bac 22:13:55.416 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@3108bc 22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@3a5ed7a6 22:13:55.416 [main] DEBUG c.BlockQuene - 加入队列 com.redisc.Run$$Lambda$2/858242339@3a5ed7a6 22:13:55.416 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@7d907bac 22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@6325a3ee 22:13:55.416 [main] DEBUG c.BlockQuene - 加入队列 com.redisc.Run$$Lambda$2/858242339@6325a3ee 22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@1d16f93d 22:13:55.416 [main] DEBUG c.BlockQuene - 队列已满,直接执行 22:13:56.417 [main] DEBUG c.Test - 4 22:13:56.417 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@67b92f0a 22:13:56.417 [main] DEBUG c.BlockQuene - 队列已满,直接执行 22:13:57.422 [main] DEBUG c.Test - 5 22:13:57.422 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@2b9627bc 22:13:57.422 [main] DEBUG c.BlockQuene - 队列已满,直接执行 22:13:58.427 [main] DEBUG c.Test - 6 22:13:58.427 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@65e2dbf3 22:13:58.427 [main] DEBUG c.BlockQuene - 队列已满,直接执行 22:13:59.428 [main] DEBUG c.Test - 7 22:13:59.428 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@4f970963 22:13:59.428 [main] DEBUG c.BlockQuene - 队列已满,直接执行 22:14:00.433 [main] DEBUG c.Test - 8 22:14:00.433 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@61f8bee4 22:14:00.434 [main] DEBUG c.BlockQuene - 队列已满,直接执行 22:14:01.439 [main] DEBUG c.Test - 9 22:14:05.421 [Thread-0] DEBUG c.Test - 0 22:14:05.421 [Thread-1] DEBUG c.Test - 1 22:14:05.421 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@3108bc 22:14:05.421 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@7d907bac 22:14:05.421 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@3a5ed7a6 22:14:05.422 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@6325a3ee 22:14:15.422 [Thread-0] DEBUG c.Test - 3 22:14:15.422 [Thread-1] DEBUG c.Test - 2 22:14:15.422 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@6325a3ee 22:14:15.422 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@3a5ed7a6
|