JavaNIO

/ 默认分类 / 0 条评论 / 6浏览

1.背景

前面我们介绍了几种linux的网络IO模型的原理, 这些属于底层操作系统的实现。 对于高级计算机语言,有对这些操作系统功能进行封装,比如Java中的NIO,而Java NIO也就是连接操作系统底层的IO模型和高级框架(如netty)的中间层。Java NIO基于Linux的多路复用IO模型实现,这种模型被Netty、Tomcat高版本等主流框架采用。这里的"多路"指代多个网络连接,每个连接对应一个Channel,实际上实现了多个连接共享少量线程的机制。

下面我们来一起学习总结下Java的NIO。

2.详细分析

2.1 Java NIO三大核心组件概述

1. Buffer(缓冲区)

2. Channel(通道)

3. Selector(选择器)

2.2 深入学习

2.2.1 Buffer

Buffer是一个对象,它包含一些要写入或者刚读出的数据。在NIO中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,它是写入到缓冲区中的。任何时候访问NIO中的数据,都是通过缓冲区进行操作。

缓冲区实质上是一个数组,通常它是一个字节数组(ByteBuffer),也可以使用其他种类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问以及维护读写位置等信息。

2.2.1.1 Buffer的基本属性

2.2.1.2 Buffer的常用api

2.2.1.3 Buffer使用程序示例

// 创建一个容量为48字节的ByteBuffer
// 底层实际上是创建了一个48字节的byte数组(堆内存)
ByteBuffer buffer = ByteBuffer.allocate(48); 

// 写入数据到Buffer的两种方式:
// 1. 从Channel读取数据写入Buffer(底层写入byte数组)
int bytesRead = channel.read(buffer);
// 2. 直接通过put方法写入数据(操作底层byte数组)
buffer.put(byteArray);    

// 从Buffer中读取数据前必须切换模式
buffer.flip();  // 将limit=position, position=0(准备读取)
byte b = buffer.get(); // 从底层byte数组读取一个字节

// 将Buffer数据写入Channel(读取底层byte数组)
int bytesWritten = channel.write(buffer);

// 清空Buffer(position=0, limit=capacity,但底层byte数组数据仍在)
buffer.clear();

// 压缩Buffer(将未读取的数据移到数组头部)
buffer.compact();

// ByteBuffer的底层实现(关键代码解析)
public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    // 实际创建的是HeapByteBuffer(堆内存实现)
    return new HeapByteBuffer(capacity, capacity);
}

// HeapByteBuffer类(堆内存实现)
class HeapByteBuffer extends ByteBuffer {
    // 构造方法:创建指定容量的byte数组
    HeapByteBuffer(int cap, int lim) {  // package-private
        // 调用父类构造器,传入byte数组
        super(-1, 0, lim, cap, new byte[cap], 0);
        // 等价于:
        // hb = new byte[cap];  // 实际存储数据的byte数组
        // offset = 0;          // 数组偏移量
    }
}

// ByteBuffer抽象类
public abstract class ByteBuffer extends Buffer {
    // 关键字段说明:
    final byte[] hb;     // 实际存储数据的byte数组(非堆内存实现时为null)
    final int offset;    // 数组偏移量
    boolean isReadOnly;  // 是否只读
    
    // 构造方法
    ByteBuffer(int mark, int pos, int lim, int cap, 
             byte[] hb, int offset) {
        super(mark, pos, lim, cap);  // 调用Buffer类初始化位置信息
        this.hb = hb;    // 绑定byte数组
        this.offset = offset;
    }
}

可以看出ByteBuffer本质上是基于堆内存的byte数组

2.2.2 Channel

2.2.2.1 基本属性介绍

在 Java NIO 中,Channel(通道)是用于数据传输的抽象管道,可以理解为连接数据源(如文件、网络Socket)和程序之间的桥梁。 可以通过它读取和写入数据。它就像自来水管一样,网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutputStream的子类),而且通道可以用于读、写或者同时用于读写。

Channel 可以连接不同类型的数据源:文件(FileChannel):用于文件读写 ,TCP 网络(SocketChannel / ServerSocketChannel):用于网络通信 ,UDP 网络(DatagramChannel):用于无连接的数据报传输.

