0%

java | 设计模式 异步模式 工作线程

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。

典型代表就是线程池,也体现了经典设计模式中的享元模式。

注意,不同任务类型应该用不同的线程池,这样能够有效的避免饥饿,提升效率。

比如,一个餐馆的工人,不能既服务客人,又去做菜。

饥饿

固定大小线程池有饥饿现象。

  • 两个工人是同一个线程池的两个工人
  • 他们要做的事情是,为客人点餐和后厨做菜,这是两个阶段的任务
    • 客人点餐,必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
    • 后厨做菜
  • 工人 A 处理了点餐任务,接下来,他要等待工人 B 把菜做好,然后上菜
  • 但是,现在同时来了两个客人,他们都去处理点餐了,没人处理做饭,死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;

@Slf4j(topic = "c.Test")
public class Run {

static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁");
static Random RANDOM = new Random();

static String cokking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(2);

pool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = pool.submit(() -> {
log.debug("做菜");
return cokking();
});
try {
log.debug("上菜 {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});

pool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = pool.submit(() -> {
log.debug("做菜");
return cokking();
});
try {
log.debug("上菜 {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}

饥饿解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;

@Slf4j(topic = "c.Test")
public class Run {

static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁");
static Random RANDOM = new Random();

static String cokking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);

waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cokking();
});
try {
log.debug("上菜 {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});

waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cokking();
});
try {
log.debug("上菜 {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}

输出

1
2
3
4
5
6
22:38:20.794 [pool-1-thread-1] DEBUG c.Test - 处理点餐...
22:38:20.798 [pool-2-thread-1] DEBUG c.Test - 做菜
22:38:20.798 [pool-1-thread-1] DEBUG c.Test - 上菜 宫保鸡丁
22:38:20.799 [pool-1-thread-1] DEBUG c.Test - 处理点餐...
22:38:20.800 [pool-2-thread-1] DEBUG c.Test - 做菜
22:38:20.800 [pool-1-thread-1] DEBUG c.Test - 上菜 宫保鸡丁
请我喝杯咖啡吧~