Java NIO IO多路复用

/ 默认分类 / 0 条评论 / 807浏览

JAVA.NIO

1. 简介

NIO(new io) 是从jdk1.4之后引入的新的IO操作包,相比于传统的IO操作,NIO中主要的不同是引入了缓冲区,channel通道和selector选择器的 概念,这里你可能会说,缓冲区在传统的IO模型中也有响应的实现,比如BufferedWriter和BufferedReader,的确如此,只不过NIO中对于缓冲区的 操作会更丰富,并且NIO中使用的是channel和缓冲区连通的,传统的IO使用的是输入输出流.

NIO是一种非阻塞io,它将数据写入在缓冲区中,在读取数据的时候,如果缓冲区中的数据为空那么会直接返回,没有读取到任何数据,不会发生阻塞 但是NIO属于同步io,主要是因为selector选择器会阻塞,原理其实和之前分析的linux中的io模型select,poll,epoll类似

我总结了下面这张图,可以先简单过一下,等看完本文应该就会有更深的体会

2. 主要概念

javaNIO中主要的几个概念如下:

  1. Channel
    通道, 表示的是一个关联硬件设备或文件或网络socketfd的连接,channel是双向的,可写可读,这一点是和流最大的区别, 流
  2. Buffer
    缓冲区,包括众多操作api,可以很方便的对缓冲区进行操作,在读取数据的时候,先从通道channel读取数据到buffer,写数据时也是先将数据写入缓冲区 然后再使用channel写入文件中
    3.Selector 选择器,这个我们后面再介绍,主要涉及到IO多路复用模型

综上,Channel只是连接文件或网络io的连接,读写的数据最终还是要经过缓冲区buffer,所以获取数据,操作数据都需要依靠buffer,所以下面我们 需要详细熟悉buffer缓冲区的操作

这里也就说明了一点,在传统IO操作中,因为直接面向流,只能顺序的从流中一个字节或多个字节地读取数据,期间需要对数据进行移动和暂存需要 在用户程序自行处理,但是nio中提供的buffer缓冲区是对数组的封装,其中添加了几个指针,进而为buffer加入了很多增强操作,可以从channel中 将数据全部放入缓冲区,也可以将一部分数据放入缓冲区,然后读取一部分之后,在缓冲区保留一部分未处理的,然后接着读取,这些操作都要求程序 对缓冲区正确地操作

3. Buffer

buffer按照缓冲区中存储的数据类型可以分为下面几种实现:
ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer

  1. buffer中的几个关键指针位置

指针名称 | 含义
---|--- Capacity | 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变
Limit | 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
Position | 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备
Mark | 标记,调用mark()来设置mark=position,再调用reset()可以让position恢复到标记的位置

所以,mark<=position<=limit<=capacity

  1. buffer中几个关键的操作
     * Flips this buffer.  The limit is set to the current position and then
     * the position is set to zero.  If the mark is defined then it is
     * discarded.
     *
     * <p> After a sequence of channel-read or <i>put</i> operations, invoke
     * this method to prepare for a sequence of channel-write or relative
     * <i>get</i> operations.  For example:
     *
     * <blockquote><pre>
     * buf.put(magic);    // Prepend header
     * in.read(buf);      // Read data into rest of buffer
     * buf.flip();        // Flip buffer
     * out.write(buf);    // Write header + data to channel</pre></blockquote>,也就是可以将buffer中的数据写入到channel了  
public static void main(String[] args) throws IOException {
        String path = System.getProperty("user.dir");
        System.out.println(path);
        FileInputStream fileInputStream = new FileInputStream(new File(path+"\\test.txt"));
        FileOutputStream fileOutputStream = new FileOutputStream(new File(path+"\\test_copy.txt"));
        FileChannel channel = fileInputStream.getChannel();
        FileChannel fileOutChannel = fileOutputStream.getChannel();
        //创建一个缓冲区buffer,大小为1024字节
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //从连接test.txt的通道读取数据到缓冲区,该read返回值为读取到的字节数,如果到了文件末尾则返回-1
        //第一次调用channel.read(byteBuffer),会将test.txt文件从头读取1024字节的数据,read方法返回1024,此时缓冲区中position=1024,limit=1024
        //因为文件为2kb,所以第一次直接调用read()读取会塞满缓冲区
        while (channel.read(byteBuffer)!=-1){
            //flip之后.position=0,limit=1024,这样channel就可以读取缓缓区中的数据了,能够读取的范围是0-1024
            byteBuffer.flip();
            //读取完之后position=1024,limit=1024  但是此时缓冲区数据还是存在于其中的,只是position位置变化了
            //所以下面的byteBuffer.array()还是会读取到数据
            fileOutChannel.write(byteBuffer);
            System.out.println(new String(byteBuffer.array()));
        }
        /**
         * 但是,当再次进入while中,因为bytebuffer是满的,position=1024=limit=capacity(再次写会从position位置后开始写入缓冲区),所以这次channel.read(byteBuffer)返回值为0,
         * 表示没有读取到数据,但是还是会进入while循环中,byteBuffer.flip();之后,也就等于重复第一次while了,因为第一次的读入缓冲区
         * 的数据一直都在,第二次读取又没能读取到数据,所以上面的代码等于会无限次写入同样的1024字节的数据到test_copy.txt文件中,但是
         * 如果在fileOutChannel.write(byteBuffer);之后添加一句byteBuffer.clear();那么便可以正常复制文件数据了,因为clear()之后
         * 缓冲区的postion会回到原始位置,即position=0(注意,clear()之后也只是移动了指针位置,数据还在缓冲区,缓冲区的数据只会被再次写入覆盖,不会被清除)
         */
    }

