网络编程 – NIO

Java基础

浏览数:110

2020-6-15

Reactor模式

Reactor模式称为叫反应堆或则反应器。在网络编程 – BIO中,我们了解到,大量的线程休眠导致资源浪费,在BIO中,通过Reactor来优化,下面举个简单的例子:
有个高级游戏乐园,里面有5个不同游戏的玩法,玩家进来的时候,有一对一的工作人员给玩家讲解每个玩法。
传统BIO模式是这样的:
A玩家进来,工作人员1接待A,工作人员1讲解完第一个游戏玩法,然后A玩家开始玩,此时,工作人员1就在等待A玩家玩完(比如br.readLine())。玩家玩第二个游戏的时候,工作人员1开始讲解第二个游戏玩法,然后A玩家继续玩,一直玩到第五个游戏。
如果还有玩家进来呢?那派出第二个工作人员,第三个工作人员。。。。。。
我们可以看到,工作人员在玩家玩的时候,他是处于休息空闲的状态,而且玩家越来越多的时候,工作人员就需要越来越多。
Reactor模式:
工作人员1给玩家A讲解完了,他就去处理其他事情,而玩家开始玩游戏,当玩家准备玩下一个游戏的时候,他就呼叫工作人员,工作人员就过来讲解下一个游戏玩法。本来一个工作人员只能服务一个玩家,但是通过这个模式,他可以同时服务多个玩家,在玩家A玩游戏的时候,他可以为其他玩家提供讲解服务。
在Reactor模式中,应用程序并不会调用某个方法直至完成,而是逆置了事件处理流程,具体事件处理程序向反应器注册一个事件处理器,等到事件来了,具体事件处理程序再处理相关事件。

NIO三大组件

Buffer

Buffer在NIO中,本质是一块内存,用于和NIO通道进行交互。我们可以把数据从通道读取中出来,写入到Buffer,也可以把Buffer的数据读到出来,写到通道中。
在NIO中,java定义了IntBuffer、FloatBuffer、ByteBuffer等,我们比较常用的是ByteBuffer。

主要属性

Buffer有几个重要的属性:position、limit、capacity。

  • capacity:指的是内存块的固定大小,一旦设定,就不能再修改。往内存写满数据后,就不能再写,除非将其清空。
  • position:指的是读和写的下一个位置。每次读或写的时候,就会加1。从写模式切换到读模式的时候(flip方法),position就会重置为0。
  • limit:在写模式下,不超过capacity,在读模式下,position不能大于limit。意思其实很简单,就是写的时候,不能超过指定的capacity大小。切换到读的时候,limit等于写的长度,读取的position不能超过写的数据limit。

下面通过一个简单的例子深入了解一下这几个属性。

public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(8);
    System.out.println("init:" + buffer);
    buffer.put((byte) 'a');
    System.out.println("put-a:" + buffer);
    buffer.put((byte) 'b');
    System.out.println("put-b:" + buffer);
    buffer.put((byte) 'c');
    System.out.println("put-c:" + buffer);
    // 切换到读模式
    buffer.flip();
    System.out.println("flip:" + buffer);
    buffer.get();
    System.out.println("get-a:" + buffer);
    buffer.get();
    System.out.println("get-b:" + buffer);
    buffer.get();
}

输出结果如下:

初始化时,pos指向0,capacity和limit都等于指定大小8。

put-a时,pos+1,等于1,capacity和limit不变。

put-b时,pos+1,等于2,capacity和limit不变。

put-c时,pos+1,等于3,capacity和limit不变。

flip后,pos把值赋值给limit,并重置为0,capacity不变。此时,pos等于0,limit等于3,capacity等于8。

get-a时,pos+1,等于1,capacity和limit不变。

get-b时,pos+1,等于1,capacity和limit不变。

主要方法

初始化buffer

allocate方法,在上面例子中,我们看到了ByteBuffer.allocate(8)的使用。

