原创

图文全解IO模型

看书是十分枯燥的事情,特别是全文字,想要记住是一件很难的事情,所以我喜欢使用图形来总结所学习到的知识点,另外,学习一个知识点,需要先从整体上了解他的系统架构,了解这个知识点所属的层次,这样能更好的理解该知识点,所以,了解IO模型,需要翻看一些原因网络方面的书籍《鸟哥的Linux私房菜》《UNIX网络编程》,本笔记分为三部分:1.先梳理IO模型所属的系统架构层次,2.分别介绍5种不同的IO模型,3.使用Java来分别实现这些IO模型。

一、用户态和内核态

1. Linux的系统架构


 

 


 

Linux系统架构

宏观上来说,上图很好的反应了Linux操作系统的架构图。首先,Linux系统包含一个内核,内核,实质是一个软件,特殊的地方在于,内核具备直接操纵CPU资源,内存资源,I/O资源等的能力。

内核之外是系统调用。系统调用这层,相当于内核增加了一层封装。将内核操作的复杂性封装起来,对外层提供透明的调用。系统调用可以理解为操作系统的最小功能单位,所有有应用程序,都是将通过将系统调用组合而成的。

系统调用之外,就是应用程序,各种各样的应用程序。特殊的有两个,一个是Shell,它是一个特殊的应用程序,俗称命令行。类似于windowscmd,是一个命令解释器。有些应用程序是基于Shell做的,而不是基于系统调用。另一个特殊的是公用函数库。系统调用是操作系统的最小功能单位,一个Linux系统,根据版本不同,大约包含240~260个系统调用。为了使得操作更为简单,更加便于应用程序使用,Linux系统对系统调用的部分功能进行了再次封装,形成了公用函数库,以供应用程序调用。公用函数库中的一个方法,实质是若干个系统调用以特定的逻辑组合而成。

2.用户态和内核态

Linux的架构中,很重要的一个能力就是操纵系统资源的能力。但是,系统资源是有限的,如果不加限制的允许任何程序以任何方式去操纵系统资源,必然会造成资源的浪费,发生资源不足等情况。为了减少这种情况的发生,Linux制定了一个等级制定,即特权。Linux将特权分成两个层次,以03标识。0的特权级要高于3。换句话说,0特权级在操纵系统资源上是没有任何限制的,可以执行任何操作,而3,则会受到极大的限制。我们把特权级0称之为内核态,特权级3称之为用户态。

一个应用程序,在执行的过程中,会在用户态和内核态之间根据需要不断切换的。因为,如果只有用户态,那么必然有某些操纵系统资源的操作很难完成或不能完成,而如果一直都在内核态,那事实上,导致特权的分层失去了意义。大家全是最高权限,和大家都没有限制,没有什么区别。

所以,应用程序一般会运行于用户态,会在需要的时候,切换成内核态,完成相关任务后,会再切换加用户态。需要注意的是,一种切换是有一定系统开销的。

应用程序一般会在以下几种情况下切换到内核态:

1. 系统调用。

2. 异常事件。当发生某些预先不可知的异常时,就会切换到内核态,以执行相关的异常事件。

3. 设备中断。在使用外围设备时,如外围设备完成了用户请求,就会向CPU发送一个中断信号,此时,CPU就会暂停执行原本的下一条指令,转去处理中断事件。此时,如果原来在用户态,则自然就会切换到内核态。

3.DMA(Direct Memory Access)直接内存存取

DMA是一种能够允许输入输出设备(input/output IO)直接访问主内存,绕开CPU快速操作内存的方法,这个过程由一个称为DMA控制器的芯片来管理。


上图一共四个步骤:

1.把数据从磁盘上读取到内核态中的内核缓冲区中,

2.从内核缓存区中拷贝到用户态的用户空间缓冲区中
3.从用户态拷贝到内核态的socket缓冲区,

4.socket缓冲区通过DMA写入到网络,

二、I/O 模型

本章将向大家介绍五种 I/O 模型,包括阻塞 I/O、非阻塞 I/OI/O 复用、信号驱动式 I/O 、异步 I/O 等。

阻塞IO模型


上图中,应用进程通过系统调用 recvfrom 接收数据,但由于内核还未准备好数据报,应用进程就阻塞住了。直到内核准备好数据报,recvfrom 完成数据报复制工作,应用进程才能结束阻塞状态。

