原创

Netty源码分析-EventLoop和EventLoopGroup

全网最全 Java 知识点整合总目录入口猛戳-->www.gameboys.cn

Netty 全网最全示例源码地址猛戳-->https://github.com/Sniper2016/NettyStudy

1.Netty 的线程模型

Netty线程模型十分难懂,需要很多知识储备,笔者也是理解了好几周,期间又去温习了很多知识点。

Netty 的线程模型
Netty 的线程模型

服务端启动的时候,创建了两个 NioEventLoopGroup,它们实际是两个独立的 Reactor 线程池。一个用于接收客户端的 TCP 连接,另一个用于处理 I/O 相关的读写操作,或者执行系统 Task、定时任务 Task 等。

1.1Netty 用于接收客户端请求的线程池职责如下。

  • 接收客户端 TCP 连接,初始化 Channel 参数;
  • 将链路状态变更事件通知给 ChannelPipeline。

1.2Netty 处理 I/O 操作的 Reactor 线程池职责如下。

  • 异步读取通信对端的数据报,发送读事件到 ChannelPipeline;
  • 异步发送消息到通信对端,调用 ChannelPipeline 的消息发送接口;
  • 执行系统调用 Task;
  • 执行定时任务 Task,例如链路空闲状态监测定时任务。

1.3Netty线程模型和Reactor模型对比

单单只看Netty线程模型会比较抽象,笔者拿Reactor的主从模式做对比,注意Netty的服务端,Boss线程每个端口只能有一个线程,因为一个SocketChannel的SelectionKey.OP_READ只能注册到一个selector,所以即使你boss线程设置很多个,但是在bind的时候只会使用一个线程,所以,Netty框架启动最少需要两个线程,boss和work,如果你硬是想一个线程处理所有逻辑,也可以,boss和work同一个EventLoop就满足你的贱贱的需求!!!

2.NioEventLoop 源码分析

2.1 必备知识

  • Java-NIO 客户端和服务端示例

请看博客

  • Java-NIO 底层原理和 4 种 IO 模型详解

请看博客

  • Reactor 模式详解

请看博客

2.2 源码分析

有两篇博客写得很好,地址为:

https://www.cnblogs.com/crazymakercircle/p/9847501.html

https://www.jianshu.com/p/f94f7005c2cd

笔者总结一下,就三步:注册、轮询、分发

  • 注册:将 channel 通道的就绪事件,注册到选择器 Selector。一般来说,一个 Reactor 对应一个选择器 Selector,一个 Reactor 拥有一个 Selector 成员属性。
  • 轮询:轮询的代码,是 Reactor 重要的一个组成部分,或者说核心的部分。轮询选择器是否有就绪事件。
  • 分发:将就绪事件,分发到事件附件的处理器 handler 中,由 handler 完成实际的处理。

2.2.1 完整的注册调用流程如下:

2.2.2 轮询

函数 AbstractChannel.AbstractUnsafe.register 中,有一个 eventLoop.execute()方法调用,这个调用,就是启动 EvnentLoop 的本地线程的的入口。

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
}
if (!isCompatible(eventLoop)) {
        promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
}
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
else {
try {
            eventLoop.execute(new Runnable() {
@Override
public void run() {
                    register0(promise);
}
            });
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
    }
}

在 execute 的方法中,去调用 startThread(),启动线程。

@Override

