原创

Netty源码分析-Future和Promise

全网最全 Java 知识点整合总目录入口猛戳-->www.gameboys.cn

Netty 全网最全示例源码地址猛戳-->https://github.com/Sniper2016/NettyStudy

1. Future/Promise 模式

1.1. ChannelFuture 的由来

由于 Netty 中的 Handler 处理都是异步 IO 操作,结果是未知的。

Netty 继承和扩展了 JDK Future 的 API,定义了自身的 Future 系列类型,实现异步操作结果的获取和监控。

其中,最为重要的是 ChannelFuture 。

```
public interface ChannelFuture extends Future<Void> {

	//...

	Channel channel();

	@Override

	ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

	  @Override

	ChannelFuture sync() throws InterruptedException;

	//...

}

```

之所以命名为 ChannelFuture,表示跟 Channel 的操作有关。

ChannelFuture 用于获取 Channel 相关的操作结果,添加事件监听器,取消 IO 操作,同步等待。

1.2. 来自 Netty 的官方建议

  • java.util.concurrent.Future 是 Java 提供的接口,提供了对异步操作的简单干预。
  • Future 接口定义了 isDone()、isCancellable(),用来判断异步执行状态。Future 接口的 get 方法,可以用来获取结果。get 方法首先会判断任务是否执行完成,如果完成就返回结果,否则阻塞线程,直到任务完成。
  • Netty 官方文档直接说明——Netty 的网络操作都是异步的,Netty 源码上大量使用了 Future/Promise 模式。
  • 如果用户操作调用了 sync 或者 await 方法,会在对应的 future 对象上阻塞用户线程,例如future.channel().closeFuture().sync()
  • Netty 的 Future 接口,在继承了 java.util.concurrent.Future 的基础上,增加了一系列监听器方法,比如 addListener()、removeListener() 等等。Netty 强烈建议,通过添加监听器的方式获取 IO 结果,而不是通过 JDK Future 的同步等待的方式去获取 IO 结果。

1.3. Netty 的 Future 接口

Netty 扩展了 Java 的 Future,增加了监听器 Listener 接口,通过监听器可以让异步执行更加有效率,不需要通过 get 来等待异步执行结束,而是通过监听器回调来精确地控制异步执行结束的时间点。

这一点,正好是 Netty 在 Future 模式的最主要的改进。

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();

    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> sync() throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    boolean awaitUninterruptibly(long timeoutMillis);

    V getNow();

    boolean cancel(boolean mayInterruptIfRunning);

}

1.4. ChannelFuture 使用的实例

Netty 的出站和入站操作,都是异步的。

以最为经典的 NIO 出站操作——write 出站为例,说一下 ChannelFuture 的使用。

代码如下:

ChannelFuture future= ctx.channel().write(msg);
   future.addListener(new GenericFutureListener<Future<? super Void>>() {
   @Override
   public void operationComplete(Future<? super Void> future) throws Exception {
    System.out.println("asdf");
   }
  });

在 write 操作调用后,Netty 并没有完成对 Java NIO 底层连接的写入操作,出站操作是异步执行的。 如果需要获取 IO 结果,可以使用回调的方式。

使用 ChannelFuture 的异步完成后的回调,需要搭配使用另外的一个接口 ChannelFutureListener ,他从父接口哪里继承了一个被回调到的 operationComplete 操作完成的方法。 ChannelFutureListener 的父亲接口是 GenericFutureListener 接口。

定义如下:

public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    void operationComplete(F future) throws Exception;
}

异步操作完成后的回调代码,放在 operationComplete 方法中的实现中,就可以了。

ChannelFuture 的状态变迁图如下:

1.5. Netty 的 Promise 接口

Netty 的 Future,只是增加了监听器。整个异步的状态,是不能进行设置和修改的。换句话说,Future 是只读的,是不可以写的。于是,Netty 的 Promise 接口扩展了 Netty 的 Future 接口,它表示一种可写的 Future,就是可以设置异步执行的结果。

