【详解】并发程序的等待方式

Java基础

浏览数:124

2019-7-28

AD:资源代下载服务

引言:

  有时候我们执行一个操作,需要一个前提条件,只有在条件满足的情况下,才能继续执行。在单线程程序中,如果某个状态变量不满足条件,则基本上可以直接返回。但是,在并发程序中,基于状态的条件可能会由于其他线程的操作而改变。而且存在这种需要,即某个操作一定要完成,如果当前条件不满足,没关系,我可以等,等到条件满足的时候再执行。今天,我们就来聊一聊等待的几种方式。

  • 忙等待 / 自旋等待。
  • 让权等待 / 轮询与休眠
  • 条件队列

情景条件

  我们要实现一个有界缓存,其中用不同的等待方式处理前提条件失败的问题。在每种实现中都扩展了BaseBoundedBuffer,这个类中实现了一个基于数组的循环缓存,其中各个缓存状态变量(buf、head、tail和count)均由缓存的内置锁来保护。它还提供了同步的doPut和doTake方法,并在子类中通过这些方法来实现put和take操作,底层的状态将对子类隐藏。

此段代码来自《Java Concurrency in Practice》

public abstract class BaseBoundedBuffer<V> {
    private final V[] buf;   //缓冲数组
    private int tail;        //缓冲数据尾部索引
    private int head;        //头部索引
    private int count;       //存储的数据量

    public BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }

    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0;
        ++count;
    }

    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null;
        if (++head == buf.length)
            head = 0;
        --count;
        return v;
    }
    
    public synchronized final boolean isFull() {
        return count == buf.length;
    }
    
    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}

一、忙等待

反复检查条件是否为真,直到条件达到,继而完成后续任务。

优点:响应性好,只要条件符合,马上就能做出响应。

缺点:这样做,虽然在逻辑上实现了功能要求,但是在性能上却可能消耗过多的CPU时间。

我们来看看,忙等待的实现方式:

public class BusyWaitBoundedBuffer<V> extends BaseBoundedBuffer<V> {

    public BusyWaitBoundedBuffer(int size) {
        super(size);
    }
    
    public void put(V v) throws InterruptedException {
        while(true) {
            synchronized (this) {
                if(!isFull()) {
                    doPut(v);
                    return;
                }
            }
        }
    }
    
    public V take() throws InterruptedException {
        while(true) {
            synchronized (this) {
                if(!isEmpty())
                    return doTake();
            }
        }
    }
}

这里的两个方法在访问缓存时都采用”先检查,再运行“的逻辑策略,非线程安全,因为条件可能在”检查之后,运行之前“的中间时刻,被其他线程修改,以至于,在运行的时候,前提条件已经不满足了,故需要对put和take两个方法都进行同步,共用同一个锁以确保实现对缓冲状态的独占访问,即某一时刻只能有一个线程可以访问操作缓冲数组。也就是说,在put方法执行的一次尝试中,take方法不能被调用,不能改变缓冲数组状态。

还有一点,值得注意的是,while循环并不在同步块内,而是同步块在while循环内,也就是每执行一次条件检查,如果不满足,需要释放掉锁。不然另一个方法就拿不到锁,也就不能改变状态,条件就永远不能发生改变,这个方法就变成了死等待

“尚未解决的疑惑”:线程等待锁的时候是否会被JVM挂起,调出CPU?如果是这样的话,那么上下文切换的开销也会很大,因为每检查一次条件,需要进出CPU两次。

二、让权等待 / 轮询与休眠

类似忙等待,但是在每次检查条件后,若不合符条件,将进入休眠状态,避免消耗过多的CPU时间。而不是马上进入下一次检查。

优点:避免消耗过多的CPU时间

缺点:响应性差

实现代码:

public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    private static final long SLEEP_GRANULARITY = 3000L;

    public SleepyBoundedBuffer(int size) {
        super(size);
    }

    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            Thread.sleep(SLEEP_GRANULARITY); //休眠固定时间
        }
    }
    
    public V take() throws InterruptedException {
        while(true) {
            synchronized (this) {
                if(!isEmpty()) {
                    return doTake();
                }
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }

}

需要强调的是,sleep方法不会释放当前线程拥有的锁,所以不能在同步块内调用,如果在同步块内调用的话,虽然不会马上进行下一次条件检查,但是在这期间,锁没有被释放,其他线程无法获取到这个对象的锁,故无法改变状态,此操作也会变成死等待。依据如下(monitor指的就是锁)。

避免消耗过多的CPU时间是怎么回事?

  这里,我们需要先了解一下线程调度机制,和时间片轮转算法。

我们知道,线程并非至始至终占用CPU,而是每运行一段时间便退出CPU,让其他线程继续执行。来回切换,以实现程序的并发执行。

线程调度机制如图所示:(此图来自汤小丹《计算机操作系统》)

再来说一说,时间片轮转算法,看到这个,我想当然的以为,时间片的分配类似于内存空间的分配,也就是说,给你分配多少空间,那这个空间就归属你了,用多用少看你自己情况,多出来的空间其他人不能用。但是,认真看了轮转调度算法后,就明白了,原来时间片并不是那么回事,而是相当于一个定时器。即,假如时间片划分为30ms,那么从每个线程进入CPU,就开始倒计时,如果时间片用完,即倒计时为0,或者线程任务执行完毕,就调出线程。调入新线程后,定时器重新从30ms开始倒计时。

基于时间片轮转的解释

  对于忙等待来说,如果条件长时间没有达到,则可能耗尽其本次分配的时间片,甚至在等待过程中会消耗多个时间片。在这段时间内,此线程占据着此CPU,却只是等待,不做计算,其他线程不能获得此CPU,计算资源就算是浪费了。

  而对于让步等待来说,如果此次检查条件没有达到的话,会休眠一段时间,这时候,会退出CPU,让出当前时间片多余的部分,而且休眠的这段时间内也不会参与调度。

为什么说,让步等待响应性差?

   因为,可能出现这种情况,即线程刚进入休眠或者进入休眠状态没多久,条件就成立了,但是线程已进入休眠,只能等到休眠结束,才能继续执行。也就不能够及时响应了。

如图所示:(此图来自《Java Concurrency in Practice》,L、U不太清楚是何意思)

三、条件队列

“条件队列”这个名字来源于:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特点的条件变成真。传统队列的元素是一个个数据,而与之不同的是,条件队列中的元素是一个个正在等待相关条件的线程。

优点:响应性高,且不会消耗额外的CPU时间

实现代码:

public class BoundedBuffer<V> extends BaseBoundedBuffer<V>{
    public BoundedBuffer(int size) {
        super(size);
    }
    
    public synchronized void put(V v) throws InterruptedException {
        while(isFull())
            wait();
        doPut(v);
        notifyAll();
    }
    
    public synchronized V take() throws InterruptedException {
        while(isEmpty())
            wait();
        V v = doTake();
        notifyAll();
        return v;
    }
}

注:wait和notify/notifyAll等方法必须在拥有对应锁的情况下,才能执行。所以wait和notify/notifyAll方法必须在同步块中,并且同步块的锁和wait等方法的调用必须来源于同一个对象。

问题一:在条件不符合的情况下,wait方法将被调用,此线程将被阻塞挂起,移出CPU。但是,问题就出现了,如果执行到wait,程序就不再执行了,也就跳不出同步块,那么其他线程怎么获取到锁,这不就又出现了前面提到的死等待了吗?

答案是,wait方法会释放掉锁。等到条件达到,被唤醒的时候会重新尝试获取锁。

问题二:为何需要while循环,难道wait方法是一个空函数,那这样做,不就跟忙等待一样了吗,反复执行一个空函数,然后条件达到跳出循环?

wait方法并不是一个空函数,它是一个本地方法,确实实现了休眠的功能。一般情况下,确实调用一次就够了,但是线程可能在条件达到之前被唤醒,比如某个其他线程调用了notifyAll方法,唤醒所有等待条件的线程,但是此刻线程的条件可能尚未满足。需要再次判断条件是否成立,如果未成立,则继续wait()。

问题三:为何需要notifyAll而不是notify?

因为,在这个情景下,如果使用notify可能出现类似”信号丢失”的情况,考虑一下这种情况:队列已满,这时候一个线程调用了take()方法取出一个数据。就有空闲空间可以存储数据了,如果调用notify方法,只能唤醒一个线程,此时队列中有多个等待执行pu()t方法的线程,但是却唤醒了一个等待执行take()方法的线程。导致的问题是:缓冲空间明明有空闲,而我还在等待这个条件。

2018-09-11 19:50:12 修改:上面划线那段话有问题,在这个情景下,有只有两个条件,非此即彼,故队列已满的情况下,条件队列里只有等到执行put方法的线程。notify方法唤醒一个线程,只会是执行put方法的线程。故语义上没有问题。

可能出现的错误案例应该是这样的:

条件队列里有两个线程,线程A等待条件谓词PA,线程B等待条件谓词PB。条件谓词PA和PB相互独立。现在由于线程C的操作,使得PB变为真,并且线程C执行一个notify操作。JVM将唤醒条件队列中的一个线程,如果唤醒了线程A,那么条件没有达到,线程A调用wait方法继续等待。这时,就出现这种情况了,即PB已经为真,按道理线程B应该被唤醒,而线程B没有收到这个信号,而继续等待一个已经发生过的信号。相当于“丢失了”信号。

 

作者:猫毛·波拿巴