这里说一下,生产者-消费者和 保护性暂停 的区别。
- 与保护性暂停不同,不需要产生结果和消费结果一一对应
- 消费队列可以平衡生产和消费的线程资源
- 生产者仅负责产生结果,消费者仅负责处理结果
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种堵塞队列,采用该模式
- 异步的意思是,消息产生了,会先放在队列中,可能不会立刻消费
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
@Slf4j(topic = "c.Run") public class Run {
public static void main(String[] args) throws Exception { MessageQueue queue = new MessageQueue(2); for (int i = 0; i < 3; i++) { int value = i; new Thread(() -> { queue.put(new Message(value, "值" + value)); }, "生产者" + i).start(); }
new Thread(() -> { try { while (true) { Thread.sleep(1000); Message message = queue.take(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "消费者").start(); }
}
@Slf4j(topic = "c.MessageQueue") class MessageQueue { private LinkedList<Message> list = new LinkedList<>(); private int capcity;
public MessageQueue(int capcity) { this.capcity = capcity; }
public Message take() { synchronized (list) { while (list.isEmpty()) { try { log.debug("队列为空,消费者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.removeFirst(); log.debug("已消费消息,{}", message.toString()); list.notifyAll(); return message; } }
public void put(Message message) { synchronized (list) { while (list.size() == capcity) { try { log.debug("队列满了,生产者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("已生产消息,{}", message.toString()); list.addLast(message); list.notifyAll(); } }
}
final class Message { private int id; private Object value;
public Message(int id, Object value) { this.value = value; this.id = id; }
public int getId() { return id; }
public Object getValue() { return value; }
@Override public String toString() { return "Message{" + "id=" + id + ",value=" + value + "}"; } }
|
输出
1 2 3 4 5 6 7 8
| 17:50:44.184 [生产者0] DEBUG c.MessageQueue - 已生产消息,Message{id=0,value=值0} 17:50:44.187 [生产者2] DEBUG c.MessageQueue - 已生产消息,Message{id=2,value=值2} 17:50:44.187 [生产者1] DEBUG c.MessageQueue - 队列满了,生产者线程等待 17:50:45.185 [消费者] DEBUG c.MessageQueue - 已消费消息,Message{id=0,value=值0} 17:50:45.186 [生产者1] DEBUG c.MessageQueue - 已生产消息,Message{id=1,value=值1} 17:50:46.188 [消费者] DEBUG c.MessageQueue - 已消费消息,Message{id=2,value=值2} 17:50:47.191 [消费者] DEBUG c.MessageQueue - 已消费消息,Message{id=1,value=值1} 17:50:48.192 [消费者] DEBUG c.MessageQueue - 队列为空,消费者线程等待
|