让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。
典型代表就是线程池,也体现了经典设计模式中的享元模式。
注意,不同任务类型应该用不同的线程池,这样能够有效的避免饥饿,提升效率。
比如,一个餐馆的工人,不能既服务客人,又去做菜。
饥饿
固定大小线程池有饥饿现象。
- 两个工人是同一个线程池的两个工人
- 他们要做的事情是,为客人点餐和后厨做菜,这是两个阶段的任务
- 客人点餐,必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜
- 工人 A 处理了点餐任务,接下来,他要等待工人 B 把菜做好,然后上菜
- 但是,现在同时来了两个客人,他们都去处理点餐了,没人处理做饭,死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.*;
@Slf4j(topic = "c.Test") public class Run {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁"); static Random RANDOM = new Random();
static String cokking() { return MENU.get(RANDOM.nextInt(MENU.size())); }
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(() -> { log.debug("处理点餐..."); Future<String> f = pool.submit(() -> { log.debug("做菜"); return cokking(); }); try { log.debug("上菜 {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } });
pool.execute(() -> { log.debug("处理点餐..."); Future<String> f = pool.submit(() -> { log.debug("做菜"); return cokking(); }); try { log.debug("上菜 {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }
|
饥饿解决
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.*;
@Slf4j(topic = "c.Test") public class Run {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁"); static Random RANDOM = new Random();
static String cokking() { return MENU.get(RANDOM.nextInt(MENU.size())); }
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService waiterPool = Executors.newFixedThreadPool(1); ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> { log.debug("处理点餐..."); Future<String> f = cookPool.submit(() -> { log.debug("做菜"); return cokking(); }); try { log.debug("上菜 {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } });
waiterPool.execute(() -> { log.debug("处理点餐..."); Future<String> f = cookPool.submit(() -> { log.debug("做菜"); return cokking(); }); try { log.debug("上菜 {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }
|
输出
1 2 3 4 5 6
| 22:38:20.794 [pool-1-thread-1] DEBUG c.Test - 处理点餐... 22:38:20.798 [pool-2-thread-1] DEBUG c.Test - 做菜 22:38:20.798 [pool-1-thread-1] DEBUG c.Test - 上菜 宫保鸡丁 22:38:20.799 [pool-1-thread-1] DEBUG c.Test - 处理点餐... 22:38:20.800 [pool-2-thread-1] DEBUG c.Test - 做菜 22:38:20.800 [pool-1-thread-1] DEBUG c.Test - 上菜 宫保鸡丁
|