线程同步,是为了在多线程的情况下,对数据进行修改仍能获得正常结果。

synchronized

JAVA 自带的锁,注意是可重入锁

可以锁实例对象、锁类、锁方法

线程安全类

如果一个类被设计为允许多线程正确访问,那就说这个类是 “线程安全的”

public class Counter {
    private int count = 0;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }

    public void dec(int n) {
        synchronized(this) {
            count -= n;
        }
    }

    public int get() {
        return count;
    }
}

使用 wait 和 notify

synchronized 解决了多线程竞争的问题,但并没有解决多线程协调的问题。

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
    }

    public synchronized String getTask() {
        while (queue.isEmpty()) {
        }
        return queue.remove();
    }
}

以上代码中,getTask() 内部先判断队列是否为空,如果为空则循环等待,直到另一线程放入任务,while() 循环退出,就可以返回队列中的元素了。

但实际上 while() 循环永远不会退出,因为在 getTask() 入口获取了 this 锁,其他线程根本无法调用 addTask() 方法,因为 addTask() 也需要先获取 this 锁。

因此,多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;当条件满足时,线程被唤醒,继续执行任务。

对以上代码做以下改造:

public synchronized String getTask() {
    // 此处只能用 while, wait() 获取到锁后,需要再次判断 queue 的数据
    while (queue.isEmpty()) {
        // 释放 this 锁
        this.wait();
        // 重新获取 this 锁
    }
    return queue.remove();
}

调用 wait() 方法后,线程进入等待状态,wait() 方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后,wait() 方法才会返回,然后,继续执行下一条语句。

必须在 synchronized 块中才能调用 wait() 方法,因为 wait() 方法调用时,会释放线程获得的锁,wait() 方法返回时,线程又会重新试图获得锁。

现在面临第二个问题:线程 wait() 后进入等待状态,如何让其被唤醒继续执行?

public synchronized void addTask(String s) {
    this.queue.add(s);
    this.notifyAll(); // 唤醒在this锁等待的线程
}

往队列中添加了任务后,线程立刻对 this 锁对象调用 notify() 方法,这个方法会唤醒一个正在 this 锁等待的线程(就是在 getTask() 中位于 this.wait() 的线程),从而使得等待线程从 this.wait() 方法返回。

ReentrantLock

从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。

前文知道了 synchronized 关键字用于加锁,但是这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。

所以 ReentrantLock 用于 替代 synchronized 加锁。

原 synchronized 代码:

public class Counter {
    private int count;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }
}

使用 ReentrantLock 替代:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count;

    public void add(int n) {
        lock.lock();
        try {
            count += n;
        } finally {
            lock.unlock();
        }
    }
}

因为 synchronized 是Java语言层面提供的语法,所以我们不需要考虑异常,而 ReentrantLock 是Java代码实现的锁,我们就必须先获取锁,然后在 finally 中正确释放锁。

ReentrantLock 还可以尝试获取锁,这是 synchronized 无法做到的:

if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}

上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。

所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。

使用 Condition

synchronized可以配合waitnotify实现线程在条件不满足时等待,条件满足时唤醒。

对于 ReentrantLock ,我们可以使用 Condition 来实现 wait 和 notify 来实现

class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

Condition 提供的 await()signal()signalAll() 原理和 synchronized 锁对象的 wait()notify()notifyAll() 是一致的,并且其行为也是一样的:

  • await() 会释放当前锁,进入等待状态;

  • signal() 会唤醒某个等待线程;

  • signalAll() 会唤醒所有等待线程;

  • 唤醒线程从 await() 返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

if (condition.await(1, TimeUnit.SECOND)) {
    // 被其他线程唤醒
} else {
    // 指定时间内没有被其他线程唤醒
}

使用 ReadWriteLock

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        lock.lock();
        try {
            counts[index] += 1;
        } finally {
            lock.unlock();
        }
    }

    public int[] get() {
        lock.lock();
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            lock.unlock();
        }
    }
}

在上面的代码中,我们会发现,任何时刻,只允许一个线程修改,但是 get() 方法只读取数据,不修改数据,它实际上允许多个线程同时调用。

使用 ReadWriteLock 可以解决这个问题,它保证:

- 只允许一个线程写入(其他线程既不能写入也不能读取)

- 没有写入时,多个线程允许同时读(提高性能)

public class CounterByReadWriteLock {
    
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    // 注意:一对读锁和写锁必须从同一个 rwlock 获取
    private final Lock rlock = rwLock.readLock();
    private final Lock wlock = rwLock.writeLock();
    private int[] counts = new int[10];
    
    public void inc (int index) {
        wlock.lock();// 加写锁
        try {
            counts[index] += 1;
        } finally {
            wlock.unlock();// 释放写锁
        }
    }
    
    public int[] get() {
        rlock.lock();// 加读锁
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            rlock.unlock();// 释放读锁
        }
    }
}

使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。

使用 StampedLock

乐观锁:认为读的过程大概率不会有写入

悲观锁:

前文提到的 ReadWriteLock 有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程不允许写,这是一种悲观的读锁。

StampedLock 允许在读的过程中获取写锁。但是这种可能会导致数据不一致,所以需要额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

public class PointByStampedLock {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock();   // 获取写锁
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp);     // 获取读锁
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
        // 以下两行不是原子操作,读取中可能会被更改
        double currentX = x;
        double currentY = y;

        // 检查乐观读锁后 如果有其他写锁发生
        if (!stampedLock.validate(stamp)) {
            stamp = stampedLock.readLock();        // 获取一个悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp);     // 释放悲观读锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

注意到首先我们通过 tryOptimisticRead() 获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过 validate() 去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。

可见,StampedLock 把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是 StampedLock 是不可重入锁,不能在一个线程中反复获取同一个锁。

StampedLock 还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

【TODO: 下次实践】

使用 semaphore

前面我们讲了各种锁的实现,本质上锁的目的是保护一种受限资源,保证同一时刻只有一个线程能访问(ReentrantLock),或者只有一个线程能写入(ReadWriteLock)。

还有一种受限资源,它需要保证同一时刻最多有N个线程能访问,比如同一时刻最多创建100个数据库连接,最多允许10个用户下载等。

这种限制数量的锁,如果用Lock数组来实现,就太麻烦了。

这种情况就可以使用 Semaphore,例如,最多允许3个线程同时访问:

public class AccessLimitControl {
    // 任意时刻仅允许最多3个线程获取许可:
    final Semaphore semaphore = new Semaphore(3);

    public String access() throws Exception {
        // 如果超过了许可数量,其他线程将在此等待:
        semaphore.acquire();
        try {
            // TODO:
            return UUID.randomUUID().toString();
        } finally {
            semaphore.release();
        }
    }
}

使用Semaphore先调用acquire()获取,然后通过try ... finally保证在finally中释放。

调用acquire()可能会进入等待,直到满足条件为止。也可以使用tryAcquire()指定等待时间:

if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
    // 指定等待时间3秒内获取到许可:
    try {
        // TODO:
    } finally {
        semaphore.release();
    }
}

Semaphore本质上就是一个信号计数器,用于限制同一时间的最大访问数量。

做自己,而不是解释自己!