Java NIO DirectByteBuffer 的使用与研究

Java基础

浏览数:113

2019-8-18

AD:资源代下载服务

一.结论

  DirectByteBuffer 与 ByteBuffer 最大区别就在于缓冲区内存管理的方式。ByteBuffer使用的是堆内存,DirectByteBuffer 使用的是堆外内存,堆外内存的优点就是在执行I/O操作时数据拷贝的次数相对较少,因此也获得了较高的性能。凡事总有但是,由于将缓冲区分配在堆外内存也引入一系列与内存分配和回收的问题,所幸JDK提供了一系列方案来解决问题,这些也是本文所要阐述的重点。

二.ByteBuffer 的缺点

  I/O基本上可以视为一系列倒腾数据的操作。举例来说,商品经中间商倒腾的次数越少,其价格越便宜;对于I/O来说,拷贝字节数组的次数越少,其I/O性能也就越高。而ByteBuffer性能低下的原因就是在使用ByteBuffer进行I/O操作时会执行以下操作:

  1.将堆内存中缓冲区数据拷贝到临时缓冲区

  2.对临时缓冲区的数据执行低层次I/O操作

  3.临时缓冲区对象离开作用域,并最终被回收成为无用数据

  与之相对,DirectByteBuffer 由于将内存分配在了堆外内存因此可以直接执行较低层次的I/O操作数据,减少了拷贝次数因此也获得了较高的性能。

  问题来了,为什么不直接在堆内存中的缓冲区执行低层次的I/O操作呢?

  推测最主要的原因就是,JVM的垃圾回收操作会移动对象在堆内存中的位置,以实现内存的清理,因此,如果直接在堆内存中的缓冲区执行可能会发现缓冲区内存地址变化的情况,也就无从执行I/O操作。

三.DirectByteBuffer 内存申请与回收

  由于DirectByteBuffer的 API使用与ByteBuffer并无太大的区别,因此本文将集中研究DirectByteBuffer是如何执行内存申请操作,以及如何对其进行内存回收操作。

3.1.内存申请

  在构造DirectByteBuffer时就已经执行了内存申请操作,其中我们主要关注 Bits.reserveMemory(size, cap) 以及  Cleaner.create(this, new Deallocator(base, size, cap))。

DirectByteBuffer(int cap) {                   // package-private

        super(-1, 0, cap, cap);
        boolean pa = VM.isDirectMemoryPageAligned();
        int ps = Bits.pageSize();
        long size = Math.max(1L, (long)cap + (pa ? ps : 0));
        //内存分配预处理
        Bits.reserveMemory(size, cap);

        long base = 0;
        try {
           //申请堆外内存,返回缓冲区内存的首地址
            base = unsafe.allocateMemory(size);
        } catch (OutOfMemoryError x) {
            Bits.unreserveMemory(size, cap);
            throw x;
        }
        unsafe.setMemory(base, size, (byte) 0);
        if (pa && (base % ps != 0)) {
            // Round up to page boundary
            address = base + ps - (base & (ps - 1));
        } else {
            address = base;
        }
        //此行代码用于实现当DirectByteBuffer被回收时,堆外内存也会被释放
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
        att = null;
    }    

  Bits.reserveMemory

  

