先看定义:
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
可见Netty就是基于NIO的网络(Socket)客户端服务端实现框架,它简化了TCP/UDP客户端服务端编程,开发人员不再关注底层的Socket读取和写入,而且Netty提供了不少的handler(如http、mqtt、redis协议等)实现,简化了基于网络协议的编程复杂度。
啥是NIO?
NIO三件套:
- Channel:Channel保存了socket连接的有关信息,以及ChannelPipeLine(串起ChannelHandlerContext)/Unsafe(实现底层传输)/EventLoop(对应一个IO线程)/SelectionKey(如果为NIO模式,标识该Channel此时的连接读写事件)等有效信息,NIO服务端负责响应连接的是NioServerSocketChannel,客户端为NioSocketChannel
- Buffer:缓冲区,用于和NIO Channel交互,从Channel中读取数据到Buffer,从Buffer将数据写入到Channel
- Selector:选择器,实现一个线程就能监听多个Channel(Channel需先register到Selector)的读写等状态,然后触发ChannelPipeLine的fireChannelRead操作(串行ChannelHandlerContext持有的ChannelHandler)/底层Channel的数据写入(数据来源于Buffer)
传统的IO基于流(Stream),读写都是阻塞的,在读取写入Socket数据过程中,线程什么都不能做,而NIO里面有个Selector,由它来监听所有Channel相关的文件描述符(在linux中,设备、Socket连接等都对应文件描述符)状态,这样其他线程(业务开发的工作线程)就可以释放出来,不被阻塞掉(如果涉及IO,可以走Future异步处理),当然真正读取数据的时候Channel对应的IO线程(在NIO中为NioEventLoop)还是阻塞读写的,但数据被保存到了Buffer中,后面的处理都是面向Buffer,不再是面向流的了。
我们来看看Netty(NIO模式)是怎么和NIO对应的
- 见顶图,处理客户端连接的EventLoopGroup一般包含一个NioEventLoop,NioEventLoop即为一个Selector(也是一个线程,负责NIO),负责处理NioServerSocketChannel的状态监测,当有连接到来时,执行accept(),新建一个NioSocketChannel负责与Client端的通信,NioSocketChannel从Worker EventLoopGroup(NioEventLoop数量根据配置生成)中选择一个NioEventLoop register进去,该Channel后续所有的NIO操作均由该NioEventLoop负责处理
NioEventLoop主要包含两部分操作:
- processSelectedKeys() :即selector功能
- runAllTasks():执行任务队列中的任务,主要是定时任务和外部工作线程添加的读写任务
Netty中,耗时的业务代码可以写在哪?
Netty允许在非NIO线程中写消息,如果当前线程是Channel对应的NIO线程则直接写,如果不是,则写消息操作会被封装成一个WriteTask,然后再由NioEventLoop的runAllTasks()定期处理
相关代码(见AbstractChannelHandlerContext.class):
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }复制代码
历史文章: