起初是朋友在群里讨论了一个问题,NIO 返回的就绪事件为什么要手动删除?

起因

起初是朋友在群里讨论了一个问题

Netty 的selector 返回的就绪事件中,可能会返回已经处理过的就绪事件并重新复制吗?

nio 那个 selector,获取就绪事件的 selectedKeys,会返回一个就绪集合的 set

这个 set 会包含咱们已经处理过的 key,所以我们写这部分代码的时候,需要自己手动去 remove

但是,nio 底层是基于 epoll 的,epoll 针对于就绪事件的处理,是会将整个就绪链表拷贝到用户空间,既然是这样子,那为什么不直接将这个就绪 set 覆盖掉

这个点我不太理解

直接先说结论吧,这个问题的答案是 Netty 的默认实现的 NIO 是 epoll 水平触发模式,熟悉 epoll 的都知道,在水平触发模式下,当被监控的 socket 上有事件发生时,会不断的从 epoll_wait 中苏醒,直到内核缓存区的数据被 read 函数读完才结束。所以在这个模式下,NIO 需要为 socket 维护一些状态,在 Netty 中,一个 socket 连接就是一个 channel,所以 selectKeys 就是 channel 的集合,当有重复的事件进入 selectKeys 时,会根据状态把它在 selectKeys 中更新这个状态。

Mac OS 源码在 sun.nio.ch.KQueueSelectorImpl#updateSelectedKeys 方法处。

Linux OS 源码在 sun.nio.ch.EPollSelectorImpl#updateSelectedKeys 方法处。

而 Netty 基于边缘触发,后续重写了 EpollEventLoop 事件模型来实现更高效的 io 事件监听,也就没有必要再为 selectKeys 维护这个事件状态了,而是直接去事件的 socket 里读取数据。

水平触发和边缘触发

当我们监听的 socket 上有数据到来时,软中断会执行 epoll 的回调函数 ep_poll_callback ,在回调函数中会将 epoll 中描述 socket信息 的数据结构 epitem 插入到 epoll 中的就绪队列 rdllist 中。随后用户进程从 epoll 的等待队列中被唤醒, epoll_wait 将 IO就绪 的 socket 返回给用户进程,随即 epoll_wait 会清空 rdllist 。

水平触发 和 边缘触发 最关键的 区别 就在于当 socket 中的接收缓冲区还有数据可读时。 epoll_wait 是否会清空 rdllist 。

在 Netty 中实现的 EpollSocketChannel 默认的就是边缘触发模式。 JDK 的 NIO 默认是水平触发模式。

原文:https://mp.weixin.qq.com/s/zAh1yD5IfwuoYdrZ1tGf5Q

eventpoll 源码分析

eventpoll 源码地址:https://github.com/torvalds/linux/blob/master/fs/eventpoll.c

epoll_wait 删除就绪事件:

// 从就绪队列删除
// 这里是从 rdllink 删除而不是从rdllist,是因为 rdllink 的作用是将 epitem 与 rddlist 关联起来,
// 当调用 list_del_init(&epi->rdllink) 时,虽然你操作的是 rdllink,但它的作用是将整个 epitem(包括与之关联的所有数据)从 rdllist 链表中移除。
list_del_init(&epi->rdllink);

epoll_wait 添加就绪事件:

if (epi->event.events & EPOLLONESHOT)
    epi->event.events &= EP_PRIVATE_BITS;
// 如果不是边缘触发
else if (!(epi->event.events & EPOLLET)) {
    // 再次向就绪队列队列添加事件
    list_add_tail(&epi->rdllink, &ep->rdllist);
    ep_pm_stay_awake(epi);
}

Netty 事件模型种类

Netty 的事件模型可以由事件循环这个概念来体现,也就是各式各样的 EventLoop,比如说大多数开发者使用的 NioEventLoopGroup ,

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

或者是 Linux 使用的 EpollEventLoopGroup。

EventLoopGroup bossGroup = new EpollEventLoopGroup(1);

EventLoopGroup workerGroup = new EpollEventLoopGroup();

一个有意思的事情是,尽管你使用了 NioEventLoop,但是在 Netty 中,因为使用 Java 原生的 NIO,所以在内部实现上,会根据程序运行的操作系统,实现不同的事件选择器,事件选择器的底层技术,也是 epoll 或 kqueue,比如说我的 Netty 应用运行在 Linux 上,会选择 PollSelectorImpl 来执行真正的事件触发流程,去获取具体的 selectedKeys,来执行相应的事件流程,具体源码在 sun.nio.ch.DefaultSelectorProvider#create,