Channel 可以设置为 阻塞模式(默认) 或 非阻塞模式(有点类似在linuxIO模型中介绍的socket阻塞属性的设置)。 在非阻塞模式下,读写操作不会让线程等待,而是立即返回结果(成功、失败或未就绪)。 非阻塞模式通常配合 Selector(选择器) 使用,实现高效的 IO 多路复用(单线程管理多个连接)。

Channel 提供 map()方法,允许将文件的一部分直接映射到内存,程序可以直接操作内存,而不需要频繁的磁盘读写。这种方式特别适合 大文件处理,比如数据库、日志分析等场景。

Channel 支持transferTo() 和 transferFrom() 方法,可以在 文件与网络之间直接传输数据,而不需要经过用户缓冲区,减少数据拷贝,提高性能。

2.2.2.2 Channel 的主要实现类

2.2.2.3 FileChannel的使用示例

import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class FileChannelExample {
    public static void main(String[] args) throws Exception {
        //创建FileChannel
        RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
        FileChannel channel = aFile.getChannel();
        
        //创建Buffer
        ByteBuffer buffer = ByteBuffer.allocate(48);
        
        //读取数据到Buffer
        int bytesRead = channel.read(buffer);
        while (bytesRead != -1) {
            System.out.println("读取了 " + bytesRead + " 字节");
            
            //切换到读模式
            buffer.flip();
            
            //读取Buffer中的数据
            while(buffer.hasRemaining()) {
                System.out.print((char) buffer.get());
            }
            
            //清空Buffer
            buffer.clear();
            bytesRead = channel.read(buffer);
        }
        
        //写入数据到文件
        String newData = "新数据" + System.currentTimeMillis();
        ByteBuffer buf = ByteBuffer.wrap(newData.getBytes(StandardCharsets.UTF_8));
        while(buf.hasRemaining()) {
            channel.write(buf);
        }
        
        //关闭Channel
        channel.close();
        aFile.close();
    }
}

2.2.3 Selector

2.2.3.1 基本属性介绍

用于检查一个或多个NIO Channel的状态是否处于可读、可写等。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接,提高效率。基本的使用步骤一般是:

  1. 创建Selector:通过调用Selector.open()方法创建一个新的选择器。这是使用选择器的第一步,它会返回一个新的Selector实例。
  2. 将Channel注册到Selector上:只有非阻塞的Channel才能注册到Selector上。调用通道的register()方法将其注册到选择器上,同时指定对哪些事件感兴趣。可以监听的事件类型包括:
    • SelectionKey.OP_CONNECT:连接就绪事件,表示客户端连接已经建立成功
    • SelectionKey.OP_ACCEPT:接收连接事件,表示服务器端监听到了客户端连接请求
    • SelectionKey.OP_READ:读就绪事件,表示通道中有数据可以读取
    • SelectionKey.OP_WRITE:写就绪事件,表示可以向通道写入数据
  3. 通过Selector选择Channel:调用选择器的select()方法阻塞等待,直到至少有一个通道在你注册的事件上就绪了。select()方法返回的int值表示有多少通道已经就绪。然后,可以通过调用选择器的selectedKeys()方法获取已就绪的通道集合,并遍历处理这些通道上的事件。

2.2.3.2 Selector的使用示例

public class SelectorExample {
    public static void main(String[] args) throws IOException {
        //创建Selector,Selector用于监听多个通道的事件,实现单线程管理多路连接
        Selector selector = Selector.open();

        //创建ServerSocketChannel,作为服务端通道,监听客户端连接
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress("localhost", 8080));

        //设置为非阻塞模式,只有非阻塞通道才能注册到Selector
        serverSocket.configureBlocking(false);

        //将ServerSocketChannel注册到Selector,并监听ACCEPT事件(有新连接时触发)
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        //分配缓冲区,用于读写数据
        ByteBuffer buffer = ByteBuffer.allocate(256);

