线程同步,是为了在多线程的情况下,对数据进行修改仍能获得正常结果。
synchronized
JAVA 自带的锁,注意是可重入锁
可以锁实例对象、锁类、锁方法
线程安全类
如果一个类被设计为允许多线程正确访问,那就说这个类是 “线程安全的”
public class Counter {
private int count = 0;
public void add(int n) {
synchronized(this) {
count += n;
}
}
public void dec(int n) {
synchronized(this) {
count -= n;
}
}
public int get() {
return count;
}
}使用 wait 和 notify
synchronized 解决了多线程竞争的问题,但并没有解决多线程协调的问题。
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
}
public synchronized String getTask() {
while (queue.isEmpty()) {
}
return queue.remove();
}
}以上代码中,getTask() 内部先判断队列是否为空,如果为空则循环等待,直到另一线程放入任务,while() 循环退出,就可以返回队列中的元素了。
但实际上 while() 循环永远不会退出,因为在 getTask() 入口获取了 this 锁,其他线程根本无法调用 addTask() 方法,因为 addTask() 也需要先获取 this 锁。
因此,多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;当条件满足时,线程被唤醒,继续执行任务。
对以上代码做以下改造:
public synchronized String getTask() {
// 此处只能用 while, wait() 获取到锁后,需要再次判断 queue 的数据
while (queue.isEmpty()) {
// 释放 this 锁
this.wait();
// 重新获取 this 锁
}
return queue.remove();
}调用 wait() 方法后,线程进入等待状态,wait() 方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后,wait() 方法才会返回,然后,继续执行下一条语句。
必须在 synchronized 块中才能调用 wait() 方法,因为 wait() 方法调用时,会释放线程获得的锁,wait() 方法返回时,线程又会重新试图获得锁。
现在面临第二个问题:线程 wait() 后进入等待状态,如何让其被唤醒继续执行?
public synchronized void addTask(String s) {
this.queue.add(s);
this.notifyAll(); // 唤醒在this锁等待的线程
}往队列中添加了任务后,线程立刻对 this 锁对象调用 notify() 方法,这个方法会唤醒一个正在 this 锁等待的线程(就是在 getTask() 中位于 this.wait() 的线程),从而使得等待线程从 this.wait() 方法返回。
ReentrantLock
从Java 5开始,引入了一个高级的处理并发的
java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。
前文知道了 synchronized 关键字用于加锁,但是这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。
所以 ReentrantLock 用于 替代 synchronized 加锁。
原 synchronized 代码:
public class Counter {
private int count;
public void add(int n) {
synchronized(this) {
count += n;
}
}
}使用 ReentrantLock 替代:
public class Counter {
private final Lock lock = new ReentrantLock();
private int count;
public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}因为 synchronized 是Java语言层面提供的语法,所以我们不需要考虑异常,而 ReentrantLock 是Java代码实现的锁,我们就必须先获取锁,然后在 finally 中正确释放锁。
ReentrantLock 还可以尝试获取锁,这是 synchronized 无法做到的:
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。
所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。
使用 Condition
synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒。
对于 ReentrantLock ,我们可以使用 Condition 来实现 wait 和 notify 来实现
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}Condition 提供的 await()、signal()、signalAll() 原理和 synchronized 锁对象的 wait()、notify()、notifyAll() 是一致的,并且其行为也是一样的:
await()会释放当前锁,进入等待状态;signal()会唤醒某个等待线程;signalAll()会唤醒所有等待线程;唤醒线程从
await()返回后需要重新获得锁。
此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒
} else {
// 指定时间内没有被其他线程唤醒
}
使用 ReadWriteLock
public class Counter {
private final Lock lock = new ReentrantLock();
private int[] counts = new int[10];
public void inc(int index) {
lock.lock();
try {
counts[index] += 1;
} finally {
lock.unlock();
}
}
public int[] get() {
lock.lock();
try {
return Arrays.copyOf(counts, counts.length);
} finally {
lock.unlock();
}
}
}在上面的代码中,我们会发现,任何时刻,只允许一个线程修改,但是 get() 方法只读取数据,不修改数据,它实际上允许多个线程同时调用。
使用 ReadWriteLock 可以解决这个问题,它保证:
- 只允许一个线程写入(其他线程既不能写入也不能读取)
- 没有写入时,多个线程允许同时读(提高性能)
public class CounterByReadWriteLock {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 注意:一对读锁和写锁必须从同一个 rwlock 获取
private final Lock rlock = rwLock.readLock();
private final Lock wlock = rwLock.writeLock();
private int[] counts = new int[10];
public void inc (int index) {
wlock.lock();// 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock();// 释放写锁
}
}
public int[] get() {
rlock.lock();// 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock();// 释放读锁
}
}
}
使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。
使用 StampedLock
乐观锁:认为读的过程大概率不会有写入
悲观锁:
前文提到的 ReadWriteLock 有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程不允许写,这是一种悲观的读锁。
而 StampedLock 允许在读的过程中获取写锁。但是这种可能会导致数据不一致,所以需要额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
public class PointByStampedLock {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 获取读锁
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
// 以下两行不是原子操作,读取中可能会被更改
double currentX = x;
double currentY = y;
// 检查乐观读锁后 如果有其他写锁发生
if (!stampedLock.validate(stamp)) {
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}注意到首先我们通过 tryOptimisticRead() 获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过 validate() 去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。
可见,StampedLock 把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是 StampedLock 是不可重入锁,不能在一个线程中反复获取同一个锁。
StampedLock 还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。
【TODO: 下次实践】
使用 semaphore
前面我们讲了各种锁的实现,本质上锁的目的是保护一种受限资源,保证同一时刻只有一个线程能访问(ReentrantLock),或者只有一个线程能写入(ReadWriteLock)。
还有一种受限资源,它需要保证同一时刻最多有N个线程能访问,比如同一时刻最多创建100个数据库连接,最多允许10个用户下载等。
这种限制数量的锁,如果用Lock数组来实现,就太麻烦了。
这种情况就可以使用 Semaphore,例如,最多允许3个线程同时访问:
public class AccessLimitControl {
// 任意时刻仅允许最多3个线程获取许可:
final Semaphore semaphore = new Semaphore(3);
public String access() throws Exception {
// 如果超过了许可数量,其他线程将在此等待:
semaphore.acquire();
try {
// TODO:
return UUID.randomUUID().toString();
} finally {
semaphore.release();
}
}
}
使用Semaphore先调用acquire()获取,然后通过try ... finally保证在finally中释放。
调用acquire()可能会进入等待,直到满足条件为止。也可以使用tryAcquire()指定等待时间:
if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
// 指定等待时间3秒内获取到许可:
try {
// TODO:
} finally {
semaphore.release();
}
}Semaphore本质上就是一个信号计数器,用于限制同一时间的最大访问数量。