public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}

可以看到,同样是使用 NioEventLoop,但是 NIO 在 Selector 的不同实现上做区分,以达到的 IO 多路复用目的,其对应的内容如下表:

事件模型 选择器 事件容器
NioEventLoop 有 2 种 selector, KQueueSelectorImpl和PollSelectorImpl KQueueArrayWrapper,PollArrayWrapper,扽别实现对应的事件容器。
EpollEventLoop epollWait 本地方法 EpollEventArrays 用来接收 epoll 事件。
KQueueEventLoop keventWait 本地方法 KQueueEventArrays 用来接收 kqueue 事件。

主要是 JDK 的 NIO 默认是水平触发模式,Netty 不满足水平触发的 IO 效率,所以选择边缘触发模式重新实现了 EpollSocketChannel(笔者猜想)。其源码在 io.netty.channel.epoll.EpollEventLoop#openFileDescriptors

// It is important to use EPOLLET here as we only want to get the notification once per
// wakeup and don't call eventfd_read(...).
Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);

NioEventLoop 运行流程

主流的 EventLoop 运行流程主要包括三个部分:轮询就绪IO事件、执行异步任务、处理就绪IO事件,下面主要讨论轮询与处理 IO 事件,也就是 select 与 process 流程。

select 流程

Netty 的 selector 的流程,根据selector的具体实现(Mac的Kqueue,Linux的Epoll),将就绪事件放入selectorKeys中,这其中的具体实现是 SelectorImpl 会维护一个 selectedKeys,根据不同的操作系统,执行不同的doSelect,来维护这个selectedKeys,其中这个selectedKeys的Keys,其实就是一个个的channel,所以里面的channel,都是有事件吗可以读写的channel。获取 selectkeys 调用栈如下。

io.netty.channel.nio.NioEventLoop#select

sun.nio.ch.SelectorImpl#lockAndDoSelect

sun.nio.ch.SelectorImpl#select(long)

sun.nio.ch.KQueueSelectorImpl#doSelect

sun.nio.ch.KQueueArrayWrapper#poll

sun.nio.ch.KQueueArrayWrapper#kevent0

sun.nio.ch.KQueueSelectorImpl#updateSelectedKeys

其中 updateSelectedKeys 在 Linux 选择器的具体实现及其解释如下:

// 方法声明:processUpdateQueue 是一个私有方法,用于处理更新队列。
private void processUpdateQueue() {

    // 线程安全:使用 assert 语句确保当前线程持有该对象的锁,表示该方法是在一个安全的线程上下文中被调用的。
    assert Thread.holdsLock(this);

    // 同步块:对 updateLock 锁进行同步,以确保对更新队列的访问是线程安全的。
    synchronized (updateLock) {
        SelectionKeyImpl ski;
        // 循环处理更新队列:从 updateKeys 队列中取出第一个 SelectionKeyImpl 对象(ski),直到队列为空。
        while ((ski = updateKeys.pollFirst()) != null) {
            // 检查有效性:如果 ski 是有效的,获取其对应的文件描述符(fd)。
            if (ski.isValid()) {
                int fd = ski.getFDVal();
                // add to fdToKey if needed
                // 更新 fdToKey 映射:将 fd 和 ski 添加到 fdToKey 映射中(如果该文件描述符不存在),并通过断言确保之前没有其他 SelectionKeyImpl 与这个文件描述符关联。
                SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
                assert (previous == null) || (previous == ski);
                
                // 获取新事件:调用 translateInterestOps() 方法获取新的兴趣操作(newEvents),并获取当前注册的事件(registeredEvents)。
                int newEvents = ski.translateInterestOps();
                int registeredEvents = ski.registeredEvents();
                if (newEvents != registeredEvents) {
                    // 检查事件是否有变化:如果新的兴趣操作与当前注册的事件不相同,进行进一步处理。
                    if (newEvents == 0) {
                        // remove from epoll
                        // 移除事件:如果新的兴趣操作为 0,则从 epoll 中删除该文件描述符。
                        EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
                    } else {
                        // 添加或修改事件:如果当前注册的事件为 0,则将该文件描述符添加到 epoll 中。
                        //否则,调用 EPoll.ctl 修改已经注册的事件。
                        if (registeredEvents == 0) {
                            // add to epoll
                            EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
                        } else {
                            // modify events
                            EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
                        }
                    }
                    // 更新已注册的事件:将 ski 的注册事件更新为 newEvents,以保持 SelectionKeyImpl 的状态与 epoll 中的状态一致。
                    ski.registeredEvents(newEvents);
                }
            }
        }
    }
}

