原创

使用strace追踪Epoll系统调用

1.java使用epoll系统调用的NIO示例


package cn.gameboys.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * NIO服务端
 * 
 * @author www.gameboys.cn
 */
public class NIOServer {
 // 通道管理器
 private Selector selector;

 /**
  * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
  * 
  * @param port
  *            绑定的端口号
  * @throws IOException
  */
 public void initServer(int port) throws IOException {
  // 获得一个ServerSocket通道
  ServerSocketChannel serverChannel = ServerSocketChannel.open();
  // 设置通道为非阻塞
  serverChannel.configureBlocking(false);
  // 将该通道对应的ServerSocket绑定到port端口
  serverChannel.socket().bind(new InetSocketAddress(port));
  // 获得一个通道管理器
  this.selector = Selector.open();
  // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
  // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
  serverChannel.register(selector, SelectionKey.OP_ACCEPT);
 }

 /**
  * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
  * 
  * @throws IOException
  */
 @SuppressWarnings("unchecked")
 public void listen() throws IOException {
  System.out.println("服务端启动成功!");
  // 轮询访问selector
  while (true) {
   // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
   selector.select();
   // 获得selector中选中的项的迭代器,选中的项为注册的事件
   Iterator ite = this.selector.selectedKeys().iterator();
   while (ite.hasNext()) {
    SelectionKey key = (SelectionKey) ite.next();
    // 删除已选的key,以防重复处理
    ite.remove();
    // 客户端请求连接事件
    if (key.isAcceptable()) {
     ServerSocketChannel server = (ServerSocketChannel) key.channel();
     // 获得和客户端连接的通道
     SocketChannel channel = server.accept();
     // 设置成非阻塞
     channel.configureBlocking(false);

     // 在这里可以给客户端发送信息哦
     channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));
     // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
     channel.register(this.selector, SelectionKey.OP_READ);

     // 获得了可读的事件
    } else if (key.isReadable()) {
     read(key);
    }

   }

  }
 }

 /**
  * 处理读取客户端发来的信息 的事件
  * 
  * @param key
  * @throws IOException
  */
 public void read(SelectionKey key) throws IOException {
  // 服务器可读取消息:得到事件发生的Socket通道
  SocketChannel channel = (SocketChannel) key.channel();
  // 创建读取的缓冲区
  ByteBuffer buffer = ByteBuffer.allocate(1024);
  channel.read(buffer);
  byte[] data = buffer.array();
  String msg = new String(data).trim();
  System.out.println("服务端收到信息:" + msg);
  ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
  channel.write(outBuffer);// 将消息回送给客户端
 }

 /**
  * 启动服务端测试
  * 
  * @throws IOException
  */
 public static void main(String[] args) throws IOException {
  NIOServer server = new NIOServer();
  server.initServer(8000);
  server.listen();
 }

}


2.一个epoll服务器流程:

new Socket
bind
listen
epoll_create
epoll_ctrl
while(true){
 epoll_wait
 accept
 epoll_ctrl
}

3.使用strace命令追踪Redis-Server的系统调用

3.1 窗口A开启跟踪命令

strace -ff -o out redis-server

3.2窗口B使用redis客户端命令链接上来

redis-cli

3.3过滤epoll系统调用

cat out.11074|grep "epoll" >a

3.4查看系统调用流程

vim a

epoll_create(1024)                      = 5
epoll_ctl(5, EPOLL_CTL_ADD, 6, {EPOLLIN, {u32=6, u64=6}}) = 0
epoll_ctl(5, EPOLL_CTL_ADD, 7, {EPOLLIN, {u32=7, u64=7}}) = 0
epoll_ctl(5, EPOLL_CTL_ADD, 3, {EPOLLIN, {u32=3, u64=3}}) = 0
epoll_wait(5, [], 10128, 0)             = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 99)            = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [{EPOLLIN, {u32=7, u64=7}}], 10128, 100) = 1
epoll_ctl(5, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=8}}) = 0
epoll_wait(5, [{EPOLLIN, {u32=8, u64=8}}], 10128, 89) = 1
epoll_wait(5, [], 10128, 72)            = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 99)            = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 99)            = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [{EPOLLIN, {u32=8, u64=8}}], 10128, 100) = 1
epoll_ctl(5, EPOLL_CTL_DEL, 8, 0x7ffd056a0510) = 0
epoll_wait(5, [], 10128, 7)             = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 99)            = 0
epoll_wait(5, [], 10128, 100)           = 0
epoll_wait(5, [], 10128, 100)           = 0


4.epoll三个函数详解

4.1. int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

注意:size参数只是告诉内核这个 epoll对象会处理的事件大致数目,而不是能够处理的事件的最大个数。在 Linux最新的一些内核版本的实现中,这个 size参数没有任何意义。

4.2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,epoll_ctl向 epoll对象中添加、修改或者删除感兴趣的事件,返回0表示成功,否则返回–1,此时需要根据errno错误码判断错误类型。

它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

epoll_wait方法返回的事件必然是通过 epoll_ctl添加到 epoll中的。

第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:

  • EPOLL_CTL_ADD:注册新的fd到epfd中;
  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL:从epfd中删除一个fd;

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

events可以是以下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
  • EPOLLOUT:表示对应的文件描述符可以写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

4.3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。如果返回–1,则表示出现错误,需要检查 errno错误码判断错误类型。

  • 第1个参数 epfd是 epoll的描述符。

  • 第2个参数 events则是分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)。

  • 第3个参数 maxevents表示本次可以返回的最大事件数目,通常 maxevents参数与预分配的events数组的大小是相等的。

  • 第4个参数 timeout表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待。

5.关于ET、LT两种工作模式:

epoll有两种工作模式:

  • LT(水平触发)模式
  • ET(边缘触发)模式

默认情况下,epoll采用 LT模式工作,这时可以处理阻塞和非阻塞套接字,而上表中的 EPOLLET表示可以将一个事件改为 ET模式。ET模式的效率要比 LT模式高,它只支持非阻塞套接字。

ET模式与LT模式的区别在于:

当一个新的事件到来时,ET模式下当然可以从 epoll_wait调用中获取到这个事件,可是如果这次没有把这个事件对应的套接字缓冲区处理完,在这个套接字没有新的事件再次到来时,在 ET模式下是无法再次从 epoll_wait调用中获取这个事件的;而 LT模式则相反,只要一个事件对应的套接字缓冲区还有数据,就总能从 epoll_wait中获取这个事件。因此,在 LT模式下开发基于 epoll的应用要简单一些,不太容易出错,而在 ET模式下事件发生时,如果没有彻底地将缓冲区数据处理完,则会导致缓冲区中的用户请求得不到响应。默认情况下,Nginx是通过 ET模式使用 epoll的。

正文到此结束