java缓冲队列
2018-12-27
import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; /** * 缓冲队列 * 一个线程安全的Queue,可实现缓冲功能,当满足指定缓冲数量或缓冲时间两个条件任意一个时,执行回调方法返回缓冲数据 * @author ZhangShuzheng * @date 2018/5/26 */ public class BufferQueue<T> { /** * 队列 */ private Queue<T> queue; /** * 队列监控线程 */ private BufferQueueThread bufferQueueThread; public BufferQueue(int bufferBatchCount, int bufferMillis, BufferQueueCallBack<List<T>> bufferQueueCallBack) { this.queue = new ConcurrentLinkedDeque<>(); this.bufferQueueThread = new BufferQueueThread(queue, bufferBatchCount, bufferMillis, bufferQueueCallBack); this.bufferQueueThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { bufferQueueThread.flush(); })); } /** * 进队列方法 * @param t */ public void add(T t) { queue.add(t); } } /** * BufferQueue的队列监控线程 * Created by ZhangShuzheng on 2018/5/26. */ class BufferQueueThread<T> extends Thread { /** * 监控队列 */ private Queue<T> queue; /** * 缓冲元素数量阈值 */ private int bufferBatchCount; /** * 缓冲时间阈值 */ private int bufferMillis; /** * 最后回调时间 */ private long lastCallBackMilliSecond = System.currentTimeMillis(); /** * 回调接口 */ private BufferQueueCallBack<List<T>> bufferQueueCallBack; public BufferQueueThread(Queue<T> queue, int bufferBatchCount, int bufferMillis, BufferQueueCallBack<List<T>> bufferQueueCallBack) { this.queue = queue; this.bufferBatchCount = bufferBatchCount; this.bufferMillis = bufferMillis; this.bufferQueueCallBack = bufferQueueCallBack; } @Override public void run() { while (true) { long currentTimeMillis = System.currentTimeMillis(); int pollCount = 0; // 当队列中缓冲数量达到指定数量阈值,将缓冲元素出队列 if (queue.size() >= bufferBatchCount) { pollCount = bufferBatchCount; } else { long diffMillis = currentTimeMillis - lastCallBackMilliSecond; // 当达到缓冲时间阈值,则所有元素出队列 if (diffMillis > bufferMillis || diffMillis < 0) { pollCount = queue.size(); } } if (pollCount > 0) { callBack(pollCount); } else { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 回调指定数量的元素 * @param pollCount */ private void callBack(int pollCount) { List<T> items = new ArrayList<>(pollCount); for (int i = 0; i < pollCount; i++) { T item = queue.poll(); items.add(item); } bufferQueueCallBack.run(items); lastCallBackMilliSecond = System.currentTimeMillis(); } /** * 清空缓冲元素 */ public void flush() { while (queue.size() > 0) { int queueCount = queue.size(); int pollCount = queueCount > bufferBatchCount ? bufferBatchCount : queueCount; callBack(pollCount); } } }
/** * BufferQueue的回调接口 * @author ZhangShuzheng * @date 2018/5/26 */ public interface BufferQueueCallBack<T> { /** * 回调方法 * @param t */ void run(T t); }
BufferQueue<String> bufferQueue = new BufferQueue<>(100, 5 * 1000, items -> { System.out.println(items.size()); });
相关推荐
android 蓝牙通讯示例源码(在线聊天)
java
2020-8-25
视频小程序 前后端(含数据库脚本)
java
2020-8-25
android 异步加载 自动切换
java
2020-8-25