这段代码主要用于处理更新队列中的 SelectionKeyImpl 对象,确保它们的状态与 epoll 中注册的事件相匹配。

process 流程

在 Pocess 会处理在 Channel 读写,之后将这个Channel(Key),从selectedKeys里remove

// 处理读取内容调用栈如下

io.netty.channel.nio.NioEventLoop#processSelectedKeys

io.netty.channel.nio.NioEventLoop#processSelectedKey

io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

io.netty.channel.ChannelPipeline#fireChannelRead

selectedKeys 优化

其中,著名的 Netty 对 selectedKeys 的优化,也发生在 processSelectedKeys 中,详细内容,你可以参考这篇文章:https://mp.weixin.qq.com/s/g69upk3juqsq6LbwmtitcQ,简单来说。就是 Java NIO 原生 Selector 用集合类型 HashSet (selectedKeys),来存放 IO 就绪事件(SelectionKey)。Netty 将用数组实现的 SelectedSelectionKeySet,替换掉了 selectedKeys 的 HashSet 实现。用来提升就绪事件集合的插入和遍历操作性能。源码见:io.netty.channel.nio.NioEventLoop#processSelectedKeys。

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}.

epoll 空转优化

JDK NIO Epoll 的空轮询 BUG 存在,这样会导致 Reactor 线程 在没有任何事情可做的情况下被意外唤醒,导致 CPU 空转。其实 Netty 也没有从根本上解决这个 JDK BUG ,而是选择巧妙的绕过这个 BUG 。Netty 引入了计数变量 selectCnt,用于记录 select 操作的次数,如果事件轮询时间小于 timeoutMillis,并且在该时间周期内连续发生超过 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 次空轮询,说明可能触发了 epoll 空轮询 Bug。Netty 通过重建新的 Selector 对象,将异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector,重建完成之后异常的 Selector 就可以废弃了。详细内容,你可以参考若地发表在拉钩教育的专栏---《Netty 核心原理剖析与 RPC 实践》

EpollEventLoop 运行流程

说完了 Netty 使用原生的 NioEventLoop 主要的的执行流程,接下来说说 Netty 后续推出的 EpollEventLoop 的执行流程。EpollEventLoop 与 NioEventLoop 最大的区别是在事件模型的使用方面,NioEventLoop 会根据不同的操作系统,使用不同的事件模型,而 EpollEventLoop 见名知义,当程序运行的操作系统不支持 epoll 时,程序启动会报错。

Caused by: java.lang.IllegalStateException: Only supported on Linux
        at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:177)
        at io.netty.channel.epoll.Native.<clinit>(Native.java:61)

C 使用 epoll

int main() {
    int epfd = epoll_create1(0);
    if (epfd == -1) {
        perror("epoll_create1");
        return -1;
    }

    // 假设有一个监听的 socket fd
    int listen_fd = ...;
    
    struct epoll_event ev, events[10];
    ev.events = EPOLLIN;
    ev.data.fd = listen_fd;

    // 将监听的文件描述符添加到 epoll 中
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
        perror("epoll_ctl");
        return -1;
    }

    while (1) {
        int nfds = epoll_wait(epfd, events, 10, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < nfds; ++i) {
            if (events[i].data.fd == listen_fd) {
                // 处理新的连接
                int conn_fd = accept(listen_fd, NULL, NULL);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = conn_fd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev);
            } else {
                // 处理其他就绪事件,比如可读
                char buf[1024];
                int n = read(events[i].data.fd, buf, sizeof(buf));
                if (n <= 0) {
                    // 关闭连接
                    close(events[i].data.fd);
                } else {
                    // 处理读到的数据
                    write(STDOUT_FILENO, buf, n);
                }
            }
        }
    }
    close(epfd);
    return 0;
}

可以看到,主要有三个步骤,

  1. 创建一个 epoll: int epfd = epoll_create1(0);
  2. 将 socket 文件描述符 加入 epoll:epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev)
  3. 等待事件发生:int nfds = epoll_wait(epfd, events, 10, -1);

当有事件发生时,再处理这个事件

  1. 处理连接:int conn_fd = accept(listen_fd, NULL, NULL);
  2. 获取事件内容: int n = read(events[i].data.fd, buf, sizeof(buf));