所以正确的复制方式如下:

public static void main(String[] args) throws IOException {
        String path = System.getProperty("user.dir");
        System.out.println(path);
        FileInputStream fileInputStream = new FileInputStream(new File(path+"\\test.txt"));
        FileOutputStream fileOutputStream = new FileOutputStream(new File(path+"\\test_copy.txt"));
        FileChannel channel = fileInputStream.getChannel();
        FileChannel fileOutChannel = fileOutputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        while (channel.read(byteBuffer)!=-1){
            byteBuffer.flip();
            fileOutChannel.write(byteBuffer);
            byteBuffer.clear();
            System.out.println(new String(byteBuffer.array()));
        }
    }

4. 网络IO

(1) 服务端使用单线程,不断监听客户端连接,但是这样,服务端还是每一时刻只能处理一个客户端连接,因为服务端read会阻塞

Server:

public class MyServer {

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[1024];
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("服务端已创建,准备接收客户端连接...");
        while (true){
            Socket socket = serverSocket.accept();
            System.out.println("客户端已经连接");
            socket.getInputStream().read(data);
            System.out.println("收到了来自客户端的数据:"+new String(data));
        }
    }

}

Client:

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Random random = new Random(1);
        for (int i = 0; i < 5; i++) {
            Socket socket = new Socket("127.0.0.1",8080);
            System.out.println("客户端已经连接上服务端,准备发送数据...");
            TimeUnit.SECONDS.sleep(3);
            socket.getOutputStream().write(String.valueOf(random.nextInt()).getBytes());
            System.out.println("客户端已经发送数据完毕");
            socket.close();
        }
    }
}
客户端已经连接上服务端,准备发送数据...
客户端已经连接上服务端,准备发送数据...
客户端已经连接上服务端,准备发送数据...
客户端已经连接上服务端,准备发送数据...
客户端已经连接上服务端,准备发送数据...
客户端已经发送数据完毕
客户端已经发送数据完毕
客户端已经发送数据完毕
客户端已经发送数据完毕
客户端已经发送数据完毕


服务端已创建,准备接收客户端连接...
客户端已经连接
收到了来自客户端的数据:1761283695
客户端已经连接
收到了来自客户端的数据:-1155869325
客户端已经连接
收到了来自客户端的数据:43152917625
客户端已经连接
收到了来自客户端的数据:89212850825
客户端已经连接
收到了来自客户端的数据:17499406265

(2)

如果Server端为每一个客户端连接创建一个线程进行处理

Server:

public class MyServer {

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[1024];
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("服务端已创建,准备接收客户端连接...");
        while (true){
            Socket socket = serverSocket.accept();
            System.out.println("客户端已经连接,新开线程处理");
            new Thread(() -> {
                try {
                    socket.getInputStream().read(data);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println("收到了来自客户端的数据:"+new String(data));
            }).start();
        }
    }

}
客户端已经连接上服务端,准备发送数据...
客户端已经连接上服务端,准备发送数据...
客户端已经连接上服务端,准备发送数据...
客户端已经连接上服务端,准备发送数据...
客户端已经连接上服务端,准备发送数据...
客户端已经发送数据完毕
客户端已经发送数据完毕
客户端已经发送数据完毕
客户端已经发送数据完毕
客户端已经发送数据完毕



服务端已创建,准备接收客户端连接...
客户端已经连接,新开线程处理
客户端已经连接,新开线程处理
客户端已经连接,新开线程处理
客户端已经连接,新开线程处理
客户端已经连接,新开线程处理
收到了来自客户端的数据:-1155869325
收到了来自客户端的数据:43152917625
收到了来自客户端的数据:89212850825
收到了来自客户端的数据:17612836955
收到了来自客户端的数据:17499406265

可以看出来,这样服务端可以同时处理多个客户端请求

(3)
上面的第二种方式就是传统的解决阻塞IO的方式,服务端为每一个连接开启一个线程处理,但是这样处理会出现很多问题,如果客户端连接很多,那么服务端资源 压力会很大,并且如果客户端很多连接都不做任何数据交互操作只是连接到服务端,这样就会造成很多资源浪费,但是不用多线程处理并发要怎样做呢?于是:

下面这样使用NIO来处理和第一种也是一样的,会发生连接和读写阻塞

服务端启动后会不断打印'等待客户端连接',之后客户端启动后,会一次性和客户端建立5个连接(因为客户端代码中发送数据中的线程睡眠是开启的子线程),但是第一个连接和服务端连上之后,accept返回,此时if为true,进入if逻辑,但是该客户端连接需要等待3s才会发送数据,所以服务端会阻塞在read,当3s过后,客户端发送数据到服务端,并且其他客户端发送数据的线程也睡眠完毕了,所以此时就会一下出现五个数据接收.

ps: 这里就涉及到了之前我的篇文章中介绍的建立socket通信的过程,服务端会维持一个syn queue,即未建立连接成功队列(三次握手还没有结束),其实在我们这段java代码中的bind方法中的第二个参数(参数名为backlog)就是该队列的大小,因为队列 大小设置为20,所以客户端5次与服务端建立连接,虽然服务端阻塞在了第一个连接的read,但是没关系,此时虽然无法accept,但是客户端等于和服务端完成了第一次握手,然后服务端将这些客户端连接放入了syn queue队列中,所以可以看到即使服务端阻塞了,客户端还是等于和服务端建立了连接一样