        while (true) {
            //等待感兴趣的事件发生,select()会阻塞直到至少有一个事件就绪
            int readyChannels = selector.select();
            if (readyChannels == 0) continue;

            //获取所有已就绪事件的SelectionKey集合
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {
                    //isAcceptable()为true,说明有新的客户端连接到来
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    client.configureBlocking(false); //设置新连接为非阻塞

                    //将新连接注册到Selector,监听READ事件(有数据可读时触发)
                    client.register(selector, SelectionKey.OP_READ);
                    System.out.println("接受了新连接:" + client);
                } else if (key.isReadable()) {
                    //isReadable()为true,说明通道有数据可读
                    SocketChannel client = (SocketChannel) key.channel();
                    buffer.clear(); //清空缓冲区,准备读取数据
                    int bytesRead = client.read(buffer); //读取数据到缓冲区

                    if (bytesRead == -1) {
                        //客户端关闭了连接
                        client.close();
                    } else if (bytesRead > 0) {
                        //读取到数据,进行处理
                        buffer.flip(); //切换为读模式
                        byte[] data = new byte[buffer.limit()];
                        buffer.get(data); //从缓冲区获取数据
                        System.out.println("收到数据:" + new String(data));

                        //回复客户端消息
                        buffer.clear();
                        buffer.put("已收到你的消息".getBytes());
                        buffer.flip();
                        client.write(buffer);
                    }
                }

                //处理完事件后,必须移除SelectionKey,否则下次循环还会处理
                keyIterator.remove();
            }
        }
    }
}

2.2.3.3 Selector联合LinuxIO模型进行源码分析

查看ServerSocketChannel的源码可以发现这样一句话:

A selectable channel for stream-oriented listening sockets.

也就是说,ServerSocketChannel是连通listen_socket的通道,很容易想到,SocketChannel 也就是连接connect_socket的通道(也就是我们在之前介绍liunux网络IO模型的时候说到的监听连接的socket)

从上面的java nio服务端代码例子中可也是以看到:当调用accept之后,即listen_socket不再负责和当前客户端交互,而是创建了一个connect_socket和客户端进行三次握手之后的数据交互(建立连接后会创建一个新的socket用于处理数据传输),这个在前面的Liunux网络IO模型的文章中有解释过了。

在Java NIO中, Selector.open() 和 ServerSocketChannel.open() 方法本质上是对Linux底层系统调用的封装:

下面我们再来看下他们的源码。

public abstract class Selector implements Closeable {

    /**
     * Initializes a new instance of this class.
     */
    protected Selector() { }