static void reserveMemory(long size, int cap) {
        // 设置最大内存设置
        if (!memoryLimitSet && VM.isBooted()) {
            maxMemory = VM.maxDirectMemory();
            memoryLimitSet = true;
        }
       
        // 乐观地尝试预定直接内存(DirectMemory)的内存
        // optimist!
        if (tryReserveMemory(size, cap)) {
            return;
        }
       
        // 如果预定内存失败,则对直接内存中无用的内存执行回收操作
        final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();

        // retry while helping enqueue pending Reference objects
        // which includes executing pending Cleaner(s) which includes
        // Cleaner(s) that free direct buffer memory
        while (jlra.tryHandlePendingReference()) {
            if (tryReserveMemory(size, cap)) {
                return;
            }
        }
        // 触发GC操作
        // trigger VM's Reference processing
        System.gc();
        
        // 执行多次循环,尝试进行内存回收操作,如果多次尝试失败之后,则抛出OutOfMemory异常
        // a retry loop with exponential back-off delays
        // (this gives VM some time to do it's job)
        boolean interrupted = false;
        try {
            long sleepTime = 1;
            int sleeps = 0;
            while (true) {
                if (tryReserveMemory(size, cap)) {
                    return;
                }
                if (sleeps >= MAX_SLEEPS) {
                    break;
                }
                if (!jlra.tryHandlePendingReference()) {
                    try {
                        Thread.sleep(sleepTime);
                        sleepTime <<= 1;
                        sleeps++;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }

            // no luck
            throw new OutOfMemoryError("Direct buffer memory");

        } finally {
            if (interrupted) {
                // don't swallow interrupts
                Thread.currentThread().interrupt();
            }
        }
    }
 tryReserveMemory 此方法的主要功能就是检查当前DirectMemory内存是否足够构建DirectByteBuffer的缓冲区,并通过CAS的方式设置当前已使用的内存   
//尝试预定内存
private static boolean tryReserveMemory(long size, int cap) { // -XX:MaxDirectMemorySize limits the total capacity rather than the // actual memory usage, which will differ when buffers are page // aligned. long totalCap;
     //检查内存是否足够 while (cap <= maxMemory - (totalCap = totalCapacity.get())) {
       //如果内存足够,则尝试CAS设置totalCapacity  if (totalCapacity.compareAndSet(totalCap, totalCap + cap)) { reservedMemory.addAndGet(size); count.incrementAndGet(); return true; } } return false; }

   jlra.tryHandlePendingReference 为什么可以执行内存回收操作呢?其原理如下节所示。

3.2.内存释放

  结论:DirectByteBuffer中的直接内存缓冲区释放的方式有两种

  1.ReferenceHandler线程会自动检查有无被回收的DirectByteBuffer,如果有则执行Cleaner.clean方法释放其对应的直接内存

  2.通过调用SharedSecrets.getJavaLangRefAccess()方法来释放内存,具体见Reference类代码分析。

3.2.1代码分析

  此句代码便是直接内存释放的关键了。

 cleaner = Cleaner.create(this, new Deallocator(base, size, cap));

  Deallocator 的代码如下所示:

    private static class Deallocator
        implements Runnable
    {

        private static Unsafe unsafe = Unsafe.getUnsafe();
        //直接内存缓冲区的首地址
        private long address;
        private long size;
        private int capacity;

        private Deallocator(long address, long size, int capacity) {
            assert (address != 0);
            this.address = address;
            this.size = size;
            this.capacity = capacity;
        }

        public void run() {
            if (address == 0) {
                // Paranoia
                return;
            }
            //释放内存
            unsafe.freeMemory(address);
            address = 0;
            //已预定的内存 - 已释放的内存
            Bits.unreserveMemory(size, capacity);
        }

    }

   Cleaner 内部维护着一个双向队列,此类的定义如下所示。

       请注意以下关键点:

  Cleaner 继承了PhantomReference幽灵引用,并且维护了一个ReferenceQueue<Object> 队列。

  

public class Cleaner extends PhantomReference<Object> {
    private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue();
    private static Cleaner first = null;
    private Cleaner next = null;
    private Cleaner prev = null;
    private final Runnable thunk;
   
       private static synchronized Cleaner add(Cleaner var0) {
        if (first != null) {
            var0.next = first;
            first.prev = var0;
        }

        first = var0;
        return var0;
    }    
   
     private Cleaner(Object var1, Runnable var2) {
        super(var1, dummyQueue);
        this.thunk = var2;
    }
     
     public static Cleaner create(Object var0, Runnable var1) {
        return var1 == null ? null : add(new Cleaner(var0, var1));
    }
        // 执行Dealloactor.run()
        public void clean() {
        if (remove(this)) {
            try {
                this.thunk.run();
            } catch (final Throwable var2) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        if (System.err != null) {
                            (new Error("Cleaner terminated abnormally", var2)).printStackTrace();
                        }

                        System.exit(1);
                        return null;
                    }
                });
            }

        }
    }
} 

  而幽灵引用的定义如下所示:

public class PhantomReference<T> extends Reference<T> {
   public T get() {
        return null;
    }
    public PhantomReference(T referent, ReferenceQueue<? super T> q) {
        super(referent, q);
    }
}

   问题来了,说了这么多到底是谁在调用DirectByteBuffer的内存回收代码(Cleaner.clean() -> Deallocator.run())

 Reference中代码说明了一切:
    static {
        ThreadGroup tg = Thread.currentThread().getThreadGroup();
        for (ThreadGroup tgn = tg;
             tgn != null;
             tg = tgn, tgn = tg.getParent());
        Thread handler = new ReferenceHandler(tg, "Reference Handler");
        /* If there were a special system-only priority greater than
         * MAX_PRIORITY, it would be used here
         */
        handler.setPriority(Thread.MAX_PRIORITY);
        handler.setDaemon(true);
        handler.start();

        // provide access in SharedSecrets
        SharedSecrets.setJavaLangRefAccess(new JavaLangRefAccess() {
            @Override
            public boolean tryHandlePendingReference() {
                return tryHandlePending(false);
            }
        });
    }
ReferenceHandler的主要代码如下所示,主要是不断的执行tryHandlePending 方法
        public void run() {
            while (true) {
                tryHandlePending(true);
            }
        }
    static boolean tryHandlePending(boolean waitForNotify) {
        Reference<Object> r;
        Cleaner c;
        try {
            synchronized (lock) {
                if (pending != null) {
                    r = pending;
                    // 'instanceof' might throw OutOfMemoryError sometimes
                    // so do this before un-linking 'r' from the 'pending' chain...
                    c = r instanceof Cleaner ? (Cleaner) r : null;
                    // unlink 'r' from 'pending' chain
                    pending = r.discovered;
                    r.discovered = null;
                } else {
                    // The waiting on the lock may cause an OutOfMemoryError
                    // because it may try to allocate exception objects.
                    if (waitForNotify) {
                        lock.wait();
                    }
                    // retry if waited
                    return waitForNotify;
                }
            }
        } catch (OutOfMemoryError x) {
            // Give other threads CPU time so they hopefully drop some live references
            // and GC reclaims some space.
            // Also prevent CPU intensive spinning in case 'r instanceof Cleaner' above
            // persistently throws OOME for some time...
            Thread.yield();
            // retry
            return true;
        } catch (InterruptedException x) {
            // retry
            return true;
        }
        // Cleaner.clean 方法调用处  
        // Fast path for cleaners
        if (c != null) {
            c.clean(); 
            return true;
        }

        ReferenceQueue<? super Object> q = r.queue;
        if (q != ReferenceQueue.NULL) q.enqueue(r);
        return true;
    }


 

作者:柯三