很清晰的步骤,那么 Netty 是怎么按这些步骤来实现高效的 epoll 呢?

Java 使用 epoll

JNI 方法

熟悉 epoll 的知道,epoll 主要有三个函数 epoll_create、epoll_ctl、epoll_wait,Netty 在 Native 为这三个函数做了封装,由 Java 代码调用 C 语言代码。

以 epoll_wait 为例,由上至下的调用栈如下:

Java 代码

io.netty.channel.epoll.EpollEventLoop#epollWait

io.netty.channel.epoll.Native#epollWait

io.netty.channel.epoll.Native#epollWait0

C语言代码

netty_epoll_native.c#netty_epoll_native_epollWait0

netty_epoll_native.c#netty_epoll_native_epollWait

netty_epoll_native_epollWait 详细代码如下,最终会执行系统调用 epoll_wait

static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
    struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
    int result, err;

    do {
        result = epoll_wait(efd, ev, len, timeout);
        if (result >= 0) {
            return result;
        }
    } while((err = errno) == EINTR);
    return -err;
}
创建 epoll

当使用使用 EpollEventLoopGroup 事件组时,会创建一个子线程 EpollEventLoop,用来处理 IO 事件。

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
   // ... 省略
    return new EpollEventLoop(this, executor, maxEvents,
            selectStrategyFactory.newSelectStrategy(),
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

EpollEventLoop 类的构造方法会调 #openFileDescriptors,里面会执行 epoll 的创建,返回一个 epoll 文件描述符。

this.epollFd = epollFd = Native.newEpollCreate();
加入 epoll
this.eventFd = eventFd = Native.newEventFd();
Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);

在 C 语言中,eventfd() 是一个用于进程间或线程间通信的系统调用,通常用于事件通知。它会创建一个文件描述符,称为 eventfd,用来在多个线程或进程之间进行事件传递和通知。

这里创建了一个空的文件描述符加入epoll,并不是真实连接实例的文件描述符,而是后面 epoll_wait 函数调用返回的 EPOLLIN事件,在连接建立后,才会讲连接的文件描述符加入 epoll。

监听 epoll

Epoll 监听的流程

  1. 线程运行(io.netty.channel.epoll.EpollEventLoop#run):EpollEventLoop 线程执行的内容。
  2. 执行 epollWait(io.netty.channel.epoll.EpollEventLoop#epollWait):执行 epollWait 系统调用。
  3. 处理事件(io.netty.channel.epoll.EpollEventLoop#processReady):处理 epollWait 返回的就绪事件。

根据 epollWait 返回就绪事件,分别会处理连接建立与数据读取,它们的执行流程相似,都是在 epollInReady 中

建立连接

调用栈如下,主要是通过 channel 将连接内容传递给 ServerBootstrapAcceptor,再层层将文件描述符注册到 Epoll,其详细的调用栈如下:

io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe#epollInReady

io.netty.channel.ChannelPipeline#fireChannelRead

io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)

io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)

io.netty.channel.AbstractChannel.AbstractUnsafe#register

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

io.netty.channel.epoll.AbstractEpollChannel#doRegister

io.netty.channel.epoll.EpollEventLoop#add

ServerBootstrapAcceptor 是 ServerSocketChannel 的一个 Handler,在接收到新的客户端连接时,将客户端 Channel 注册到 EventLoop 的关键辅助处理器,fireChannelRead(newChildChannel(...)) 触发事件并沿 pipeline 传播,最终由 ServerBootstrapAcceptor#channelRead 处理。其hannelRead 方法会在新的 Channel 被创建时调用,负责配置该 Channel 并注册到 EventLoop,使其准备好处理数据读取和写入操作。

数据读取

io.netty.channel.epoll.AbstractEpollStreamChannel.EpollStreamUnsafe#epollInReady

void epollInReady() {
    // 省略
    byteBuf = allocHandle.allocate(allocator);
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    if (allocHandle.lastBytesRead() <= 0) {
        // nothing was read, release the buffer.
        byteBuf.release();
        byteBuf = null;
        close = allocHandle.lastBytesRead() < 0;
        if (close) {
            // There is nothing left to read as we received an EOF.
            readPending = false;
        }
        break;
    }
    allocHandle.incMessagesRead(1);
    readPending = false;
    pipeline.fireChannelRead(byteBuf);
    // 省略
}  

与建立连接的 EPOLLIN 事件不同,读取数据的 EPOLLIN 事件是将数据放到 byteBuf 中,之后在传递给后续的 channelRead 处理。