public class MyNioServer {

    public static void main(String[] args) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080),20);
        while (true){
            System.out.println("等待客户端连接");
            //accept()会发生阻塞
            SocketChannel socketChannel = serverSocketChannel.accept();
            if(Objects.nonNull(socketChannel)){
                System.out.println("客户端已连接");
                socketChannel.read(byteBuffer);
                System.out.println("收到客户端数据"+new String(byteBuffer.array()));
                byteBuffer.clear();
            }
        }
    }
}
        等待客户端连接 
        客户端已连接
        收到客户端数据-1155869325
        等待客户端连接
        客户端已连接
        收到客户端数据17612836955
        等待客户端连接
        客户端已连接
        收到客户端数据43152917655
        等待客户端连接
        客户端已连接
        收到客户端数据17499406265
        等待客户端连接
        客户端已连接
        收到客户端数据89212850865
        等待客户端连接
        
        
        客户端已经连接上服务端,准备发送数据...
        客户端已经连接上服务端,准备发送数据...
        客户端已经连接上服务端,准备发送数据...
        客户端已经连接上服务端,准备发送数据...
        客户端已经连接上服务端,准备发送数据...
        客户端已经发送数据完毕
        客户端已经发送数据完毕
        客户端已经发送数据完毕
        客户端已经发送数据完毕
        客户端已经发送数据完毕

下面我们将服务端的backlog设置为2,首先按照原理,此时当服务端阻塞后,客户端应该只会有两个可以和服务端"建立连接"(synqueue)

可以看到,只会有三个客户端会连接上,第一个连接会直接和服务端握手成功,进入accept queue,其后两个也会进行第一次握手,进入sync queue,对于客户端来说 也是建立了连接的,之后服务端sync queue满了,客户端还想继续连接,此时就会出现服务端拒绝连接,代码执行结果可以看出此时的socket是上一次的socket对象 ,这是因为本次执行出错,该变量引用没有被替换; 连接被拒绝自然就会出现之后的数据发送失败,因为第一次握手都没有成功

服务端代码如下:

public class MyNioServer {

    public static void main(String[] args) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080),2);
        serverSocketChannel.configureBlocking(false);
        while (true){
//            System.out.println("等待客户端连接");
            SocketChannel socketChannel = serverSocketChannel.accept();
            if(Objects.nonNull(socketChannel)){
                System.out.println("客户端已连接");
                socketChannel.read(byteBuffer);
                System.out.println("收到客户端数据"+new String(byteBuffer.array()));
                byteBuffer.clear();
            }
        }
    }
}

客户端代码如下:

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Random random = new Random(1);
        Socket socket = null;
        for (int i = 0; i < 5; i++) {
            try{
                socket = new Socket("127.0.0.1",8080);
                System.out.println(socket+"客户端已经连接上服务端,准备发送数据...");
            }catch(Exception e)
            {
                System.out.println(socket+"服务端sync queue已满,"+e.getMessage());
            }
            Socket finalSocket = socket;
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(30);
                    finalSocket.getOutputStream().write(String.valueOf(random.nextInt()).getBytes());
                    System.out.println(finalSocket+"客户端已经发送数据完毕");
                    finalSocket.close();
                } catch (Exception e) {
                    System.out.println(finalSocket+"客户端发送数据失败,"+e.getMessage());
                }
            }).start();
        }
    }
}
客户端已连接
收到客户端数据-1155869325                                                                                                                     客户端已连接
收到客户端数据17612836955                                                                                                                    
客户端已连接
收到客户端数据43152917655


Socket[addr=/127.0.0.1,port=8080,localport=52300]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=52301]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=52302]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=52302]服务端sync queue已满,Connection refused: connect
Socket[addr=/127.0.0.1,port=8080,localport=52302]服务端sync queue已满,Connection refused: connect
Socket[addr=/127.0.0.1,port=8080,localport=52300]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=52302]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=52301]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=52302]客户端发送数据失败,Socket is closed
Socket[addr=/127.0.0.1,port=8080,localport=52302]客户端发送数据失败,Socket is closed

原理还是上篇文章中介绍的socket连接细节,在服务端accept之后,服务端维护的其实是另一个socketfd(connectsocketfd),设置该socket为非阻塞,这样read操作 就不会阻塞,但是如果直接在服务端中添加socketChannel.configureBlocking(false); 那么服务端会接收不到任何数据,为什么呢?可以来看下代码:

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Random random = new Random(1);
        Socket socket = null;
        for (int i = 0; i < 5; i++) {
            try{
                socket = new Socket("127.0.0.1",8080);
                System.out.println(socket+"客户端已经连接上服务端,准备发送数据...");
            }catch(Exception e)
            {
                System.out.println(socket+"服务端sync queue已满,"+e.getMessage());
            }
            Socket finalSocket = socket;
            new Thread(() -> {
                try {
//                    TimeUnit.SECONDS.sleep(3);
                    finalSocket.getOutputStream().write(String.valueOf(random.nextInt()).getBytes());
                    System.out.println(finalSocket+"客户端已经发送数据完毕");
                    finalSocket.close();
                } catch (Exception e) {
                    System.out.println(finalSocket+"客户端发送数据失败,"+e.getMessage());
                }
            }).start();
        }
    }
}
public class MyNioServer {

