Java 缓冲队列

java

浏览数:280

2019-1-8

片段 1 | 片段 2 | 片段 3


代码

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * Buffer queue.
 *
 * <p>A thread-safe queue with buffering: elements passed to {@link #add(Object)} are stored in a
 * {@link ConcurrentLinkedDeque} that is watched by a {@code BufferQueueThread}. When either the
 * configured element count or the configured buffering time is reached, the buffered elements
 * are handed to the callback supplied at construction time.
 *
 * @param <T> type of the buffered elements
 * @author ZhangShuzheng
 * @date 2018/5/26
 */
public class BufferQueue<T> {

    /**
     * Backing queue shared with the monitor thread.
     */
    private final Queue<T> queue;

    /**
     * Thread that monitors the queue and invokes the callback.
     */
    private final BufferQueueThread<T> bufferQueueThread;

    /**
     * Creates the queue, starts its monitor thread and registers a JVM shutdown hook that
     * flushes whatever is still buffered.
     *
     * @param bufferBatchCount    element-count threshold passed to the monitor thread
     * @param bufferMillis        time threshold in milliseconds passed to the monitor thread
     * @param bufferQueueCallBack callback that receives each batch of buffered elements
     */
    public BufferQueue(int bufferBatchCount, int bufferMillis, BufferQueueCallBack<List<T>> bufferQueueCallBack) {
        this.queue = new ConcurrentLinkedDeque<>();
        // Parameterized (not raw) so the element type of the queue and the callback is checked.
        this.bufferQueueThread = new BufferQueueThread<>(queue, bufferBatchCount, bufferMillis, bufferQueueCallBack);
        this.bufferQueueThread.start();
        // Deliver any remaining buffered elements when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(this.bufferQueueThread::flush));
    }

    /**
     * Adds an element to the queue.
     *
     * @param t element to buffer
     * @throws NullPointerException if {@code t} is {@code null} (the backing
     *                              {@link ConcurrentLinkedDeque} does not accept null elements)
     */
    public void add(T t) {
        queue.add(t);
    }

}

/**
 * Queue monitor thread of BufferQueue.
 *
 * <p>Repeatedly inspects the shared queue and hands batches of elements to the callback when
 * either the queue holds at least {@code bufferBatchCount} elements, or more than
 * {@code bufferMillis} milliseconds have passed since the last callback.
 *
 * <p>{@link #flush()} may be invoked from a different thread while {@link #run()} is still
 * looping (BufferQueue calls it from a JVM shutdown hook), so draining tolerates the queue
 * becoming empty part-way through a batch.
 *
 * Created by ZhangShuzheng on 2018/5/26.
 *
 * @param <T> type of the buffered elements
 */
class BufferQueueThread<T> extends Thread {

    /**
     * Pause between two queue inspections when there is nothing to deliver, in milliseconds.
     */
    private static final long IDLE_SLEEP_MILLIS = 100L;

    /**
     * Monitored queue.
     */
    private final Queue<T> queue;

    /**
     * Element-count threshold; also the maximum size of a batch delivered by {@link #flush()}.
     */
    private final int bufferBatchCount;

    /**
     * Buffering-time threshold in milliseconds.
     */
    private final int bufferMillis;

    /**
     * Time of the last callback. Volatile because it is written by this thread and by
     * whichever thread calls {@link #flush()}.
     */
    private volatile long lastCallBackMilliSecond = System.currentTimeMillis();

    /**
     * Callback that receives each batch.
     */
    private final BufferQueueCallBack<List<T>> bufferQueueCallBack;

    /**
     * @param queue               queue to monitor
     * @param bufferBatchCount    element-count threshold, must be positive
     * @param bufferMillis        buffering-time threshold in milliseconds
     * @param bufferQueueCallBack callback that receives each batch
     * @throws IllegalArgumentException if {@code bufferBatchCount} is not positive; such a
     *                                  value would never deliver anything from {@link #run()}
     *                                  and would make {@link #flush()} spin forever
     */
    public BufferQueueThread(Queue<T> queue, int bufferBatchCount, int bufferMillis, BufferQueueCallBack<List<T>> bufferQueueCallBack) {
        if (bufferBatchCount <= 0) {
            throw new IllegalArgumentException("bufferBatchCount must be positive: " + bufferBatchCount);
        }
        this.queue = queue;
        this.bufferBatchCount = bufferBatchCount;
        this.bufferMillis = bufferMillis;
        this.bufferQueueCallBack = bufferQueueCallBack;
    }

    /**
     * Monitoring loop. Runs until the thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            long currentTimeMillis = System.currentTimeMillis();
            int queueSize = queue.size();
            int pollCount = 0;
            // Count threshold reached: deliver exactly one full batch.
            if (queueSize >= bufferBatchCount) {
                pollCount = bufferBatchCount;
            } else {
                long diffMillis = currentTimeMillis - lastCallBackMilliSecond;
                // Time threshold reached (or the clock went backwards): deliver everything.
                if (diffMillis > bufferMillis || diffMillis < 0) {
                    pollCount = queueSize;
                }
            }
            if (pollCount > 0) {
                callBack(pollCount);
            } else {
                try {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop; continuing would make every
                    // following sleep fail immediately and turn the loop into a busy spin.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Polls up to {@code pollCount} elements and passes them to the callback.
     *
     * <p>The callback is not invoked when no element could be polled.
     *
     * @param pollCount maximum number of elements to deliver
     */
    private void callBack(int pollCount) {
        List<T> items = new ArrayList<>(pollCount);
        for (int i = 0; i < pollCount; i++) {
            T item = queue.poll();
            if (item == null) {
                // Queue.poll() returns null when the queue is empty: another thread drained
                // it after the size was read. Never hand null elements to the callback.
                break;
            }
            items.add(item);
        }
        if (!items.isEmpty()) {
            bufferQueueCallBack.run(items);
        }
        lastCallBackMilliSecond = System.currentTimeMillis();
    }

    /**
     * Delivers all buffered elements to the callback, in batches of at most
     * {@code bufferBatchCount} elements, until the queue is empty.
     */
    public void flush() {
        while (!queue.isEmpty()) {
            callBack(Math.min(queue.size(), bufferBatchCount));
        }
    }

}


接口

/**
 * Callback interface of BufferQueue.
 *
 * <p>Declares a single abstract method, so it can be implemented with a lambda expression or a
 * method reference.
 *
 * @param <T> type of the value passed to the callback
 * @author ZhangShuzheng
 * @date 2018/5/26
 */
@FunctionalInterface
public interface BufferQueueCallBack<T> {

    /**
     * Callback method.
     *
     * @param t value handed to the callback
     */
    void run(T t);

}


示例

// Example: count threshold of 100 elements, time threshold of 5 seconds;
// the callback prints the size of every batch it receives.
BufferQueue<String> bufferQueue = new BufferQueue<>(
        100,
        5_000,
        items -> System.out.println(items.size()));