阻塞队列和生产者-消费者模式

Java基础

浏览数:230

2019-7-28

何为阻塞队列,其与普通队列有何差别?

  总的来说,就是能够在适当的时候阻塞”存”和”取”两个操作,以达到控制任务流程的效果。阻塞队列提供了可阻塞的put和take方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。

阻塞队列接口及实现来自于Java并发包(java.util.concurrent),常见的实现有LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue

生产者-消费者模式

  生产者-消费者模式是非常常见的设计模式。该模式将”找出需要完成的工作”与”执行工作”这两个过程分离开来,并把工作项放入一个”待完成”列表中以便在随后处理,而不是找出后立即处理。生产者-消费者模式能简化开发过程,因为它消除了生产者类与消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载的管理,因为这两个过程在处理数据的速率上有所不同。

 

阻塞队列对于生产者-消费者模式有何裨益?

  生产者-消费者模式都是基于队列的。就说说普通的有界队列存在的问题吧,队列存在”满”和”空”的问题,如果队列已满,那生产者继续往队列里存数据就会出问题,存不进去要如何处理,生产者代码中就要有相应的处理代码。同样的,如果队列为空,消费者取不到数据又要如何反应。而阻塞队列,就可以在”存不进”和”取不出”的时候,直接阻塞操作,生产者和消费者代码直接阻塞在存取操作上。当然这种阻塞并不是永久的,就拿生产者来说吧,如果因为”存不进”而阻塞的话,只要消费者取出数据,便会”通知”生产者就能继续生产并存储数据。这样就能极大地简化生产者-消费者的编码。

  值得一提的是,阻塞队列还能提供更灵活的选项:offer(对应put)和 poll(对应take)

boolean offer(E e);

boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

  如果数据项不能被添加到队列中,将返回一个失败状态。而不必一直阻塞下去。这样你就可以选择让生产者做点其他的事。但是一般情况下,如果队列充满,很有可能是因为

V生>V消,以至于数据项囤积,如果任其阻塞,则生产者可能被长时间搁置,浪费资源,利用率降低。这时候就要使用一些灵活的策略进行调控,例如减去负载,将多余的工作项序列化并写入磁盘,减少生产者线程的数量,或者通过某种方式来抑制生产者线程。

使用示例

示例说明:本示例模拟一个生产-消费环境,工厂生产可乐,肥宅消费。这里对于生产者的调控比较粗暴,直接新建或中断一个生产者任务。

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<CocaCola> queue = new ArrayBlockingQueue<>(100); //容量100的队列
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new Producer(queue, exec));
        }
        TimeUnit.SECONDS.sleep(3); //先生产一点库存
        for (int i = 0; i < 5; i++) {
            exec.execute(new FatIndoorsman(queue, exec));
        }
    }
}

class CocaCola { //可口可乐

}

class Producer implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private static List<Producer> producers = new ArrayList<>(); //类管理其实例列表
    private Executor exec;
    private BlockingQueue queue;

    public Producer(BlockingQueue queue, Executor exec) {
        this.queue = queue;
        this.exec = exec;
        producers.add(this);
    }

    public synchronized static void adjust(int flag, BlockingQueue queue, Executor exec) { // 1 添加  -1减少
        if (flag == 1) {
            Producer producer = new Producer(queue, exec); //添加的生产者共享同一个队列
            exec.execute(producer);
        } else if (flag == -1) {
            Producer producer = producers.remove(0);
            producer.cancel();
        }
    }

    private void cancel() { //利用中断取消生产任务
        Thread.currentThread().interrupt();
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                TimeUnit.SECONDS.sleep(1); //模拟生产需耗时1秒
                boolean success = queue.offer(new CocaCola()); //通过offer尝试添加
                if (!success) { //如果队列已满,则移除1个生产者
                    System.out.println("remove a producer");
                    adjust(-1, queue, exec);
                }
                System.out.println(this + " produced a coca-cola!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(this + " is stoped!");
    }

    @Override
    public String toString() {
        return "Producer[" + id + "]";
    }
}

class FatIndoorsman implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private BlockingQueue queue;
    private Executor exec;

    public FatIndoorsman(BlockingQueue queue, Executor exec) {
        this.queue = queue;
        this.exec = exec;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            CocaCola cocaCola = (CocaCola) queue.poll();
            if (cocaCola != null) {
                try {
                    TimeUnit.SECONDS.sleep(10); //模拟肥宅每隔10秒要喝一瓶
                    System.out.println(this + " drink a coca-cola");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Producer.adjust(1, queue, exec); //添加生产者
            }
        }
    }

    @Override
    public String toString() {
        return "FatIndoorsman[" + id + "]";
    }
}

阻塞队列是如何实现的,即如何进行阻塞?

显示锁Lock+条件队列Condition。这里维护两个条件队列,对应take和put操作。两个Condition绑定同一个Lock(即由同一个Lock.newCondition生成)。拿ArrayBlockingQueue来说,其他实现类实现阻塞的方式应该类似。如果某一方take和put失败,则调用对应Condition的await方法,调用的线程将被阻塞,进入等待条件的队列,并释放锁。如果某一方成功(即成功调用dequeue或enqueue方法),则会调用对应的Condition的signal方法,等待条件队列中的某个线程将被选中并激活。

为何要使用Lock+Condition,而不是隐式的synchronized和wait/notify条件队列?

  首先我们有两个需要,第一,我们需要两个条件队列来维护take和put操作,而一个对象Object只绑定一个条件队列,如果触发notify(),我们不知道到底是哪个条件达到了。不过我们可以创建两个内部的全局变量Object作为锁,将这两个对象的内置锁作为take和put操作的同步锁,这样就可以有两个条件队列了。但是,我们说了,还有一个需要,那就是take和put操作必须共有同一个锁。

2018-09-11 20:28:07 补充:内置的条件队列也可以实现多个条件控制,条件队列里存的是等待条件的线程,每个线程等待的条件可以不一样。notifyAll能够使得所有线程苏醒,并检查条件谓词,查看是否达到条件,如果未达到条件,则继续wait。达到则向下执行。这里引入显式的条件队列Condition的作用是,分离等待条件的线程,将等待同一条件的线程分配到同一个队列,这时候,就可以只唤醒等待特定条件的线程了。

注意:notifyAll并没有关联什么条件谓词,而是提示所有线程,某个状态发生改变了,你们自己看看是不是自己等待的条件达到了。

以下是ArrayBlockingQueue.java的部分源码截图

 

作者:猫毛·波拿巴