    /**
     * Opens a selector.
     *
     * <p> The new selector is created by invoking the {@link
     * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
     * of the system-wide default {@link
     * java.nio.channels.spi.SelectorProvider} object.  </p>
     *
     * @return  A new selector
     *
     * @throws  IOException
     *          If an I/O error occurs
     */
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
public abstract class ServerSocketChannel
    extends AbstractSelectableChannel
    implements NetworkChannel
{

    /**
     * Initializes a new instance of this class.
     *
     * @param  provider
     *         The provider that created this channel
     */
    protected ServerSocketChannel(SelectorProvider provider) {
        super(provider);
    }

    /**
     * Opens a server-socket channel.
     *
     * <p> The new channel is created by invoking the {@link
     * java.nio.channels.spi.SelectorProvider#openServerSocketChannel
     * openServerSocketChannel} method of the system-wide default {@link
     * java.nio.channels.spi.SelectorProvider} object.
     *
     * <p> The new channel's socket is initially unbound; it must be bound to a
     * specific address via one of its socket's {@link
     * java.net.ServerSocket#bind(SocketAddress) bind} methods before
     * connections can be accepted.  </p>
     *
     * @return  A new socket channel
     *
     * @throws  IOException
     *          If an I/O error occurs
     */
    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

SelectorProvider是java中封装好的多路复用器实现,其中有一个成员变量provider,是SelectorProvider实现类的实例。
SelectorProvider提供了下面几种方式来初始化SelectorProvider,即决定使用哪一个Selector多路复用器实现类.(可以理解为就是决定底层使用哪一种网络IO模型,这个我们在前面的文章已经详细学习过了)

public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                                //使用默认的SelectorProvider
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
private static boolean loadProviderFromProperty() {
        //获取系统配置(需要配置一个实现类的全路径名)
        String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
        if (cn == null)
            return false;
        try {
            //通过反射实例化selector实现类
            Class<?> c = Class.forName(cn, true,
                                       ClassLoader.getSystemClassLoader());
            provider = (SelectorProvider)c.newInstance();
            return true;
        } catch (ClassNotFoundException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (IllegalAccessException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (InstantiationException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (SecurityException x) {
            throw new ServiceConfigurationError(null, x);
        }
    }
package sun.nio.ch;

import java.nio.channels.spi.SelectorProvider;

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
    //我当前使用的os是win10,所以使用的是windown上的多路复用器
        return new WindowsSelectorProvider();
    }
}


public class WindowsSelectorProvider extends SelectorProviderImpl {
    public WindowsSelectorProvider() {
    }
    //openSelector()调用后就可以获取到window平台支持的多路复用器 
    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}

SelectorProvider的实现类WindowsSelectorProvider的父类是SelectorProviderImpl,父类中有如下的方法:

public abstract class SelectorProviderImpl extends SelectorProvider {
    public SelectorProviderImpl() {
    }

    public DatagramChannel openDatagramChannel() throws IOException {
        return new DatagramChannelImpl(this);
    }

    public DatagramChannel openDatagramChannel(ProtocolFamily var1) throws IOException {
        return new DatagramChannelImpl(this, var1);
    }

    public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
    }

    public abstract AbstractSelector openSelector() throws IOException;

    //创建服务端listen_socket,并且关联当前选择器  ServerSocketChannelImpl是ServerSocketChannel的实现类
    public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }

    public SocketChannel openSocketChannel() throws IOException {
        return new SocketChannelImpl(this);
    }
}

前面selector选择器的创建也是一样的逻辑,最终openSelector执行在windows上获取到的就是WindowsSelectorImpl的实例

    selector = Selector.open();

    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
    
    

但是如果是mac版本的jdk就是下面的默认实现, 也就是之前我们说的Kqueue,这是一种和epoll机制类似的多路复用的事件通知类的IO模型,我们可以当做是epoll。

package sun.nio.ch;

import java.nio.channels.spi.SelectorProvider;

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}

下面我们再跟一下selector调用的select()方法 (其实就是对select函数的封装,和前面介绍linux中的io多路复用的系统调用函数思路一致,只不过在java中进行了封装)

sun.nio.ch.SelectorImpl:

    public int select() throws IOException {
        return select(0);
    }

这里传入的参数0L是timeout参数,即前面文章中我们解析的select()系统调用的参数
这里需要注意的是,封装后的select函数的timeout参数如果传入的是0,那么实际上调用的时候传入的是-1,逻辑如下:

    public int select(long timeout)
        throws IOException
    {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }

按照前面解析的的linux的select函数,上面如果传入一个timeout是正数,那么经过timeout时间后还没有状态改变的fd,那么就直接返回,另外这里java封装的select()函数传入0,则实际调用lockAndDoSelect传入的参数是-1,因为在jni中定义的是这样的:

// Java timeout == -1 : wait forever : timespec timeout of NULL
// Java timeout == 0  : return immediately

所以上面的public int select(long timeout)方法是阻塞的,按照linux中select的功能,select函数也可以设置为立即返回(这里立即返回是因为select模型下只会对fd集合进行一次遍历或者在epoll模型下只会检测一次当前就绪队列是否有数据),当然在java中也有封装,也就是下面的方法:

    public int selectNow() throws IOException {
        return this.lockAndDoSelect(0L);
    }

可以看到这里直接传入的是lockAndDoSelect(0L),也就是真实传递到jni调用中的timeout的值是0。

    private int lockAndDoSelect(long var1) throws IOException {
        synchronized(this) {
            if (!this.isOpen()) {
                throw new ClosedSelectorException();
            } else {
                int var10000;
                synchronized(this.publicKeys) {
                    synchronized(this.publicSelectedKeys) {
                        var10000 = this.doSelect(var1);
                    }
                }

                return var10000;
            }
        }
    }
 protected abstract int doSelect(long var1) throws IOException;