    public static void main(String[] args) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080),20);
        serverSocketChannel.configureBlocking(false);
        while (true){
//            System.out.println("等待客户端连接");
            SocketChannel socketChannel = serverSocketChannel.accept();
            if(Objects.nonNull(socketChannel)){
                socketChannel.configureBlocking(false);
                System.out.println("客户端已连接");
                int readResult = socketChannel.read(byteBuffer);
                if(readResult != -1 && readResult != 0){
                    System.out.println("收到客户端数据"+new String(byteBuffer.array()));
                }else {
                    System.out.println("未接收到客户端发送的数据");
                }
                byteBuffer.clear();
            }
        }
    }
}

上面我们把客户端发送数据的3秒睡眠去掉,但是还是会接收不到数据,这里问题就出在,服务端accept之后,read是非阻塞的,这样read会立即执行,所以需要要求客户端在执行完连接服务端之后,立即写数据到服务端(保证在服务端read之前发送数据),但是上面的程序我们开启了子线程去发送数据,这样新建线程会消耗时间,所以会导致服务端什么数据都接收不到,如果直接在父线程中发送数据,此时就会有随机性,可能五个连接都没有数据,可能有几个连接有数据

Socket[addr=/127.0.0.1,port=8080,localport=56132]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=56132]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=56134]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=56134]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=56135]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=56135]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=56136]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=56136]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=56137]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=56137]客户端已经发送数据完毕

客户端已连接
收到客户端数据-1155869325
客户端已连接
收到客户端数据43152917625
客户端已连接
收到客户端数据17612836955
客户端已连接
未接收到客户端发送的数据
客户端已连接
未接收到客户端发送的数据

有时候服务端也会这样:  
客户端已连接
未接收到客户端发送的数据
客户端已连接
未接收到客户端发送的数据
客户端已连接
未接收到客户端发送的数据
客户端已连接
未接收到客户端发送的数据
客户端已连接
未接收到客户端发送的数据

之所以发生上面的问题,原因很简单,因为服务端接收到连接后,read方法又没有阻塞,所以导致如果客户端没有在执行完read之前发送完数据,服务端就会弄丢这个 客户端连接的socket,所以就算之后客户端发送了数据,也无法读取到,那么怎样解决呢?很明显,将客户端的socket保存起来不就可以了



import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;

/**
 * @author zuohui
 */
public class MyNioServer {

    public static void main(String[] args) throws IOException {
        List<SocketChannel> clients = new ArrayList<>();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080),20);
        serverSocketChannel.configureBlocking(false);
        while (true){
//            System.out.println("等待客户端连接");
            SocketChannel socketChannel = serverSocketChannel.accept();
            if(Objects.nonNull(socketChannel)) {
                clients.add(socketChannel);
                socketChannel.configureBlocking(false);
                System.out.println("客户端已连接");
            }
            for (SocketChannel channel : clients) {
                int readResult = channel.read(byteBuffer);
                if(readResult != -1 && readResult != 0){
                    System.out.println("收到客户端数据"+new String(byteBuffer.array()));
                }else {
//                    System.out.println("未接收到客户端发送的数据");
                }
                byteBuffer.clear();
            }
        }
    }
}


public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Random random = new Random(1);
        Socket socket = null;
        for (int i = 0; i < 5; i++) {
            try{
                socket = new Socket("127.0.0.1",8080);
                System.out.println(socket+"客户端已经连接上服务端,准备发送数据...");
            }catch(Exception e)
            {
                System.out.println(socket+"服务端sync queue已满,"+e.getMessage());
            }
            Socket finalSocket = socket;
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    finalSocket.getOutputStream().write(String.valueOf(random.nextInt()).getBytes());
                    System.out.println(finalSocket+"客户端已经发送数据完毕");
                    finalSocket.close();
                } catch (Exception e) {
                    System.out.println(finalSocket+"客户端发送数据失败,"+e.getMessage());
                }
            }).start();
        }
    }
}
Socket[addr=/127.0.0.1,port=8080,localport=53245]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=53247]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=53248]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=53249]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=53250]客户端已经连接上服务端,准备发送数据...
Socket[addr=/127.0.0.1,port=8080,localport=53245]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=53247]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=53248]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=53249]客户端已经发送数据完毕
Socket[addr=/127.0.0.1,port=8080,localport=53250]客户端已经发送数据完毕


客户端已连接
客户端已连接
客户端已连接
客户端已连接
客户端已连接
收到客户端数据-1155869325
收到客户端数据43152917625
收到客户端数据17612836955
收到客户端数据17499406265
收到客户端数据89212850865

可以看到,即使在客户端连接后,过了1s后再发送请求,服务端也可以在客户端准备好发送的时候准确接收到数据

但事实上,这还不是最好的解决办法,至少这不是目前的IO多路复用模型中的方法,为什么呢?
我们上面的方法,是在用户空间维护一份客户端socket,然后在用户程序中通过全部轮询的方法来寻找fd状态发生变化的socket,这样做需要cpu在内核和用户空间来回进行socket数据的拷贝,另外需要进行多次内核态和用户态的切换

5. IO多路复用模型

参考我的博客文章

6. Selector