public void execute(Runnable task) {

    if (task == null) {

        throw new NullPointerException("task");

    }

    boolean inEventLoop = inEventLoop();

    if (inEventLoop) {

        addTask(task);

    } else {

// 调用 startThread 方法, 启动EventLoop 线程.

        startThread();

        addTask(task);

        if (isShutdown() && removeTask(task)) {

            reject();

        }

    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {

        wakeup(inEventLoop);

    }

}

 SingleThreadEventExecutor.startThread() 方法中了:

private void startThread() {

    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {

        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {

            thread.start();

        }

    }

}

事件的轮询,在 NioEventLoop.run() 方法, (下面版本为 4.0X,4.1X 源码不一样了),其源码如下:

@Override

protected void run() {

    for (;;) {

        boolean oldWakenUp = wakenUp.getAndSet(false);

        try {

//第一步,查询 IO 就绪

            if (hasTasks()) {

                selectNow();

            } else {

                select(oldWakenUp);

                if (wakenUp.get()) {

                    selector.wakeup();

                }

            }

//第二步,处理这些 IO 就绪

            cancelledKeys = 0;

            needsToSelectAgain = false;

            final int ioRatio = this.ioRatio;

            if (ioRatio == 100) {

 processSelectedKeys();

                runAllTasks();

            } else {

                final long ioStartTime = System.nanoTime();

  processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;

                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

            }

            if (isShuttingDown()) {

                closeAll();

                if (confirmShutdown()) {

                    break;

                }

            }

        } catch (Throwable t) {

            ...

        }

    }

}




完成第二步 IO 就绪事件处理的调用是 processSelectedKeys() ,这个调用非常关键。

这个方法是查询就绪的 IO 事件, 然后处理它;第二个调用是 runAllTasks(), 这个方法的功能就是运行 taskQueue 中的任务。


void selectNow() throws IOException {

    try {

        selector.selectNow();

    } finally {

        // restore wakup state if needed

        if (wakenUp.get()) {

            selector.wakeup();

        }

    }

}

首先调用了selector.selectNow()方法,这个 selector 属性正是 Java NIO 中的多路复用器 Selector。selector.selectNow() 方法会检查当前是否有就绪的 IO 事件。如果有, 则返回就绪 IO 事件的个数;如果没有, 则返回 0。

注意,selectNow() 是立即返回的,不会阻塞当前线程。 当 selectNow() 调用后, finally 语句块中会检查 wakenUp 变量是否为 true,当为 true 时, 调用 selector.wakeup() 唤醒 select() 的阻塞调用。

2.2.3 分发

AbstractNioByteChannel 中,可以找到 unsafe.read( ) 调用的实现代码。 unsafe.read( )负责的是 Channel 的底层数据的 IO 读取,并且将读取的结果,dispatch(分派)给最终的 Handler。

AbstractNioByteChannel.NioByteUnsafe.read()的关键源码节选如下:

@Override

public final void read() {

    ...

    ByteBuf byteBuf = null;

    int messages = 0;

    boolean close = false;

    try {

        int totalReadAmount = 0;

        boolean readPendingReset = false;

        do {

             // 读取结果.

            byteBuf = allocHandle.allocate(allocator);

            int writable = byteBuf.writableBytes();

            int localReadAmount = doReadBytes(byteBuf);

             ...

 // dispatch结果到Handler

            pipeline.fireChannelRead(byteBuf);

            byteBuf = null;

            ...

            totalReadAmount += localReadAmount;

            ...

    }

}
网上找的一个EventLoop执行流程
网上找的一个EventLoop执行流程

3.netty 如何保证异步串行无锁化?

为了尽可能地提升性能,Netty 在很多地方进行了无锁化的设计,例如在 I/O 线程内部进行串行操作,避免多线程竞争导致的性能下降问题。表面上看,串行化设计似乎 CPU 利用率不高,并发程度不够。但是,通过调整 NIO 线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列—多个工作线程的模型性能更优。 Netty 的 NioEventLoop 读取到消息之后,直接调用 ChannelPipeline 的 fireChannelRead (Object msg)。只要用户不主动切换线程,一直都是由 NioEventLoop 调用用户的 Handler,期间不进行线程切换。这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。

例子:

AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)调用,


    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

看 Netty 源码你会在很多地方看到 executor.inEventLoop()这种判断,如果当前线程不是Channel关联的线程,会将执行逻辑封装成一个任务放到任务队列,等待执行

4.netty 如何解决 jdk 空轮询的 bug?

出现问题原因:selector 在没有结果的情况下,依然被唤醒,导致一直空轮询,cpu100%

netty 是通过记录每次 select 的次数,如果空 select512 次,则重置 selector,调用 rebuildSelector();

 @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

    private boolean unexpectedSelectorWakeup(int selectCnt) {
        if (Thread.interrupted()) {
            // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
            // As this is most likely a bug in the handler of the user or it's client library we will
            // also log it.
            //
            // See https://github.com/netty/netty/issues/2426
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely because " +
                        "Thread.currentThread().interrupt() was called. Use " +
                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
            }
            return true;
        }
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            rebuildSelector();
            return true;
        }
        return false;
    }


5.Boss线程NioEventLoop收到客户端SelectionKey.OP_ACCEPT连接请求怎么开启的work线程?

下面示例是客户端请求连接,服务端收到请求之后新建一个NioSocketChannel,初始化Channel,分配EventLoop给指定的Channel,然后Start线程,下图为笔者断点的一个堆栈信息,从boss线程的run方法开始直到work线程开启。

 NioEventLoop(SingleThreadEventExecutor).execute(Runnable, boolean) line: 830 
 NioEventLoop(SingleThreadEventExecutor).execute(Runnable) line: 818 
 NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).register(EventLoop, ChannelPromise) line: 471 
 NioEventLoop(SingleThreadEventLoop).register(ChannelPromise) line: 87 
 NioEventLoop(SingleThreadEventLoop).register(Channel) line: 81 
 NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) line: 86 
 ServerBootstrap$ServerBootstrapAcceptor.channelRead(ChannelHandlerContext, Object) line: 218 
 DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 379 
 AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 365 
 DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 357 
 DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1410 
 DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 379 
 AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 365 
 DefaultChannelPipeline.fireChannelRead(Object) line: 919 
 AbstractNioMessageChannel$NioMessageUnsafe.read() line: 93 
 NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 714 
 NioEventLoop.processSelectedKeysOptimized() line: 650 
 NioEventLoop.processSelectedKeys() line: 576 
 NioEventLoop.run() line: 493 
 SingleThreadEventExecutor$4.run() line: 989 
 ThreadExecutorMap$2.run() line: 74 
 FastThreadLocalRunnable.run() line: 30 
 FastThreadLocalThread(Thread).run() line: 619 
 

6.Netty 多线程编程的最佳实践如下:

  • 服务端创建两个 EventLoopGroup,用于逻辑隔离 NIO acceptor 和 NIO IO 线程;
  • 尽量避免在用户 Handler 里面启动用户线程(解码后将 POJO 消息发送到后端业务线程除外);
  • 解码要在 NIO 线程调用的解码 Handler 中进行,不要切换到用户线程中完成消息的解码;
  • 如果业务逻辑比较简单,没有复杂的业务逻辑计算,没有可能阻塞线程的操作如磁盘操作、数据库操作、网络操作等,可以直接在 NIO 线程中进行业务逻辑操作,不用切换到用户线程;
  • 如果业务逻辑比较复杂,不要在 NIO 线程上操作,应将解码后的 POJO 封装成 Task 提交到业务线程池中执行,以保证 NIO 线程被尽快释放,处理其他 IO 操作;
正文到此结束