起初是朋友在群里讨论了一个问题,NIO 返回的就绪事件为什么要手动删除?
起因
起初是朋友在群里讨论了一个问题
Netty 的selector 返回的就绪事件中,可能会返回已经处理过的就绪事件并重新复制吗?
nio 那个 selector,获取就绪事件的 selectedKeys,会返回一个就绪集合的 set
这个 set 会包含咱们已经处理过的 key,所以我们写这部分代码的时候,需要自己手动去 remove
但是,nio 底层是基于 epoll 的,epoll 针对于就绪事件的处理,是会将整个就绪链表拷贝到用户空间,既然是这样子,那为什么不直接将这个就绪 set 覆盖掉
这个点我不太理解
直接先说结论吧,这个问题的答案是 Netty 的默认实现的 NIO 是 epoll 水平触发模式,熟悉 epoll 的都知道,在水平触发模式下,当被监控的 socket 上有事件发生时,会不断的从 epoll_wait 中苏醒,直到内核缓存区的数据被 read 函数读完才结束。所以在这个模式下,NIO 需要为 socket 维护一些状态,在 Netty 中,一个 socket 连接就是一个 channel,所以 selectKeys 就是 channel 的集合,当有重复的事件进入 selectKeys 时,会根据状态把它在 selectKeys 中更新这个状态。
Mac OS 源码在 sun.nio.ch.KQueueSelectorImpl#updateSelectedKeys 方法处。
Linux OS 源码在 sun.nio.ch.EPollSelectorImpl#updateSelectedKeys 方法处。
而 Netty 基于边缘触发,后续重写了 EpollEventLoop 事件模型来实现更高效的 io 事件监听,也就没有必要再为 selectKeys 维护这个事件状态了,而是直接去事件的 socket 里读取数据。
水平触发和边缘触发
当我们监听的 socket 上有数据到来时,软中断会执行 epoll 的回调函数 ep_poll_callback ,在回调函数中会将 epoll 中描述 socket信息 的数据结构 epitem 插入到 epoll 中的就绪队列 rdllist 中。随后用户进程从 epoll 的等待队列中被唤醒, epoll_wait 将 IO就绪 的 socket 返回给用户进程,随即 epoll_wait 会清空 rdllist 。
水平触发 和 边缘触发 最关键的 区别 就在于当 socket 中的接收缓冲区还有数据可读时。 epoll_wait 是否会清空 rdllist 。
在 Netty 中实现的 EpollSocketChannel 默认的就是边缘触发模式。 JDK 的 NIO 默认是水平触发模式。
原文:https://mp.weixin.qq.com/s/zAh1yD5IfwuoYdrZ1tGf5Q
eventpoll 源码分析
eventpoll 源码地址:https://github.com/torvalds/linux/blob/master/fs/eventpoll.c
epoll_wait 删除就绪事件:
// 从就绪队列删除
// 这里是从 rdllink 删除而不是从rdllist,是因为 rdllink 的作用是将 epitem 与 rddlist 关联起来,
// 当调用 list_del_init(&epi->rdllink) 时,虽然你操作的是 rdllink,但它的作用是将整个 epitem(包括与之关联的所有数据)从 rdllist 链表中移除。
list_del_init(&epi->rdllink);
epoll_wait 添加就绪事件:
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
// 如果不是边缘触发
else if (!(epi->event.events & EPOLLET)) {
// 再次向就绪队列队列添加事件
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
Netty 事件模型种类
Netty 的事件模型可以由事件循环这个概念来体现,也就是各式各样的 EventLoop,比如说大多数开发者使用的 NioEventLoopGroup ,
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
或者是 Linux 使用的 EpollEventLoopGroup。
EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
EventLoopGroup workerGroup = new EpollEventLoopGroup();
一个有意思的事情是,尽管你使用了 NioEventLoop,但是在 Netty 中,因为使用 Java 原生的 NIO,所以在内部实现上,会根据程序运行的操作系统,实现不同的事件选择器,事件选择器的底层技术,也是 epoll 或 kqueue,比如说我的 Netty 应用运行在 Linux 上,会选择 PollSelectorImpl 来执行真正的事件触发流程,去获取具体的 selectedKeys,来执行相应的事件流程,具体源码在 sun.nio.ch.DefaultSelectorProvider#create,
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
可以看到,同样是使用 NioEventLoop,但是 NIO 在 Selector 的不同实现上做区分,以达到的 IO 多路复用目的,其对应的内容如下表:
事件模型 | 选择器 | 事件容器 |
---|---|---|
NioEventLoop | 有 2 种 selector, KQueueSelectorImpl和PollSelectorImpl | KQueueArrayWrapper,PollArrayWrapper,扽别实现对应的事件容器。 |
EpollEventLoop | epollWait 本地方法 | EpollEventArrays 用来接收 epoll 事件。 |
KQueueEventLoop | keventWait 本地方法 | KQueueEventArrays 用来接收 kqueue 事件。 |
主要是 JDK 的 NIO 默认是水平触发模式,Netty 不满足水平触发的 IO 效率,所以选择边缘触发模式重新实现了 EpollSocketChannel(笔者猜想)。其源码在 io.netty.channel.epoll.EpollEventLoop#openFileDescriptors
// It is important to use EPOLLET here as we only want to get the notification once per
// wakeup and don't call eventfd_read(...).
Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
NioEventLoop 运行流程
主流的 EventLoop 运行流程主要包括三个部分:轮询就绪IO事件、执行异步任务、处理就绪IO事件,下面主要讨论轮询与处理 IO 事件,也就是 select 与 process 流程。
select 流程
Netty 的 selector 的流程,根据selector的具体实现(Mac的Kqueue,Linux的Epoll),将就绪事件放入selectorKeys中,这其中的具体实现是 SelectorImpl 会维护一个 selectedKeys,根据不同的操作系统,执行不同的doSelect,来维护这个selectedKeys,其中这个selectedKeys的Keys,其实就是一个个的channel,所以里面的channel,都是有事件吗可以读写的channel。获取 selectkeys 调用栈如下。
io.netty.channel.nio.NioEventLoop#select
sun.nio.ch.SelectorImpl#lockAndDoSelect
sun.nio.ch.SelectorImpl#select(long)
sun.nio.ch.KQueueSelectorImpl#doSelect
sun.nio.ch.KQueueArrayWrapper#poll
sun.nio.ch.KQueueArrayWrapper#kevent0
sun.nio.ch.KQueueSelectorImpl#updateSelectedKeys
其中 updateSelectedKeys 在 Linux 选择器的具体实现及其解释如下:
// 方法声明:processUpdateQueue 是一个私有方法,用于处理更新队列。
private void processUpdateQueue() {
// 线程安全:使用 assert 语句确保当前线程持有该对象的锁,表示该方法是在一个安全的线程上下文中被调用的。
assert Thread.holdsLock(this);
// 同步块:对 updateLock 锁进行同步,以确保对更新队列的访问是线程安全的。
synchronized (updateLock) {
SelectionKeyImpl ski;
// 循环处理更新队列:从 updateKeys 队列中取出第一个 SelectionKeyImpl 对象(ski),直到队列为空。
while ((ski = updateKeys.pollFirst()) != null) {
// 检查有效性:如果 ski 是有效的,获取其对应的文件描述符(fd)。
if (ski.isValid()) {
int fd = ski.getFDVal();
// add to fdToKey if needed
// 更新 fdToKey 映射:将 fd 和 ski 添加到 fdToKey 映射中(如果该文件描述符不存在),并通过断言确保之前没有其他 SelectionKeyImpl 与这个文件描述符关联。
SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
assert (previous == null) || (previous == ski);
// 获取新事件:调用 translateInterestOps() 方法获取新的兴趣操作(newEvents),并获取当前注册的事件(registeredEvents)。
int newEvents = ski.translateInterestOps();
int registeredEvents = ski.registeredEvents();
if (newEvents != registeredEvents) {
// 检查事件是否有变化:如果新的兴趣操作与当前注册的事件不相同,进行进一步处理。
if (newEvents == 0) {
// remove from epoll
// 移除事件:如果新的兴趣操作为 0,则从 epoll 中删除该文件描述符。
EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
} else {
// 添加或修改事件:如果当前注册的事件为 0,则将该文件描述符添加到 epoll 中。
//否则,调用 EPoll.ctl 修改已经注册的事件。
if (registeredEvents == 0) {
// add to epoll
EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
} else {
// modify events
EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
}
}
// 更新已注册的事件:将 ski 的注册事件更新为 newEvents,以保持 SelectionKeyImpl 的状态与 epoll 中的状态一致。
ski.registeredEvents(newEvents);
}
}
}
}
}
这段代码主要用于处理更新队列中的 SelectionKeyImpl 对象,确保它们的状态与 epoll 中注册的事件相匹配。
process 流程
在 Pocess 会处理在 Channel 读写,之后将这个Channel(Key),从selectedKeys里remove
// 处理读取内容调用栈如下
io.netty.channel.nio.NioEventLoop#processSelectedKeys
io.netty.channel.nio.NioEventLoop#processSelectedKey
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
io.netty.channel.ChannelPipeline#fireChannelRead
selectedKeys 优化
其中,著名的 Netty 对 selectedKeys 的优化,也发生在 processSelectedKeys 中,详细内容,你可以参考这篇文章:https://mp.weixin.qq.com/s/g69upk3juqsq6LbwmtitcQ,简单来说。就是 Java NIO 原生 Selector 用集合类型 HashSet (selectedKeys),来存放 IO 就绪事件(SelectionKey)。Netty 将用数组实现的 SelectedSelectionKeySet,替换掉了 selectedKeys 的 HashSet 实现。用来提升就绪事件集合的插入和遍历操作性能。源码见:io.netty.channel.nio.NioEventLoop#processSelectedKeys。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}.
epoll 空转优化
JDK NIO Epoll 的空轮询 BUG 存在,这样会导致 Reactor 线程 在没有任何事情可做的情况下被意外唤醒,导致 CPU 空转。其实 Netty 也没有从根本上解决这个 JDK BUG ,而是选择巧妙的绕过这个 BUG 。Netty 引入了计数变量 selectCnt,用于记录 select 操作的次数,如果事件轮询时间小于 timeoutMillis,并且在该时间周期内连续发生超过 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 次空轮询,说明可能触发了 epoll 空轮询 Bug。Netty 通过重建新的 Selector 对象,将异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector,重建完成之后异常的 Selector 就可以废弃了。详细内容,你可以参考若地发表在拉钩教育的专栏---《Netty 核心原理剖析与 RPC 实践》
EpollEventLoop 运行流程
说完了 Netty 使用原生的 NioEventLoop 主要的的执行流程,接下来说说 Netty 后续推出的 EpollEventLoop 的执行流程。EpollEventLoop 与 NioEventLoop 最大的区别是在事件模型的使用方面,NioEventLoop 会根据不同的操作系统,使用不同的事件模型,而 EpollEventLoop 见名知义,当程序运行的操作系统不支持 epoll 时,程序启动会报错。
Caused by: java.lang.IllegalStateException: Only supported on Linux
at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:177)
at io.netty.channel.epoll.Native.<clinit>(Native.java:61)
C 使用 epoll
int main() {
int epfd = epoll_create1(0);
if (epfd == -1) {
perror("epoll_create1");
return -1;
}
// 假设有一个监听的 socket fd
int listen_fd = ...;
struct epoll_event ev, events[10];
ev.events = EPOLLIN;
ev.data.fd = listen_fd;
// 将监听的文件描述符添加到 epoll 中
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl");
return -1;
}
while (1) {
int nfds = epoll_wait(epfd, events, 10, -1);
if (nfds == -1) {
perror("epoll_wait");
break;
}
for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == listen_fd) {
// 处理新的连接
int conn_fd = accept(listen_fd, NULL, NULL);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev);
} else {
// 处理其他就绪事件,比如可读
char buf[1024];
int n = read(events[i].data.fd, buf, sizeof(buf));
if (n <= 0) {
// 关闭连接
close(events[i].data.fd);
} else {
// 处理读到的数据
write(STDOUT_FILENO, buf, n);
}
}
}
}
close(epfd);
return 0;
}
可以看到,主要有三个步骤,
- 创建一个 epoll: int epfd = epoll_create1(0);
- 将 socket 文件描述符 加入 epoll:epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev)
- 等待事件发生:int nfds = epoll_wait(epfd, events, 10, -1);
当有事件发生时,再处理这个事件
- 处理连接:int conn_fd = accept(listen_fd, NULL, NULL);
- 获取事件内容: int n = read(events[i].data.fd, buf, sizeof(buf));
很清晰的步骤,那么 Netty 是怎么按这些步骤来实现高效的 epoll 呢?
Java 使用 epoll
JNI 方法
熟悉 epoll 的知道,epoll 主要有三个函数 epoll_create、epoll_ctl、epoll_wait,Netty 在 Native 为这三个函数做了封装,由 Java 代码调用 C 语言代码。
以 epoll_wait 为例,由上至下的调用栈如下:
Java 代码
io.netty.channel.epoll.EpollEventLoop#epollWait
io.netty.channel.epoll.Native#epollWait
io.netty.channel.epoll.Native#epollWait0
C语言代码
netty_epoll_native.c#netty_epoll_native_epollWait0
netty_epoll_native.c#netty_epoll_native_epollWait
netty_epoll_native_epollWait 详细代码如下,最终会执行系统调用 epoll_wait
static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;
do {
result = epoll_wait(efd, ev, len, timeout);
if (result >= 0) {
return result;
}
} while((err = errno) == EINTR);
return -err;
}
创建 epoll
当使用使用 EpollEventLoopGroup 事件组时,会创建一个子线程 EpollEventLoop,用来处理 IO 事件。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// ... 省略
return new EpollEventLoop(this, executor, maxEvents,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
EpollEventLoop 类的构造方法会调 #openFileDescriptors,里面会执行 epoll 的创建,返回一个 epoll 文件描述符。
this.epollFd = epollFd = Native.newEpollCreate();
加入 epoll
this.eventFd = eventFd = Native.newEventFd();
Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
在 C 语言中,eventfd() 是一个用于进程间或线程间通信的系统调用,通常用于事件通知。它会创建一个文件描述符,称为 eventfd,用来在多个线程或进程之间进行事件传递和通知。
这里创建了一个空的文件描述符加入epoll,并不是真实连接实例的文件描述符,而是后面 epoll_wait 函数调用返回的 EPOLLIN事件,在连接建立后,才会讲连接的文件描述符加入 epoll。
监听 epoll
Epoll 监听的流程
- 线程运行(io.netty.channel.epoll.EpollEventLoop#run):EpollEventLoop 线程执行的内容。
- 执行 epollWait(io.netty.channel.epoll.EpollEventLoop#epollWait):执行 epollWait 系统调用。
- 处理事件(io.netty.channel.epoll.EpollEventLoop#processReady):处理 epollWait 返回的就绪事件。
根据 epollWait 返回就绪事件,分别会处理连接建立与数据读取,它们的执行流程相似,都是在 epollInReady 中
建立连接
调用栈如下,主要是通过 channel 将连接内容传递给 ServerBootstrapAcceptor,再层层将文件描述符注册到 Epoll,其详细的调用栈如下:
io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe#epollInReady
io.netty.channel.ChannelPipeline#fireChannelRead
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
io.netty.channel.AbstractChannel.AbstractUnsafe#register
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
io.netty.channel.epoll.AbstractEpollChannel#doRegister
io.netty.channel.epoll.EpollEventLoop#add
ServerBootstrapAcceptor 是 ServerSocketChannel 的一个 Handler,在接收到新的客户端连接时,将客户端 Channel 注册到 EventLoop 的关键辅助处理器,fireChannelRead(newChildChannel(...)) 触发事件并沿 pipeline 传播,最终由 ServerBootstrapAcceptor#channelRead 处理。其hannelRead 方法会在新的 Channel 被创建时调用,负责配置该 Channel 并注册到 EventLoop,使其准备好处理数据读取和写入操作。
数据读取
io.netty.channel.epoll.AbstractEpollStreamChannel.EpollStreamUnsafe#epollInReady
void epollInReady() {
// 省略
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
// 省略
}
与建立连接的 EPOLLIN 事件不同,读取数据的 EPOLLIN 事件是将数据放到 byteBuf 中,之后在传递给后续的 channelRead 处理。