前面介绍了缓冲区buffer,通道channel,下面介绍nio中一个最为重要的概念选择器Selector
前面的文章介绍了IO多路复用模型,这样就比较好理解selector了,selector的功能就是一个io多路复用器,很多的channel可以注册通过注册不同的事件将自己和 selector绑定,这样selector调用指定的方法就可以获取到当前注册到该selector的所有相应事件已经就绪的通道,这样就可以实现单线程并发,听了是不是感觉很像IO多路复用模型select,poll,epoll这些,是的,没错,javanio实际利用的就是这些IO复用模型

下面实现的是一个简单的nio服务端模型(参考netty权威教程)

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * @date 2021/7/6 - 20:46
 */
public class MultiplexerServer implements Runnable{

    //选择器
    Selector selector;

    //服务端listen_socket
    ServerSocketChannel serverSocketChannel;

    public MultiplexerServer(int serverPort) {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            //服务端connect_socket设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //绑定服务端监听端口和sync queue队列大小
            serverSocketChannel.socket().bind(new InetSocketAddress(serverPort),20);
            //为该服务端connect_socket注册客户端连接事件到selector上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器已启动,端口:"+serverPort);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true){
            try {
                //等于执行了select的第一步,阻塞返回当前状态变化的fd的数量
                selector.select();
                //再次遍历,获取监听到的状态以及修改的socket
                Set<SelectionKey> keys = selector.selectedKeys();
                //遍历所有的就绪socket,并处理其中的事件
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    //处理后的监听事件一定需要从selector中的注册列表中删除,因为
                    iterator.remove();
                    handleRequest(key);
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    //处理筛选出的key
    private void handleRequest(SelectionKey key) throws IOException {
        if(key.isValid()){
            //如果是已连接状态
            if(key.isAcceptable()){
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = channel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector,SelectionKey.OP_READ);
                System.out.println("客户端已连接");
                //客户端连接上后向其发送响应数据
                doResponseWrite(socketChannel);
            }

            //如果是可读状态
            if(key.isReadable()){
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int read = channel.read(buffer);
                if(read > 0){
                    buffer.flip();
                    byte[] data = new byte[buffer.remaining()];
                    buffer.get(data);
                    System.out.println("服务器接收到数据:"+new String(data));
                }else if(read < 0){
                    key.cancel();
                    channel.close();
                }else{
                    System.out.println("无数据");
                }

            }
        }
    }

    private void doResponseWrite(SocketChannel channel) throws IOException {
        ByteBuffer dataForResp = ByteBuffer.allocate(1024);
        dataForResp.put("服务端响应的数据".getBytes());
        dataForResp.flip();
        channel.write(dataForResp);
    }
}

public class MyNioSelectorServerNetty {

    public static void main(String[] args) {
        MultiplexerServer server = new MultiplexerServer(8080);
        new Thread(server).start();
    }
}

上面就是一个简单的NIO服务端的实现,重点了解下面几个问题:
Q1: ServerSocketChannel和SocketChannel

前面我们介绍过,整个socket和tcp连接的关系,即整个网络连接,数据传输的过程,其中一个重要的知识点就是服务端socketfd的创建,可以查看这篇文章 就是说,服务端进行listen系统调用后会打开一个listen_socket_fd,之后客户端连接后会为该客户端生成一个对应的connect_socket_fd,java语言是一种支持跨平台的语言,其实就是因为java将很多底层操作,比如io中的read,wirte,网络中accept,listen都进行了封装,jvm发现当前的方法为native方法(即jni调用),就会加载相应的库,然后调用相应c库,这样设计让我们在java的源码中看不到真实的调用listen和accept,我们看到的都是native方法。说了这么多,其实上面的ServerSocketChannel和SocketChannel就是上面所说的listen_socket_fd和connect_socket_fd

首先创建一个服务端listen_socket
下面即为ServerSocketChannel的签名,是一个抽象类,

public abstract class ServerSocketChannel
    extends AbstractSelectableChannel
    implements NetworkChannel

查看ServerSocketChannel的源码可以发现这样一句话:
A selectable channel for stream-oriented listening sockets.
ServerSocketChannel是连通listen_socket的通道

很容易想到,SocketChannel 也就是连接connect_socket的通道
上面的nio服务端代码例子中可以看到:

            if(key.isAcceptable()){
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = channel.accept();

当调用accept之后,即listen_socket不再负责和当前客户端交互,而是创建了一个connect_socket和客户端进行三次握手之后的数据交互,这个在前面的socket和tcp连接的文章中有解释过了,所以SocketChannel也就是连接connect_socket的通道,在其官方注释中也有这样一句话:
A selectable channel for stream-oriented connecting sockets.

Q2: ServerSocketChannel和Selector选择器,selector选择器是如何封装的

代码中ServerSocketChannel调用open来创建一个实例

    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

因为是抽象类,所以这个返回的一定是一个子类
SelectorProvider是一个抽象类

public abstract class SelectorProvider {

    private static final Object lock = new Object();
    private static SelectorProvider provider = null;

SelectorProvider是java中封装好的多路复用器实现,其中有一个成员变量provider,是SelectorProvider实现类的实例。
SelectorProvider提供了下面几种方式来初始化SelectorProvider,即决定使用哪一个Selector多路复用器实现类

方法1:
按照系统配置中指定的参数来确定使用哪一种多路复用器

private static boolean loadProviderFromProperty() {
        //获取系统配置(需要配置一个实现类的全路径名)
        String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
        if (cn == null)
            return false;
        try {
            //通过反射实例化selector实现类
            Class<?> c = Class.forName(cn, true,
                                       ClassLoader.getSystemClassLoader());
            provider = (SelectorProvider)c.newInstance();
            return true;
        } catch (ClassNotFoundException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (IllegalAccessException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (InstantiationException x) {
            throw new ServiceConfigurationError(null, x);
        } catch (SecurityException x) {
            throw new ServiceConfigurationError(null, x);
        }
    }

方法2:
返回一个当前环境中默认的selector provider的实现类

public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                                //使用默认的SelectorProvider
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
package sun.nio.ch;

import java.nio.channels.spi.SelectorProvider;

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
    //我当前使用的os是win10,所以使用的是windown上的多路复用器
        return new WindowsSelectorProvider();
    }
}


public class WindowsSelectorProvider extends SelectorProviderImpl {
    public WindowsSelectorProvider() {
    }
    //openSelector()调用后就可以获取到window平台支持的多路复用器 
    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}

SelectorProvider的实现类WindowsSelectorProvider的父类是SelectorProviderImpl,父类中有如下的方法:

public abstract class SelectorProviderImpl extends SelectorProvider {
    public SelectorProviderImpl() {
    }

    public DatagramChannel openDatagramChannel() throws IOException {
        return new DatagramChannelImpl(this);
    }

    public DatagramChannel openDatagramChannel(ProtocolFamily var1) throws IOException {
        return new DatagramChannelImpl(this, var1);
    }

    public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
    }

    public abstract AbstractSelector openSelector() throws IOException;

    //创建服务端listen_socket,并且关联当前选择器  ServerSocketChannelImpl是ServerSocketChannel的实现类
    public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }

    public SocketChannel openSocketChannel() throws IOException {
        return new SocketChannelImpl(this);
    }
}

前面selector选择器的创建也是一样的逻辑,最终openSelector执行在windows上获取到的就是WindowsSelectorImpl的实例

    selector = Selector.open();

    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
    
    

下面跟一下selector调用select()方法 (其实就是对select函数的封装,和前面介绍linux中的io多路复用的系统调用函数思路一致,只不过在java中进行了封装)

sun.nio.ch.SelectorImpl:

    public int select() throws IOException {
        return select(0);
    }

这里传入的参数0L是timeout参数,即前面文章中我们解析的select()系统调用的参数
这里需要注意的是,封装后的select函数的timeout参数如果传入的是0,那么实际上调用的时候传入的是-1,逻辑如下:

    public int select(long timeout)
        throws IOException
    {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }

按照前面解析的的linux的select函数,上面如果传入一个timeout是正数,那么经过timeout时间后还没有状态改变的fd,那么就直接返回,另外这里java封装的select()函数传入0,则实际调用lockAndDoSelect传入的参数是-1,因为在jni中定义的是这样的:

// Java timeout == -1 : wait forever : timespec timeout of NULL
// Java timeout == 0  : return immediately

所以上面的public int select(long timeout)方法是阻塞的,按照linux中select的功能,select函数也可以设置为立即返回(这里不要理解为非阻塞,因为select其实是阻塞的(关于阻塞和非阻塞,异步和同步可以看下我的这篇文章),只不过立即返回是因为只会对fd集合进行一次遍历),当然在java中也有封装,也就是下面的方法:

    public int selectNow() throws IOException {
        return this.lockAndDoSelect(0L);
    }

可以看到这里直接传入的是lockAndDoSelect(0L),也就是真实传递到jni调用中的timeout的值是0,也就是直接调用后立刻返回,关于如何实现的,在我的另一片文章中有深入解析(其实就是何时终止fd集合的无限轮询啦)

    private int lockAndDoSelect(long var1) throws IOException {
        synchronized(this) {
            if (!this.isOpen()) {
                throw new ClosedSelectorException();
            } else {
                int var10000;
                synchronized(this.publicKeys) {
                    synchronized(this.publicSelectedKeys) {
                        var10000 = this.doSelect(var1);
                    }
                }

                return var10000;
            }
        }
    }
 protected abstract int doSelect(long var1) throws IOException;

可以看到最终调用的是SelectorImpl中的抽象方法doSelect,也就是我们调用open后去一个selector实现类中实现的doSelect方法,在windows上也就是WindowsSelectorImpl中的下面的方法

protected int doSelect(long var1) throws IOException {
        if (this.channelArray == null) {
            throw new ClosedSelectorException();
        } else {
            this.timeout = var1;
            this.processDeregisterQueue();
            if (this.interruptTriggered) {
                this.resetWakeupSocket();
                return 0;
            } else {
                this.adjustThreadsCount();
                this.finishLock.reset();
                this.startLock.startThreads();

                try {
                    this.begin();

                    try {
                        //调用window下的select
                        this.subSelector.poll();
                    } catch (IOException var7) {
                        this.finishLock.setException(var7);
                    }

                    if (this.threads.size() > 0) {
                        this.finishLock.waitForHelperThreads();
                    }
                } finally {
                    this.end();
                }

                this.finishLock.checkForException();
                this.processDeregisterQueue();
                int var3 = this.updateSelectedKeys();
                this.resetWakeupSocket();
                return var3;
            }
        }
    }

上面我们看到的是window平台上的DefaultSelectorProvider的实现,下面是linux环境下的jdk中的实现

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    //多了一个可以传入全路径名来指定使用哪一个多路复用器
    private static SelectorProvider createProvider(String var0) {
        Class var1;
        try {
            var1 = Class.forName(var0);
        } catch (ClassNotFoundException var4) {
            throw new AssertionError(var4);
        }

        try {
            return (SelectorProvider)var1.newInstance();
        } catch (InstantiationException | IllegalAccessException var3) {
            throw new AssertionError(var3);
        }
    }