部分源码如下:

public interface Promise<V> extends Future<V> {

    Promise<V> setSuccess(V result);

    Promise<V> setFailure(Throwable cause);

    boolean setUncancellable();

   //....

}

在 IO 操作过程,如果顺利完成、或者发生异常,都可以设置 Promise 的结果,并且通知 Promise 的 Listener 们。 而 ChannelPromise 接口,则继承扩展了 Promise 和 ChannelFuture。所以,ChannelPromise 既绑定了 Channel,又具备了监听器的功能,还可以设置 IO 操作的结果,是 Netty 实际编程使用的最多的接口。 在 AbstratChannel 的代码中,相当多的 IO 操作,都会返回 ChannelPromise 类型实例作为调用的返回值。 通过这个返回值,客户程序可以用于读取 IO 操作的结果,执行 IO 操作真正完成后的回调。

1.6. ChannelPromise 的监控流程

在 AbstractChannel 中,定义了几个对 Channel 的异步状态进行监控的 Promise 和 Future 成员,用于监控 Channel 的连接是否成功,连接是否关闭。

源码如下:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

        //连接成功的监控
        private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);

        //连接关闭的监控
        private final CloseFuture closeFuture = new CloseFuture(this);

//...
}

对于每个 Channel 对象,都会有唯一的一个 CloseFuture 成员,用来表示关闭的异步干预。如果要监控 Channel 的关闭,或者同步等待 Channel 关闭。 一般情况下,在应用程序中使用如下的代码:

// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();

一般来说,编写以上代码的都是在 Main 线程中用来启动 ServerBootStrap 的,所以 Main 线程会被阻塞,保证服务端 Channel 的正常运行。

上面的代码中,channel.closeFuture()不做任何操作,只是简单的返回 channel 对象中的 closeFuture 对象。而 CloseFuture 的 sync 方法,会将当前线程阻塞在 CloseFuture 上。

那么,f.channel().closeFuture().sync() 实际是如何工作的呢?

1.7. CloseFuture 的 sync 同步方法

CloseFuture 继承了 DefaultPromise 的 sync 同步方法。

DefaultPromise 的代码如下:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V>{

     private volatile Object result;
      //...
    @Override
    public Promise<V>  sync() throws InterruptedException {

       await();
        //...
    }

   @Override
    public Promise<V> await() throws InterruptedException {
        //...
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
               try {
                    wait();  //阻塞了,死等
                } finally {
                   decWaiters();
                }
            }
        }
        return this;
    }
//...

}

从源码可以看出,sync 方法,调用了 await 方法。 在 await 方法中,CloseFuture 使用 java 基础的 synchronized 方法进行线程同步;并且,使用 CloseFuture.wait / notify 这组来自 Object 根类中的古老方法进行线程之间的等待和唤醒。 在 await 方法,不断的自旋,判断当前的 CloseFuture 实例的结果是否已经完成,如果没有完成 !isDone() ,就不断的等待。一直到 isDone() 的值为 true。

isDone() 的源码如下:

@Override
public boolean isDone() {
        return isDone0(result);
}

private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
}

CloseFuture 的 isDone() 的条件是否能够满足,和 Channel 的 close 关闭连接的出站操作有关。 下一步,我们来看 isDone() 的条件,如何才能够满足?

1.8. close 出站处理流程

在 Netty 中,close 关闭连接的操作,属于所有的出站操作的一种。关于 Netty 出站处理的流程,在前面的文字中,已经非常详细的介绍了。这里不再赘述,只是简单的列出一个流程图。

close 关闭连接的出站操作,其流程如下图所示:

一溜儿下来,最终会落地到 unsafe.doClose 方法。

看看 unsafe.doClose,是如何与 CloseFuture 的 isDone() 的条件进行关联的。

1.9. unsafe.doClose

unsafe.doClose 方法中,设置了 CloseFuture 的 result 值。 unsafe.doClose 源码如下:

protected abstract class AbstractUnsafe implements Unsafe

{

   private void close(final ChannelPromise promise,…) {
        //…
        try {
            // Close the channel
            doClose0(promise);
        } finally {
            // Fail all the queued messages.
            outboundBuffer.failFlushed(cause, notify);
            outboundBuffer.close(closeCause);
        }
         //……
    }
    }
   private void doClose0(ChannelPromise promise) {
    try {
        doClose();
        closeFuture.setClosed();
        safeSetSuccess(promise);
    } catch (Throwable t) {
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
  }

  //……

}

1.10. closeFuture.setClosed()

在 closeFuture.setClosed() 设置关闭的结果的过程中,主要完成以下三个工作:

1 设置 result 的值 2 notifyAll,唤醒在本 Promise 上等待的线程 3 回调 listener closeFuture.setClosed()的主要源码如下:


boolean setClosed() {
     return super.trySuccess();
}

@Override

//这个定义在父类中
public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
         notifyListeners();
         return true;
    }
    return false;
}

private boolean setSuccess0(V result) {
       return setValue0(result == null ? SUCCESS : result);
}

上面的 notifyListeners()调用,就是用来唤醒了等待在 closeFuture 实例对象上的等待线程。

之前通过 f.channel().closeFuture().sync() 同步操作,阻塞在哪儿的 Main 线程,终于通过 channel.close() 方法,给唤醒了。

1.11. 警惕死锁:Reactor 线程不能 sync

在上面的源码中,最终触发 future 对象的 notify 动作的线程,都是 eventLoop 线程(Reactor 线程)。 一般情况下,Channel 的出站和入站操作,也都是在 eventLoop 线程的轮询任务中完成的。

例如因为不论是用户直接关闭 channel,或者 eventLoop 的轮询状态关闭 channel,都会在 eventLoop 的线程内完成 notify 动作。notify 那些通过 sync 操作,正在等待 CloseFuture 的哪些阻塞线程。

所以不要在 Reactor 线程内调用 future 对象的 sync 或者 await 方法。如果在 Reactor 线程进行 sync 或者 await,会有可能引起死锁。

为什么呢?

在 Reactor 线程进行 sync 时,会进入等待状态,等待 Future(DefaultPromise)的 isDone 的条件满足。通过前面的例子,我们已经看到了,而 Future 的 isDone 的条件,又需要 Reactor 线程的出站或者入站操作来满足。这是,Reactor 线程既然已经处于等待状态,怎么可能再进行其他的出站或者入站操作呢?相当于自己等自己,这就是典型的死锁。

在实际开发中,由于应用程序代码都是编写在自定义的 channelHandler 处理器中,而 channelHandler 是在 eventLoop 线程(Reactor 线程)内执行的。所以,不能在 channelHandler 中调用 Future(DefaultPromise)的 sync 或者 await 两个同步方法。

正确的做法是:通过给 Future(DefaultPromise) 增加 listeners 监听器 的方式,来干预异步操作的过程,处理异步操作的结果。这样,可以避免使用 Future 带来的死锁。

2.总结

  • 由于 Netty 中的 Handler 处理都是异步 IO 操作,结果是未知的,Netty 继承和扩展了 JDK Future 的 API,定义了自身的 Future 系列类型,实现异步操作结果的获取和监控。Netty 强烈建议,通过添加监听器的方式获取 IO 结果,而不是通过 JDK Future 的同步等待的方式去获取 IO 结果。
  • Netty 的 Future,只是增加了监听器。整个异步的状态,是不能进行设置和修改的。换句话说,Future 是只读的,是不可以写的。于是,Netty 的 Promise 接口扩展了 Netty 的 Future 接口,它表示一种可写的 Future,就是可以设置异步执行的结果。
  • 不能在 channelHandler 中调用 Future(DefaultPromise)的 sync 或者 await 两个同步方法,这样会造成死锁。
正文到此结束