非阻塞IO模型


 上图中,应用进程通过 recvfrom 系统调用不停的去和内核交互,直到内核准备好数据报。从上面的流程中可以看出,应用进程进入轮询状态时等同于阻塞,所以非阻塞的 I/O 似乎并没有提高进程工作效率。

IO复用模型

 

Unix/Linux 环境下的 I/O 复用模型包含三组系统调用,分别是 selectpoll epoll

select 有三个文件描述符集(readfds),分别是可读文件描述符集(writefds)、可写文件描述符集和异常文件描述符集(exceptfds)。应用程序可将某个 socket (文件描述符)设置到感兴趣的文件描述符集中,并调用 select 等待所感兴趣的事件发生。比如某个 socket 处于可读状态了,此时应用进程就可调用 recvfrom 函数把数据从内核空间拷贝到进程空间内,无需再等待内核准备数据了。

信号驱动式IO模型


 信号驱动式 I/O 模型是指,应用进程告诉内核,如果某个 socket 的某个事件发生时,请向我发一个信号。在收到信号后,信号对应的处理函数会进行后续处理。

异步IO模型


异步 I/O 是指应用进程把文件描述符传给内核后,啥都不管了,完全由内核去操作这个文件描述符。内核完成相关操作后,会发信号告诉应用进程,某某 I/O 操作我完成了,你现在可以进行后续操作了。示意图如下:

上图通过 aio_read 把文件描述符、数据缓存空间,以及信号告诉内核,当文件描述符处于可读状态时,内核会亲自将数据从内核空间拷贝到应用进程指定的缓存空间呢。拷贝完在告诉进程 I/O 操作结束,你可以直接使用数据了。

多路复用的实现有多种方式:selectpollepoll

select调用过程

a. 从用户空间将fd_set拷贝到内核空间

b. 注册回调函数

c. 调用其对应的poll方法

d. poll方法会返回一个描述读写是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

e. 如果遍历完所有的fd都没有返回一个可读写的mask掩码,就会让select的进程进入休眠模式,直到发现可读写的资源后,重新唤醒等待队列上休眠的进程。如果在规定时间内都没有唤醒休眠进程,那么进程会被唤醒重新获得CPU,再去遍历一次fd

f. fd_set从内核空间拷贝到用户空间

优缺点

缺点:两次拷贝耗时、轮询所有fd耗时,支持的文件描述符太小

优点:跨平台支持

poll调用过程(select完全一致)

优缺点

优点:连接数(也就是文件描述符)没有限制(链表存储)
缺点:大量拷贝,水平触发(当报告了fd没有被处理,会重复报告,很耗性能)

epollETLT模式

LT:延迟处理,当检测到描述符事件通知应用程序,应用程序不立即处理该事件。那么下次会再次通知应用程序此事件。
ET:立即处理,当检测到描述符事件通知应用程序,应用程序会立即处理。

ET模式减少了epoll被重复触发的次数,效率比LT高。我们在使用ET的时候,必须采用非阻塞套接口,避免某文件句柄在阻塞读或阻塞写的时候将其他文件描述符的任务饿死

epoll调用过程

a. 当调用epoll_wait函数的时候,系统会创建一个epoll对象,每个对象有一个evenpoll类型的结构体与之对应,结构体成员结构如下。

rbn,代表将要通过epoll_ctlepll对象中添加的事件。这些事情都是挂载在红黑树中。
rdlist,里面存放的是将要发生的事件

b. 文件的fd状态发生改变,就会触发fd上的回调函数
c. 回调函数将相应的fd加入到rdlist,导致rdlist不空,进程被唤醒,epoll_wait继续执行。
d. 有一个事件转移函数——ep_events_transfer,它会将rdlist的数据拷贝到txlist上,并将rdlist的数据清空。
e. ep_send_events函数,它扫描txlist的每个数据,调用关联fd对应的poll方法去取fd中较新的事件,将取得的事件和对应的fd发送到用户空间。如果fdLT模式的话,会被txlist的该数据重新放回rdlist,等待下一次继续触发调用。

优缺点

优点:

没有最大并发连接的限制

只有活跃可用的fd才会调用callback函数

内存拷贝是利用mmap()文件映射内存的方式加速与内核空间的消息传递,减少复制开销。(内核与用户空间共享一块内存)

只有存在大量的空闲连接和不活跃的连接的时候,使用epoll的效率才会比select/poll

三、Java中的IO模型

1.BIO

BIO是一个典型的网络编程模型,是通常我们实现一个服务端程序的过程,步骤如下:

主线程accept请求阻塞

