0%

java | AQS

AQS 「abstactQueuedSynchronizer」原理,是阻塞式和相关同步器工具的框架。

特点

  • 用 state 属性来表示资源状态「分独占模式和共享模式」,子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState 获取 state 状态
    • setState 设置 state 状态
    • compareAndSetState cas 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制、支持多个条件变量,类似于 Monitor 的 waitSet

子类主要实现了一些方法(默认抛出 UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

获取锁

1
2
3
4
// 如果获取锁失败
if(tryAcquire(arg)){
// 入队,可以选择阻塞当前线程
}

释放锁

1
2
3
4
// 如果释放锁成功
if(tryRelease(arg)){
//让阻塞线程恢复运行
}

自定义锁「自己实现一个不可重入锁」

该锁是不可重入锁,连续锁两次会死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws InterruptedException, ExecutionException {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
log.debug("locking...");
// lock.lock();
try {
log.debug("locking...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t1").start();

new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t2").start();
}
}

//自定义锁(不可重入锁)
class MyLock implements Lock {

class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 加上锁,并设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
// setState 放后面,是因为写屏障
setState(0);
return true;
}

@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}

public Condition newCondition() {
return new ConditionObject();
}
}

private MySync sync = new MySync();

@Override // 加锁(不成功进入等待队列)
public void lock() {
sync.acquire(1);
}

@Override // 加锁,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override // 尝试加锁,一次
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override// 尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override // 解锁
public void unlock() {
sync.release(1);
}

@Override // 创建条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
请我喝杯咖啡吧~