    //默认的无参的创建方法
    public static SelectorProvider create() {
        String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
        if (var0.equals("SunOS")) {
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        } else {
            return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
        }
    }
}

可以看到,如果是Linux系统,则使用Epoll(激动的心,颤抖的手,epoll大佬终于登场)

ps:SunOS是Sun公司研发的操作系统的最初叫法,之后也称为Solaris,它被认为是UNIX操作系统的衍生版本之一。

EPollSelectorProvider的实现如下:

public class EPollSelectorProvider extends SelectorProviderImpl {
    public EPollSelectorProvider() {
    }

    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

EPollSelectorImpl也是SelectorImpl的实现类,也是实现了最终调用的doSelect方法

protected int doSelect(long var1) throws IOException {
        if (this.closed) {
            throw new ClosedSelectorException();
        } else {
            this.processDeregisterQueue();

            try {
                this.begin();
                this.pollWrapper.poll(var1);
            } finally {
                this.end();
            }

            this.processDeregisterQueue();
            int var3 = this.updateSelectedKeys();
            if (this.pollWrapper.interrupted()) {
                this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
                synchronized(this.interruptLock) {
                    this.pollWrapper.clearInterrupted();
                    IOUtil.drain(this.fd0);
                    this.interruptTriggered = false;
                }
            }

            return var3;
        }
    }

该doSelect方法肯定也是封装的epoll系列的函数调用,没错就是这样,这里的封装主要是在EPollArrayWrapper中
下面是epoll的三种系统调用

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

在java中分别对应了下面三个jni调用

    private native int epollCreate();

    private native void epollCtl(int var1, int var2, int var3, int var4);

    private native int epollWait(long var1, int var3, long var4, int var6) throws IOException;

ps:关于epoll的原理解析,可以查看之前的linux内核源码之epoll分析这篇文章

下面画了一张图来总结javanio对操作系统io多路复用器的封装:

java nio总结图

Q3: 为什么每次处理完当前的key之后需要remove

前面写的NIO服务端有这样一个操作

                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    //处理后的监听事件一定需要从selector中的注册列表中删除
                    iterator.remove();
                    handleRequest(key);
                }

那么为什么需要remove掉呢?

来看这样一个测试:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Key {

    String keyName;

}

public class MySelector {

    private Set<Key> selectorKeys;


    public MySelector() {
        this.selectorKeys = new HashSet<>();
    }

    public void select(){
        selectorKeys.add(new Key(UUID.randomUUID().toString(),UUID.randomUUID().toString()));
    }

    public Set<Key> selectKeys(){
        return selectorKeys;
    }

}
public class TestSelectorRemove {

    public static void main(String[] args) throws InterruptedException {
        MySelector selector = new MySelector();

        while (true){
            selector.select();
            Set<Key> keys = selector.selectKeys();
            Iterator<Key> iterator = keys.iterator();
            while (iterator.hasNext()){
                Key key = iterator.next();
                try{
                    UUID.fromString(key.getChannelName()).version();
                    System.out.println("处理准备好的key:"+key.getKeyName());
                    key.setChannelName(key.getChannelName()+"_over");
                }catch(Exception e)
                {
                    System.out.println("发生异常,当前key已经被处理过,无socketchannel与其对应,key:"+key.getKeyName());
                }
            }
            System.out.println("=====");
            TimeUnit.SECONDS.sleep(2);
        }
    }

}

运行结果:

处理准备好的key:1848c2e5-8a47-4c6a-91cd-98af4c239744
=====
发生异常,当前key已经被处理过,无socketchannel与其对应,key:1848c2e5-8a47-4c6a-91cd-98af4c239744
处理准备好的key:d14f3eff-423f-4023-89f4-afe0223922a2
=====
发生异常,当前key已经被处理过,无socketchannel与其对应,key:1848c2e5-8a47-4c6a-91cd-98af4c239744
发生异常,当前key已经被处理过,无socketchannel与其对应,key:d14f3eff-423f-4023-89f4-afe0223922a2
处理准备好的key:4d2c93b8-f9eb-4000-8545-15714fa76bb3
=====

同样的道理,调用selectedKeys()之后返回的数据即publicSelectedKeys

public abstract class SelectorImpl extends AbstractSelector {
    
    private Set<SelectionKey> publicSelectedKeys;
    
    public Set<SelectionKey> selectedKeys() {
    if (!this.isOpen() && !Util.atBugLevel("1.4")) {
        throw new ClosedSelectorException();
    } else {
        return this.publicSelectedKeys;
    }
}

nio本身代码没有删除已经处理的key,所以下次再次调用还会带出来,所以需要我们手动删除,否则该key获取到的channel为null,继而会出现空指针问题

6. NIO 客户端

上面完成并分析了NIO服务端的简单实现和部分原理,下面我们来看下NIO的客户端实现

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @date 2021/7/16 - 14:47
 */
public class NIOClient implements Runnable{