请求到达,创建新的线程来处理这个套接字,完成对客户端的响应。

主线程继续accept下一个请求

这种模型有一个很大的问题是:当客户端连接增多时,服务端创建的线程也会暴涨,系统性能会急剧下降。因此,在此模型的基础上,类似于 tomcatbio connector,采用的是线程池来避免对于每一个客户端都创建一个线程。有些地方把这种方式叫做伪异步IO(把请求抛到线程池中异步等待处理)

代码:

BIOServer.java

package io;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

public class BIOServer {
private static ServerSocket serverSocket;
private static Socket socket;

public BIOServer() {
}

static {
System.out.println("server begin……");
try {
serverSocket = new ServerSocket(5555); // 等待客户端连接
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, InterruptedException {
while (true) {
socket = serverSocket.accept(); // 这就是bio同步阻塞
new ReadThread(socket).start();
}
}

static class ReadThread extends Thread {
private Socket socket;

public ReadThread(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try {
System.out.println("ReadThread");
InputStream inpt = socket.getInputStream();
while (true) {
try {
byte[] buffer = new byte[1024];
// 这里阻塞获取数据
int result = inpt.read(buffer);
String str = new String(buffer, "utf-8");
System.err.println("from client info:" + str + "thread:" + Thread.currentThread().getId());
String server = new String(str);
Charset cs = Charset.forName("utf-8");
TimeUnit.MILLISECONDS.sleep(100);
byte[] bytes = server.getBytes(cs);
socket.getOutputStream().write(bytes);
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

}

BIOClient.java

package io;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

/**
*
* @Description:
* @author: sniper(1084038709@qq.com)
* @date:2019年9月3日 下午5:35:30
*/
public class BIOClient {

public BIOClient() {
}

public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 5555);
Thread sendThread = new ReadThread(socket);
Thread printThread = new WriteThread(socket);
sendThread.start();
printThread.start();
sendThread.join();
printThread.join();

}

static class WriteThread extends Thread {
private Socket socket;

public WriteThread(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try {
System.out.println("WriteThread");
DataOutputStream dout = new DataOutputStream(socket.getOutputStream());
while (true) {
BufferedReader bufReader = new BufferedReader(new InputStreamReader(System.in));
try {
String msgToSend = bufReader.readLine();
dout.writeUTF(msgToSend);
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

static class ReadThread extends Thread {
private Socket socket;

public ReadThread(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try {
System.out.println("ReadThread");
InputStream inpt = socket.getInputStream();
while (true) {
try {
byte[] buffer = new byte[1024];
// 这里阻塞获取数据
int result = inpt.read(buffer);
String str = new String(buffer, "utf-8");
System.out.println(" server: " + str);
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

}

2.NIO

JDK1.4开始引入了NIO类库,这里的NIO指的是Non-blcok IO,主要是使用Selector多路复用器来实现。SelectorLinux等主流操作系统上是通过epoll实现的。

 

NIO的实现流程,类似于select

创建ServerSocketChannel监听客户端连接并绑定监听端口,设置为非阻塞模式。

创建Reactor线程,创建多路复用器(Selector)并启动线程。

ServerSocketChannel注册到Reactor线程的Selector上。监听accept事件。

Selector在线程run方法中无线循环轮询准备就绪的Key

Selector监听到新的客户端接入,处理新的请求,完成tcp三次握手,建立物理连接。

将新的客户端连接注册到Selector上,监听读操作。读取客户端发送的网络消息。

客户端发送的数据就绪则读取客户端请求,进行处理。

相比BIONIO的编程非常复杂。 

代码:

NIOTcpServer.java

package io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;


public class NIOTcpServer {

private static final int PORT = 5555;
private static ByteBuffer byteBuffer = ByteBuffer.allocate(10240);
private static int number = 0;

public static void main(String[] args) throws IOException {

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

System.out.println("server start on port " + PORT + " ...");

while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (!selectionKey.isValid())
continue;
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);

// socketChannel.register(selector, SelectionKey.OP_WRITE);
// start the thread of writing
new Thread(new SendRunnable(socketChannel)).start();
Socket socket = socketChannel.socket();
System.out.println("Get a client, the remote client address: " + socket.getRemoteSocketAddress());
} else if (selectionKey.isReadable()) {
// Read data from channel to buffer
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.socket().setReceiveBufferSize(102400);
String remoteAddress = socketChannel.socket().getRemoteSocketAddress().toString();
// do nothing
//processNormally(socketChannel);
// variable length
processByHead(socketChannel);

// fix length
//processByFixLength(socketChannel);
}

iterator.remove();
}
}
}

private static class SendRunnable implements Runnable {

private SocketChannel socketChannel;

public SendRunnable(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

public void run() {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
try {
String msg = bufferedReader.readLine();
this.socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}


/**
* 普通包处理逻辑
* @param socketChannel
* @throws IOException
*/
private static void processNormally(SocketChannel socketChannel) throws IOException {
StringBuilder sb = new StringBuilder();
ByteBuffer tmpByteBuffer = ByteBuffer.allocate(1024);
while (socketChannel.read(tmpByteBuffer) > 0) {
tmpByteBuffer.flip();
sb.append(new String(tmpByteBuffer.array()));
tmpByteBuffer.clear();
}
System.out.println(sb.toString() + " <---> " + number++);
}

/**
* 通过包头来解析数据
* @param socketChannel
* @throws IOException
*/
private static void processByHead(SocketChannel socketChannel) throws IOException {
while (socketChannel.read(byteBuffer) > 0) {
int position = byteBuffer.position();
int limit = byteBuffer.limit();
byteBuffer.flip();
if (byteBuffer.remaining() < 4) {
byteBuffer.position(position);
byteBuffer.limit(limit);
continue;
}
int length = byteBuffer.getInt();
if (byteBuffer.remaining() < length) {
byteBuffer.position(position);
byteBuffer.limit(limit);
continue;
}
byte[] data = new byte[length];
byteBuffer.get(data, 0, length);
System.out.println(new String(data) + " <----> " + number++);
byteBuffer.compact();
}
}

public static final int MAX_LENGTH = 32;
/**
* 定长数据格式解析
* @param socketChannel
* @throws IOException
*/
private static void processByFixLength(SocketChannel socketChannel) throws IOException {

while (socketChannel.read(byteBuffer) > 0) {
byteBuffer.flip();
while (byteBuffer.remaining() >= MAX_LENGTH) {
byte[] data = new byte[MAX_LENGTH];
byteBuffer.get(data, 0, MAX_LENGTH);
System.out.println(new String(data) + " <---> " + number++);
}
byteBuffer.compact();
}
}
}

NIOTcpClient.java

package io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOTcpClient {

private static final int PORT = 5555;
private static final String IP_ADDRESS = "localhost";

public static void main(String[] args) throws IOException {

Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);

socketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT));
while (!socketChannel.finishConnect()) {
}
new Thread(new SendRunnable(socketChannel)).start();

System.out.println("Connecting to " + IP_ADDRESS + " on " + PORT);
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
StringBuilder sb = new StringBuilder();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (channel.read(byteBuffer) > 0) {
byteBuffer.flip();
sb.append(new String(byteBuffer.array()));
byteBuffer.clear();
}
System.out.println("[server] " + sb.toString());
}
iterator.remove();
}
}
}

private static class SendRunnable implements Runnable {

private SocketChannel socketChannel;

public SendRunnable(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

public void run() {
System.out.println("Type to send message:");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
try {
socketChannel.socket().setSendBufferSize(102400);
} catch (SocketException e) {
e.printStackTrace();
}
// int number = 0;
while (true) {
try {
// take input as the message source
String msg = bufferedReader.readLine();
// send the message continuously
// String msg = "hello world " + number++;
// send data normally
// socketChannel.write(ByteBuffer.wrap(msg.getBytes()));

// add the head represent the data length
socketChannel.write(ByteBuffer.wrap(new PacketWrapper(msg).getBytes()));

// make the data length fixed
// socketChannel.write(ByteBuffer.wrap(new FixLengthWrapper(msg).getBytes()));

} catch (IOException e) {
e.printStackTrace();
}
}
}
}

static class FixLengthWrapper {

public static final int MAX_LENGTH = 32;
private byte[] data;

public FixLengthWrapper(String msg) {
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
byteBuffer.put(msg.getBytes());
byte[] fillData = new byte[MAX_LENGTH - msg.length()];
byteBuffer.put(fillData);
data = byteBuffer.array();
}

public FixLengthWrapper(byte[] msg) {
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
byteBuffer.put(msg);
byte[] fillData = new byte[MAX_LENGTH - msg.length];
byteBuffer.put(fillData);
data = byteBuffer.array();
}

public byte[] getBytes() {
return data;
}

public String toString() {
StringBuilder sb = new StringBuilder();
for (byte b : getBytes()) {
sb.append(String.format("0x%02X ", b));
}
return sb.toString();
}
}


static class PacketWrapper {

private int length;
private byte[] payload;

public PacketWrapper(String payload) {
this.payload = payload.getBytes();
this.length = this.payload.length;
}

public PacketWrapper(byte[] payload) {
this.payload = payload;
this.length = this.payload.length;
}

public byte[] getBytes() {
ByteBuffer byteBuffer = ByteBuffer.allocate(this.length + 4);
byteBuffer.putInt(this.length);
byteBuffer.put(payload);
return byteBuffer.array();
}

public String toString() {
StringBuilder sb = new StringBuilder();
for (byte b : getBytes()) {
sb.append(String.format("0x%02X ", b));
}
return sb.toString();
}
}

}

3.AIO

JDK1.7引入NIO2.0,提供了异步文件通道和异步套接字通道的实现。其底层在windows上是通过IOCP,在Linux上是通过epoll来实现的(LinuxAsynchronousChannelProvider.java,UnixAsynchronousServerSocketChannelImpl.java)

创建AsynchronousServerSocketChannel,绑定监听端口

调用AsynchronousServerSocketChannelaccpet方法,传入自己实现的CompletionHandler。包括上一步,都是非阻塞的

连接传入,回调CompletionHandlercompleted方法,在里面,调用AsynchronousSocketChannelread方法,传入负责处理数据的CompletionHandler

数据就绪,触发负责处理数据的CompletionHandlercompleted方法。继续做下一步处理即可。

写入操作类似,也需要传入CompletionHandler

AIOTcpServer.java

package io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOTcpServer {

private static final int PORT = 5555;

public static void main(String[] args) throws IOException, InterruptedException {

System.out.println("main thread id: " + Thread.currentThread().getId());
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));

serverSocketChannel.accept(serverSocketChannel, new AcceptCompletionHandler());

// keep the main thread out of exiting
Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Listening on port: " + PORT);
while (true)
;
}
});

t.start();
t.join();
}

private static class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

@Override
public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {

System.out.println("Get a new client!!!");
System.out.println("accept thread id: " + Thread.currentThread().getId());
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
result.read(byteBuffer, result, new ReadCompletionHandler(byteBuffer, "client"));
// start the send thread
new Thread(new AIOSendRunnable(result)).start();
// continue listening
attachment.accept(attachment, this);
}

@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {

System.out.println("Get a new client failed!!!");
attachment.accept(attachment, this);
}
}

}

AIOTcpClient.java

package io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class AIOTcpClient {

private static final String ADDRESS = "localhost";
private static final int PORT = 5555;

public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {

AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
Future<Void> future = socketChannel.connect(new InetSocketAddress(ADDRESS, PORT));
future.get();

System.out.println("Connecting to " + ADDRESS + " on " + PORT);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer, socketChannel, new ReadCompletionHandler(byteBuffer, "server"));

Thread t = new Thread(new AIOSendRunnable(socketChannel));
t.start();
t.join();
}
}

class AIOSendRunnable implements Runnable {

private AsynchronousSocketChannel socketChannel;

public AIOSendRunnable(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

@Override
public void run() {
System.out.println("Type to send message:");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
try {
String msg = reader.readLine();
this.socketChannel.write(ByteBuffer.wrap(msg.getBytes()), null, new WriteCompletionHandler());
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

WriteCompletionHandler.java

package io;

import java.nio.channels.CompletionHandler;


public class WriteCompletionHandler implements CompletionHandler<Integer, Void> {

@Override
public void completed(Integer result, Void attachment) {
}

@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("Write failed!!!");
}
}

ReadCompletionHandler.java

package io;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;


public class ReadCompletionHandler implements CompletionHandler<Integer, AsynchronousSocketChannel> {

private ByteBuffer byteBuffer;
private String remoteName;
public ReadCompletionHandler(ByteBuffer byteBuffer, String remoteName) {
this.byteBuffer = byteBuffer;
this.remoteName = remoteName;
}

@Override
public void completed(Integer result, AsynchronousSocketChannel attachment) {
if (result <= 0)
return;

byteBuffer.flip();
System.out.println("[" + this.remoteName + "] " + new String(byteBuffer.array()));

byteBuffer.clear();
attachment.read(byteBuffer, attachment, this);
}

@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
byteBuffer.clear();
attachment.read(byteBuffer, attachment, this);
}
}


 


正文到此结束
本文目录