可以看到最终调用的是SelectorImpl中的抽象方法doSelect,也就是我们调用open后去一个selector实现类中实现的doSelect方法,在windows上也就是WindowsSelectorImpl中的下面的方法

protected int doSelect(long var1) throws IOException {
        if (this.channelArray == null) {
            throw new ClosedSelectorException();
        } else {
            this.timeout = var1;
            this.processDeregisterQueue();
            if (this.interruptTriggered) {
                this.resetWakeupSocket();
                return 0;
            } else {
                this.adjustThreadsCount();
                this.finishLock.reset();
                this.startLock.startThreads();

                try {
                    this.begin();

                    try {
                        //调用window下的select
                        this.subSelector.poll();
                    } catch (IOException var7) {
                        this.finishLock.setException(var7);
                    }

                    if (this.threads.size() > 0) {
                        this.finishLock.waitForHelperThreads();
                    }
                } finally {
                    this.end();
                }

                this.finishLock.checkForException();
                this.processDeregisterQueue();
                int var3 = this.updateSelectedKeys();
                this.resetWakeupSocket();
                return var3;
            }
        }
    }

上面我们看到的是window平台上的DefaultSelectorProvider的实现,下面是linux环境下的jdk中的实现

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    //多了一个可以传入全路径名来指定使用哪一个多路复用器
    private static SelectorProvider createProvider(String var0) {
        Class var1;
        try {
            var1 = Class.forName(var0);
        } catch (ClassNotFoundException var4) {
            throw new AssertionError(var4);
        }

        try {
            return (SelectorProvider)var1.newInstance();
        } catch (InstantiationException | IllegalAccessException var3) {
            throw new AssertionError(var3);
        }
    }

    //默认的无参的创建方法
    public static SelectorProvider create() {
        String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
        if (var0.equals("SunOS")) {
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        } else {
            return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
        }
    }
}

可以看到,如果是Linux系统,则使用Epoll(激动的心,颤抖的手,epoll大佬终于登场)

ps:SunOS是Sun公司研发的操作系统的最初叫法,之后也称为Solaris,它被认为是UNIX操作系统的衍生版本之一。

EPollSelectorProvider的实现如下:

public class EPollSelectorProvider extends SelectorProviderImpl {
    public EPollSelectorProvider() {
    }

    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

EPollSelectorImpl也是SelectorImpl的实现类,也是实现了最终调用的doSelect方法

protected int doSelect(long var1) throws IOException {
        if (this.closed) {
            throw new ClosedSelectorException();
        } else {
            this.processDeregisterQueue();

            try {
                this.begin();
                this.pollWrapper.poll(var1);
            } finally {
                this.end();
            }

            this.processDeregisterQueue();
            int var3 = this.updateSelectedKeys();
            if (this.pollWrapper.interrupted()) {
                this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
                synchronized(this.interruptLock) {
                    this.pollWrapper.clearInterrupted();
                    IOUtil.drain(this.fd0);
                    this.interruptTriggered = false;
                }
            }

            return var3;
        }
    }

该doSelect方法肯定也是封装的epoll系列的函数调用,没错就是这样,这里的封装主要是在EPollArrayWrapper中
下面是epoll的三种系统调用

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

在java中分别对应了下面三个jni调用

    private native int epollCreate();

    private native void epollCtl(int var1, int var2, int var3, int var4);

    private native int epollWait(long var1, int var3, long var4, int var6) throws IOException;

上面我们说的实际可以传入一个正数表示一个很短的超时时间,这样就可以实现用户线程不用阻塞,直接返回。这其实也是封装的操作系统的IO模型操作,传入的超时参数实现的,

Linux 的 select 和 epoll_wait 都可以通过设置超时参数实现 非阻塞立即返回:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

两者均适用于用户程序进程进行非阻塞轮询场景,但需注意空转可能导致的CPU占用问题,通常需要结合业务逻辑优化(如短暂休眠)。epoll_wait 更推荐用于高并发场景(如服务器),而 select 适合兼容性要求较高的简单应用。