原创

Reactor模式详解

全网最全 Java 知识点整合总目录入口猛戳-->www.gameboys.cn

Netty 全网最全示例源码地址猛戳-->https://github.com/Sniper2016/NettyStudy

什么是 Reactor 模型

Reactor 模式称之为响应器模式,通常用于 NIO 非阻塞 IO 的网络通信框架中。在笔者看来,reactor 模型关心的是线程,是针对非阻塞 IO 的,通过各种手段来增加线程,尽可能的增加程序并发能力。

Reactor 模型的 4 个发展流程

本文的例子来源于《Scalable IO in Java》,文档下载地址:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

看下面文档之前,建议读者去温习一遍IO的4种模型,重点理解IO多路复用模型的原理,熟悉JavaNIO多路复用的代码模板。

第一阶段:Classic Service Designs

Classic Service Designs
Classic Service Designs

参考代码


class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
            new Thread(new Handler(ss.accept())).start(); //创建新线程来handle
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

该阶段 IO 模型为阻塞 IO,对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。

这种模型由于 IO 在阻塞时会一直等待,因此在用户负载增加时,性能下降的非常快。

改进:IO 模型升级为多路复用,采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。这种模式为非阻塞 IO 模型,事件驱动

第二阶段:Basic Reactor Design

Basic Reactor Design
Basic Reactor Design

参考代码

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;
    Reactor(int port) throws IOException { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false); //非阻塞
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //分步处理,第一步,接收accept事件
        sk.attach(new Acceptor()); //attach callback object, Acceptor
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey)(it.next()); //Reactor负责dispatch收到的事件
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象
        if (r != null)
            r.run();
    }

    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                new Handler(selector, c);
            }
            catch(IOException ex) { /* ... */ }
        }
    }
}

final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c; c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this); //将Handler作为callback对象
        sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件
        sel.wakeup();
    }
    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }

    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件
        }
    }
    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel(); //write完就结束了, 关闭select key
    }
}

//上面 的实现用Handler来同时处理Read和Write事件, 所以里面出现状态判断
//我们可以用State-Object pattern来更优雅的实现
class Handler { // ...
    public void run() { // initial state is reader
        socket.read(input);
        if (inputIsComplete()) {
            process();
            sk.attach(new Sender());  //状态迁移, Read后变成write, 用Sender作为新的callback对象
            sk.interest(SelectionKey.OP_WRITE);
            sk.selector().wakeup();
        }
    }
    class Sender implements Runnable {
        public void run(){ // ...
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }
    }
}

这里用到了 Reactor 模式。 关于 Reactor 模式的一些概念: Reactor:负责响应 IO 事件,当检测到一个新的事件,将其发送给相应的 Handler 去处理。 Handler:负责处理非阻塞的行为,标识系统管理的资源;同时将 handler 与事件绑定。 Reactor 为单个线程,需要处理 accept 连接,同时发送请求到处理器中。

由于只有单个线程,所以处理器中的业务需要能够快速处理完。 改进:使用多线程处理业务逻辑。

第三阶段 Worker Thread Pools

Worker Thread Pools
Worker Thread Pools

参考代码

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
            new Thread(new Handler(ss.accept())).start(); //创建新线程来handle
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

将处理器的执行放入线程池,多线程进行业务处理。但 Reactor 仍为单个线程。

继续改进:对于多个 CPU 的机器,为充分利用系统资源,将 Reactor 拆分为两部分。

第四阶段 Using Multiple Reactors

Using Multiple Reactors
Using Multiple Reactors

参考代码

Selector[] selectors; //subReactors集合, 一个selector代表一个subReactor
int next = 0;
class Acceptor { // ...
    public synchronized void run() { ...
        Socket connection = serverSocket.accept(); //主selector负责accept
        if (connection != null)
            new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection
        if (++next == selectors.length) next = 0;
    }
}

mainReactor 负责监听连接,accept 连接给 subReactor 处理,为什么要单独分一个 Reactor 来处理监听呢?因为像 TCP 这样需要经过 3 次握手才能建立连接,这个建立连接的过程也是要耗时间和资源的,单独分一个 Reactor 来处理,可以提高性能。

正文到此结束