    SocketChannel socketChannel;

    String serverIp;

    int port;

    Selector selector;

    public NIOClient(String serverIp,int port) {
        this.serverIp = serverIp;
        this.port = port;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            //连接服务端
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
        }
        //客户端执行选择器select
        while (true){
            try {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                SelectionKey key = null;
                while (iterator.hasNext()){
                    key = iterator.next();
                    iterator.remove();
                    //遍历每个准备好状态的key,然后进行处理
                    handler(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handler(SelectionKey key) throws IOException {
        //如果key是有效的
        if(key.isValid()){
            SocketChannel channel = (SocketChannel) key.channel();
            //如果key是可连接状态
            if(key.isConnectable()){
                try{
                    //如果连接成功
                    if(channel.finishConnect()){
                        //连接后向服务端发送数据
                        doWrite(channel);
                        //监听读事件(如果服务端有数据响应过来)
                        channel.register(selector,SelectionKey.OP_READ);
                    }else{
                        System.out.println("连接尚未完成...");
                    }
                }catch(Exception e)
                {
                    System.out.println("连接失败,客户端程序结束,原因:" +  e.getMessage());
                    System.exit(1);
                }
            }

            //如果是可读状态(说明服务端有响应数据过来)
            if(key.isReadable()){
                ByteBuffer data = ByteBuffer.allocate(1024);
                int read = channel.read(data);
                if(read > 0){
                    data.flip();
                    byte[] byteArr = new byte[data.remaining()];
                    data.get(byteArr);
                    System.out.println("客户端接收到数据:"+new String(byteArr));
                }else if(read < 0){
                    System.out.println("客户端接无法接收到数据,对端链路已关闭");
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }

    private void doConnect() throws IOException {
        boolean isConnect = socketChannel.connect(new InetSocketAddress(serverIp, port));
        //如果此时就连接上了,那么直接向服务端发送数据
        if(isConnect){
            doWrite(socketChannel);
        }else{
            //如果此时没有连接上,那么交给selector,监听连接完成事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel socketChannel) throws IOException {
        ByteBuffer dataForWrite = ByteBuffer.allocate(1024);
        dataForWrite.put("我是来自客户端的数据".getBytes());
        dataForWrite.flip();
        socketChannel.write(dataForWrite);
    }
}

参考文章

 https://note.youdao.com/web/#/file/recent/markdown/WEB03d7200ced28e7ba62fc79b3ce848672/
https://nowjava.com/readcode/jdk8/7892
https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&rsv_idx=1&tn=baidu&wd=linux%20select%E5%87%BD%E6%95%B0&fenlei=256&oq=lockAndDoSelect&rsv_pq=ade2fc67000c2261&rsv_t=2d9bfLpevvYYcq0xmiVc90NOpcYiRMdHJ4WPebc9sZzhj45r5m4yt5T8fAs&rqlang=cn&rsv_dl=tb&rsv_enter=1&rsv_btype=t&inputT=4550&rsv_sug3=87&rsv_sug1=59&rsv_sug7=100&rsv_sug2=0&rsv_sug4=4615
https://www.cnblogs.com/alantu2018/p/8612722.html
https://www.imooc.com/article/257797
https://www.cnblogs.com/LemonFive/p/11409333.html
https://my.oschina.net/u/2307589/blog/1834022
https://www.cnblogs.com/hark0623/p/6693329.html
https://cloud.tencent.com/developer/information/linux%E4%B8%AD%E6%96%87man%E6%89%8B%E5%86%8C%E4%B8%8B%E8%BD%BD
https://www.zhihu.com/question/19728793
https://blog.csdn.net/jltsun/article/details/52563772
https://blog.csdn.net/bboxhe/article/details/77367896
https://www.pinlue.com/article/2019/05/2020/508996387817.html
https://code.woboq.org/linux/linux/fs/select.c.html
https://www.bilibili.com/
https://www.chinastor.com/manuals/linuxfunctions/
http://fxr.watson.org/fxr/source/fs/select.c?v=linux-2.6
https://www.jianshu.com/p/d30893c4d6bb  
https://www.jianshu.com/p/35663fa64671  
http://www.52im.net/thread-2846-1-1.html
http://www.52im.net/thread-2635-1-1.html
https://zhuanlan.zhihu.com/p/115912936  
http://www.52im.net/thread-2846-1-1.html  
https://blog.csdn.net/forezp/article/details/88414741/
http://www.52im.net/thread-2846-1-1.html
https://zhuanlan.zhihu.com/p/169589455  
http://www.jasongj.com/java/concurrenthashmap/  
https://www.jianshu.com/p/d30893c4d6bb  
http://www.52im.net/thread-2640-1-1.html   
https://blog.csdn.net/xyls12345/article/details/26576281   
https://blog.csdn.net/wuyangyang555/article/details/81240411?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control 
https://nowjava.com/readcode/jdk8/7892
 
 linux源码在线查看: 
 https://code.woboq.org/linux/linux/fs/select.c.html
 https://www.chinastor.com/manuals/linuxfunctions/
 http://lxr.free-electrons.com/

http://code.woboq.org/linux/linux/   带有函数调用关系等

http://lxr.linux.no/    版本选择比较多

http://fxr.watson.org/

http://sourceforge.net/projects/lxr/

http://lxr.oss.org.cn/