public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    // lim也传capacity,所以两个刚开始是相等的
    return new HeapByteBuffer(capacity, capacity);
}

HeapByteBuffer(int cap, int lim) {  
    // 这边pos赋值为0,字节长度为cap
    super(-1, 0, lim, cap, new byte[cap], 0);
}

ByteBuffer(int mark, int pos, int lim, int cap, 
             byte[] hb, int offset)
{
    super(mark, pos, lim, cap);
    this.hb = hb;
    this.offset = offset;
}

wrap方法,跟allocate方法都可以初始化buffer,不同的是可以指定pos和limit,以及指定字节数组的初始值。

public static ByteBuffer wrap(byte[] array) {
    return wrap(array, 0, array.length);
}
public static ByteBuffer wrap(byte[] array,
                                    int offset, int length)
{
    try {
        // 传递字节数组,pos,偏移量length,用于计算limit
        return new HeapByteBuffer(array, offset, length);
    } catch (IllegalArgumentException x) {
        throw new IndexOutOfBoundsException();
    }
}

写数据

除了上面例子演示的,put(byte),还有以下这些。

从源码中看pos会加1的原因:

public abstract ByteBuffer put(byte b);

public ByteBuffer put(byte x) {
    hb[ix(nextPutIndex())] = x;
    return this;
}
final int nextPutIndex() {                          // package-private
    if (position >= limit)
        throw new BufferOverflowException();
    // 这边加1
    return position++;
}

也可以把通道中的数据写入到buffer:

// 这边用read指的是把通道的数据读取出来,再写入buffer,read返回的是写入buffer的数据大小。
channel.read(buf)

flip

从源码中也可以看出,把pos的值赋值给limit,并重置为0。

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

读数据

除了上面例子演示的,get,还有以下这些

从源码中看pos会加1的原因:

public abstract byte get();

public byte get() {
    return hb[ix(nextGetIndex())];
}

final int nextGetIndex() {                          // package-private
    if (position >= limit)
        throw new BufferUnderflowException();
    return position++;
}

也可以把buffer的数据写入到通道中:

// 把buffer的数据读取出来,写入到channel中
channel.write(buf)

标记与重置

调用mark的时候,会把pos的值给mark,调用reset的时候,会把mark的值给pos。在实际过程中,我们在读操作的时候,先调用mark方法标记位置,比如此时为4,当我们读到7的时候,再调用reset方法,此时又重新从4开始读。

public final Buffer mark() {
    mark = position;
    return this;
}

public final Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}

rewind、clear、compact

从源码可以看出,rewind把pos置为0,所以就是从头开始读写。
clear方法,把pos置0,并重置limit为capacity,这个时候进行写的时候,就是从第一个位置开始写,如果原先有数据,就是要被覆盖,相当于清空了整个内存。
compact与clear不一样的是,他会把pos和limit之间的数据,移到前面去,并设置pos的值,写的时候,会从新的位置开始写。比如pos为2,limit为4,他会把2-4之间的值移到0,再把pos设置为2,这样没读的数据,就不会被覆盖而消失消失。

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

public abstract ByteBuffer compact();
public ByteBuffer compact() {
    System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
    position(remaining());
    limit(capacity());
    discardMark();
    return this;
}

public final int remaining() {
    return limit - position;
}

Channel

Channel,通道,操作系统和应用程序之间的数据交互,就是通过通道来的。

  • ServerSocketChannel:用于TCP的服务端
  • SocketChannel:用于TCP的客户端
  • DatagramChannel:用于UDP

Selector

选择器,把Channel和需要的事件注册到Selector上面,让Selector进行监听。这些事件包括以下几种:

// 读
public static final int OP_READ = 1 << 0;
// 写
public static final int OP_WRITE = 1 << 2;
// 请求连接
public static final int OP_CONNECT = 1 << 3;
// 接收连接
public static final int OP_ACCEPT = 1 << 4;

当需要监听多个事件时,比如OP_ACCEPT和OP_CONNECT可以这样写SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT

作者:大军