大家好,我是七哥,今天我们聊聊一个 Java 中的难点 NIO。
NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O处理问题的有效方式。
之前我们也聊过了同步非阻塞 IO 的特点是,如果TCP RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回0,永远不会阻塞。
传统的BIO里面socket.read(),如果TCP RecvBuffer里没有数据,线程会一直阻塞,直到用户空间收到数据,才返回。
上图红色表示线程处于阻塞状态,绿色表示线程处于非阻塞状态。
最新的AIO(Async I/O)里面会更进一步:不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。
换句话说,BIO里用户最关心“我要读”,NIO里用户最关心”我可以读了”,在AIO模型里用户更需要关注的是“读完了”。
NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。
回忆BIO模型,之所以需要多线程,是因为在进行I/O操作的时候,一是没有办法知道到底能不能写、能不能读,只能”傻等”,即使通过各种估算,算出来操作系统没有能力进行读写,线程也没法在socket.read() 和 socket.write() 函数中返回,这两个函数无法进行有效的中断。所以除了多开线程另起炉灶,没有好的办法利用CPU。
下面我们先直接使用 java NIO 的非阻塞特性来实现一段代码:
public class NioTest {
public static void main(String[] args) throws IOException {
//新接连池
List<SocketChannel> socketChannelList = new ArrayList<>(8);
//开启服务端Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(5555));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
while (true) {
//探测新连接,由于设置了非阻塞,这里即使没有新连接也不会阻塞,而是直接返回null
SocketChannel socketChannel = serverSocketChannel.accept();
//当返回值不为null的时候,证明存在新连接
if (socketChannel != null) {
System.out.println("新连接接入");
//将客户端设置为非阻塞 这样read/write不会阻塞
socketChannel.configureBlocking(false);
//将新连接加入到线程池
socketChannelList.add(socketChannel);
}
//迭代器遍历连接池
Iterator<SocketChannel> iterator = socketChannelList.iterator();
while (iterator.hasNext()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
SocketChannel channel = iterator.next();
//读取客户端数据 当客户端数据没有写入完成的时候也不会阻塞,长度为0
int read = channel.read(byteBuffer);
if (read > 0) {
//当存在数据的时候打印数据
System.out.println(new String(byteBuffer.array()));
} else if (read == -1) {
//客户端退出的时候删除该连接
iterator.remove();
System.out.println("断开连接");
}
}
}
}
}
上述代码我们可以看到一个关键的逻辑:
serverSocketChannel.configureBlocking(false);
这里被设置为非阻塞的时候无论是 accept 还是 read/write 都不会阻塞!
其实这里我们只用了非阻塞的这一特性,并没有使用 IO 多路复用中的选择器,我们看一下这种实现逻辑有什么问题!
表面上看,我们似乎的确使用了一条线程处理了所有的连接以及读写操作,但是假设我们有10w连接,活跃连接(经常read/write)只有1000,但是我们这个线程需要每次都轮询10w条数据处理,极大的消耗了CPU!是不是和 IO 多用复用中的 select/poll 实现相似呢,都是会做无用功。
我们期待什么?期待的是,每次轮询只轮询有数据的Channel, 没有数据的就不管他(死循环 CPU 空跑也是浪费),比如刚刚的例子,只有1000个活跃连接,那么每次就只轮询这1000个,其他的有数据才轮询,没读写就不轮询!同时这个死循环,
多路复用模型是 JAVA NIO 推荐使用的经典模型,内部通过 Selector 进行事件选择,Selector 事件选择通过系统实现。
因为 NIO 的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写,即 socket.read() 返回0或者 socket.write() 返回0,我们可以把这件事记下来,记录的方式通常是在 Selector 上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。
下面具体看下如何利用事件模型单线程处理所有I/O请求:
NIO的主要事件有几个:读就绪、写就绪、有新连接到来。
我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。
对于写操作,就是写不出去的时候对写就绪事件感兴趣(可以写了叫我);对于读操作,就是完成连接和系统没有办法承载新读入的数据时对读就绪事件感兴趣(可以读了叫我);对于新连接到来,一般是服务器刚启动的时候对这个事件感兴趣(启动后就等新连接到)。
其次,用一个死循环选择就绪的事件,会执行系统调用(Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP),还会阻塞的等待新事件的到来。新事件到来的时候,会在selector
上注册标记位,标示可读、可写或者有连接到来。
注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以你可以放心大胆地在一个 while(true) 里面调用这个函数而不用担心CPU空转。
所以我们的程序大概的模样是:
interface ChannelHandler{
void channelReadable(Channel channel);
void channelWritable(Channel channel);
}
class Channel{
Socket socket;
//读,写或者连接
Event event;
}
//IO线程主循环:
class IoThread extends Thread{
public void run(){
Channel channel;
//选择就绪的事件和对应的连接
while(channel=Selector.select()){
//如果是新连接,则注册一个新的读写处理器
if(channel.event==accept){
registerNewChannelHandler(channel);
}
//如果可以写,则执行写事件
if(channel.event==write){
getChannelHandler(channel).channelWritable(channel);
}
//如果可以读,则执行读事件
if(channel.event==read){
getChannelHandler(channel).channelReadable(channel);
}
}
}
//所有channel的对应事件处理器
Map<Channel,ChannelHandler> handlerMap;
}
这个程序很简短,也是最简单的Reactor模式:注册所有感兴趣的事件处理器,单线程轮询选择就绪事件,执行事件处理器。
到这里,我们就解决了上面的一个同步非阻塞I/O的痛点:CPU总是在做很多无用的轮询。在这个模型里被解决了,这个模型从selector中获取到的Channel全部是就绪的,也就是说他每次轮询都不会做无用功!
通过上面的学习,你应该已经知道了 Java NIO 有什么用了,但是具体到写代码,我们还是要看下它都包含了哪些东西。
那么NIO都提供了什么呢?
在NIO中提供了各种不同的Buffer,最常用的就是 ByteBuffer:
可以看到,Buffer 中有几个比较重要的变量:
capacity——容量,这个值是一开始申请就确定好的,类似c语言申请数组的大小。
limit——剩余,在写模式下初始的时候等于capacity(表示剩余可以写到的最大容量位置);在读模式下,等于最后一次写入的位置(还能读到的最大位置)。
mark——标记位,标记一下position的位置,可以调用reset()方法回到这个标记位置。
posistion——位置,写模式下表示开始写入的位置;读模式下表示开始读的位置。
NIO 的 Buffer 有两种模式,读模式和写模式。刚创建默认就是写模式,使用 flip() 可以切换到读模式。
关于这几个位置的使用,可以参考下面的代码,没啥多说的,你跑一下就理解了!
public class ByteBufferTest {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(88);
System.out.println(buffer);
String value = "七哥测试NIO";
buffer.put(value.getBytes());
System.out.println(buffer);
// 切换到读模式
buffer.flip();
System.out.println(buffer);
byte[] v = new byte[buffer.remaining()];
buffer.get(v);
System.out.println(buffer);
System.out.println(new String(v));
}
}
得到的输出为:
java.nio.HeapByteBuffer[pos=15 lim=88 cap=88]
java.nio.HeapByteBuffer[pos=0 lim=15 cap=88]
java.nio.HeapByteBuffer[pos=15 lim=15 cap=88]
七哥测试NIO
最后关于ByteBuffer在Channel中的使用,可以参考下面的代码:
public class BufferTest {
public static void main(String[] args) throws IOException {
String file = "/Users/sevenluo/dev_tools/IdeaProjects/Java-Tutorial/netty-practice/src/main/resources/test.txt";
RandomAccessFile accessFile = new RandomAccessFile(file,"rw");
FileChannel fileChannel = accessFile.getChannel();
// 20个字节
ByteBuffer buffer = ByteBuffer.allocate(20);
int bytesRead = fileChannel.read(buffer);
// buffer.put()也能写入buffer
while(bytesRead!=-1){
// 写切换到读
buffer.flip();
while(buffer.hasRemaining()){
System.out.println((char)buffer.get());
}
// buffer.rewind() 重新读
// buffer.mark() 标记position
// buffer.reset() 恢复
// 清除缓冲区
buffer.clear();
// buffer.compact(); 清除读过的数据
System.out.println("=================");
bytesRead = fileChannel.read(buffer);
}
}
}
这样,就熟悉了 Channel 和 ByteBuffer 的使用。
通常情况下,操作系统的一次写操作分为两步:1. 将数据从用户空间拷贝到系统空间。2. 从系统空间往网卡写。同理,读操作也分为两步:1. 将数据从网卡拷贝到系统空间;2. 将数据从系统空间拷贝到用户空间。
对于NIO来说,缓存的使用可以使用DirectByteBuffer和HeapByteBuffer。如果使用了DirectByteBuffer,一般来说可以减少一次系统空间到用户空间的拷贝。但Buffer创建和销毁的成本更高,更不宜维护,通常会用内存池来提高性能。
如果数据量比较小的中小应用情况下,可以考虑使用HeapBuffer,反之可以用DirectBuffer。
接下来,看看服务器中的具体应用吧。
之前介绍 BIO 的服务器,是来一个连接就创建一个新的线程响应。这里基于NIO的多路复用,可以这样写:
public class NIOServerTest {
public static void main(String[] args) throws IOException {
//开启服务端Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(5555));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//开启一个选择器
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞等待需要处理的事件发生
selector.select();
// 获取selector中注册的全部事件的 SelectionKey 实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//获取已经准备完成的key
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
//当发现连接事件
if (next.isAcceptable()) {
//获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
//设置非阻塞
socketChannel.configureBlocking(false);
//将该客户端连接注册进选择器 并关注读事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("accepted connection from " + socketChannel);
//如果是读事件
} else if (next.isReadable()) {
ByteBuffer allocate = ByteBuffer.allocate(128);
//获取与此key唯一绑定的channel
SocketChannel channel = (SocketChannel) next.channel();
//开始读取数据
int read = channel.read(allocate);
if (read > 0) {
System.out.println(new String(allocate.array()));
} else if (read == -1) {
System.out.println("断开连接");
channel.close();
}
}
//删除这个事件
iterator.remove();
}
}
}
}
Java NIO 这个编程模型,抽象来看就是如下步骤:
创建ServerSocketChannel并绑定端口;
创建Selector多路复用器,并注册Channel;
循环监听是否有感兴趣的事件发生selector.select();
获得事件的句柄,并进行处理;
我们之前讲 5 种网络 IO 模型 时,具体聊过 IO多路复用中的 epoll 模型,这里 JavaNIO 对应 epoll 中的三个关键函数关系如下:
epoll_create 对应JDK NIO代码中的 Selector.open()
epoll_ctl 对应JDK NIO代码中的 socketChannel.register(selector,xxxx);
epoll_wait 对应JDK NIO代码中的 selector.select();
使用NIO != 高性能,当连接数<1000,并发程度不高或者局域网环境下NIO并没有显著的性能优势。
NIO并没有完全屏蔽平台差异,它仍然是基于各个操作系统的I/O系统实现的,差异仍然存在。使用NIO做网络编程构建事件驱动模型并不容易,陷阱重重。
推荐大家使用成熟的NIO框架,如Netty,MINA等。解决了很多NIO的陷阱,并屏蔽了操作系统的差异,有较好的性能和编程模型。
最后总结一下到底NIO给我们带来了些什么:
作者介绍: 七哥,一个热爱技术的程序员,写文章也经常拍视频,专注于 Java 技术干货分享,愿望是陪家人平淡快乐的度过一生!
七哥最近一年一直在视频号更新技术知识视频,建议大家关注下,上下班路上看一看收获会很大😄。