AIO

AIO

概述

什么是 AIO?

  • 历史:JDK 1.7 推出真正的 [异步非阻塞式的调用(AIO)]{.red}

  • 名称:

    • AIO 全称为 Asynchronous IO

    • AIO 实际上并不是官方定义的名称,而是根据新增加的核心类命名的,通常也可以称为 NIO2

      AIO 所有的核心类仍然是属于 java.nio 包下的,本质上是对 NIO 架构的完善

什么是异步非阻塞式调用?

:::info

此前的 IO 概述中已经提到过了,这里再简单的描述下

:::

  • 异步非阻塞式调用

    • 定义:

      • 进程向操作系统注册,[编写回调函数后将 IO 操作完全交付给操作系统实现]{.blue},进程继续执行自己其他的任务

        回调函数:名字听起来很高端,实际就是进程告诉操作系统在 IO 事件发生后该如何处理,毕竟进程自己不再管了

      • 操作系统在 IO 事件发生后,选择 [线程池中的某个线程执行进程提供的回调函数]{.blue} 处理发生的 IO 事件

      • 回调函数处理 IO 事件完成后,向进程发出信号通知 IO 事件已经完成,[进程不需要再使用任何系统调用来完成 IO 事件]{.blue}

        AIO
    • 区别:

      • 不同于同步非阻塞式中的 [Selector 采用轮询机制处理事件]{.red},异步非阻塞式采用 [订阅-通知]{.red} 模式
      • [无论是 NIO 的哪个版本最终都需要进程自己调用方法处理事件,AIO 则是将编写好的方法完全交给操作系统去执行,自己则不再关心]{.blue}

为什么要使用 AIO?

  • 异步非阻塞式 IO 是多路复用式 IO 的一种 [替代方案]{.blue}

    +++ 两种方案并没有本质上的优劣之分,实际开发就是 NIO + AIO 两者的结合

    • [NIO 是 Java 自己实现的方法,所以在不同的操作系统都是没有区别的]{.red}

    • [AIO 是涉及到操作系统的实现,所以在不同的操作系统下效率可能是不同的]{.red}

      Windows 平台采用 IOCP 技术真正实现异步操作,Linux 平台仅能够利用 epoll 模拟异步操作

    +++

核心类

AIO 的核心类有哪些?

  • [AsynchronousSocketChannel]{.red}
  • [AsynchronousServerSocketChannel]{.red}
  • AsynchronousFileChannel:文件操作并不常用,所以不会编写案例
  • [AsynchronousChannelGroup]{.blue}
  • [CompletionHandler<V,A>]{.red}
  • Future<V.>:[Future 类并不是 NIO 包下的类,而是 JUC 包中提供的类]{.green}

AIO 核心类如何使用?

  • 概述中提到了 AIO 本质是对 NIO 的完善,所以新增的核心类在使用方法上和 NIO 没有太大的区别
  • 主要区别在于新增的 CompletionHandler 类,该类是用于编写回调函数处理 IO 事件的,其余类没有太大区别

AsynchronousServerSocketChannel:创建服务器端

  • [创建线程池(推荐使用)]{.red}

    • 问题:IO 事件发生时,进程在执行其他的任务,显然主线程是肯定不可能执行回调函数了,那么操作系统应该如何执行回调函数呢?

    • 解决方式:[操作系统调用线程池中存在的线程去执行回调函数]{.red}

      操作系统调用 AsynchronousChannelGroup 中的线程池,线程在 IO 事件发生时执行回调函数

    • 创建方式:

      • 利用工具类 Excutors 创建线程池:[newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool、newSingleThreadExecutor]{.blue}
      • 利用类 ThreadPoolExecutor 创建自定义线程池:[阿里开发手册推荐使用的方式]{.red}
  • 创建服务器端:[bind]{.blue} 方法绑定端口号

  • 服务器端接收客户端请求:调用 [accept]{.blue} 方法接收请求

  • 使用循环和阻塞函数:

    • 问题一:[accept]{.blue} 方法是异步非阻塞式的,也就意味着服务器端在执行完这个方法之后,会立刻执行之后的代码,如果没有,那么服务器就会直接结束,这显然是不合理的,因为客户端都还没有连接上,服务器就结束了,那么应该怎么解决?

    • 解决方式:[while(true)]{.blue} 循环显然可以避免这个问题

    • 问题二:[while(true)]{.blue} 循环会导致服务器不停的空转,也就是此前轮询模型的问题,这也是我们不想看到的,那么应该怎么解决?

    • 解决方式:此前的 [多路复用模型]{.red} 的解决方式就是使用 [select]{.blue} 阻塞方法来避免这个问题,那么我们也可以采用阻塞方法来解决

      此前学习过程中使用的方法是 System.in.read,虽然确实能够达到效果,但是感觉不是最终的解决方案

// 服务器端主方法
public void start()
{
try
{
// 创建自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,// 核心线程数量
3,// 最大线程数量
5,// 空闲线程的存活时间
TimeUnit.SECONDS,// 存活时间单位
new LinkedBlockingQueue<>(),// 阻塞队列:超过最大线程时还能够容纳的客户端
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()// 拒绝策略:超过阻塞队列容量后的处理方式
);
// 创建 AsynchronousChannelGroup
AsynchronousChannelGroup acg = AsynchronousChannelGroup.withThreadPool(threadPool);
// 创建服务器端实例(assc 是我定义的成员变量):如果没有传入 AsynchronousChannelGroup,则会使用默认的
assc = AsynchronousServerSocketChannel.open(acg);
// 服务器端绑定端口号
assc.bind(new InetSocketAddress(DEFAULT_PORT));
System.out.printf("服务器[%d]:%s\n", DEFAULT_PORT, "启动成功");
// accept 方法参数介绍
while (true)
{
// 第一个参数是 Attachment:(1) 可以传入任意的类型 (2) 用于辅助回调函数处理 IO 事件的
// 诸如缓冲区等辅助信息都是可以传入的
// 第二个参数是 CompletionHandler(回调函数):用于处理 IO 事件的类
// 这里有两种写法:(1) 直接使用匿名内部类的写法(不推荐)(2) 编写实现 CompletionHandler 接口的内部类
// 不推荐第一种写法的原因是会导致代码看起来非常乱,虽然简洁,读者可以自己试试编写一下
assc.accept(null, new AcceptHandler());
assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
// 回调函数
});
// 阻塞服务器:防止服务器空转
System.in.read();

}
}
catch (IOException e)
{
e.printStackTrace();
}
}

CompletionHandler<V,A>:编写回调函数处理 IO 事件

  • 前提:服务器创建完成后,显然需要向操作系统交代 IO 事件如何处理,即回调函数的编写

  • [处理客户端连接请求的回调函数]{.red}

    • 实现 [CompletionHandler]{.blue} 接口,传入两个类型

      第一个类型只能够是 AsynchronousSocketChannel、第二个类型根据自己的 Attachment而定,不过通常都是 Object

    • 重写 [completed]{.blue} 和 [failed]{.blue} 方法

      completed 方法在正确接收到客户端请求后触发,failed 方法在客户端异常断开后触发

  • [处理读取客户端发送的数据和向客户端写入数据的回调函数]{.red}

// 内部类: 用于处理客户端发出的连接请求
private class AcceptHandler
implements CompletionHandler<AsynchronousSocketChannel, Object>
{
// 第一个参数是客户端,第二个参数是附件
@Override
public void completed(AsynchronousSocketChannel asc, Object attachment)
{
// 注:对客户端进行操作前需要确保客户端是仍然是正常连接的
if (asc != null && asc.isOpen())
{
// 打印客户端信息:getRemoteAddress 可以得到客户端的进程号
// 注:getRemoteAddress 是会抛出异常的,这里我省略掉了,节省篇幅
System.out.println("客户端" + asc.getRemoteAddress() + ":连接成功");
// 对客户端执行操作
ByteBuffer buffer = ByteBuffer.allocate(1024);
Map<String, Object> info = new HashMap<>();
// 回调函数不仅要处理写还要处理,所以提供辅助信息用以判断
info.put("type", "read");
// 读取数据需要从缓冲区中读取,所以也应该作为辅助信息传入
info.put("buffer", buffer);
// 读取客户端的发送的数据: read 方法也是异步非阻塞式的方法,所以也需要相应的回调函数进行处理
asc.read(buffer, info, new ReadAndWriteHandler(asc));
}

// 服务器必须再次向操作系统注册,才能够继续连接客户端,否则服务器端就不会继续连接客户端了
if (assc.isOpen())
{
assc.accept(null, this);
}
}

// 出现异常执行的方法
@Override
public void failed(Throwable exc, Object attachment){

}
}

// 内部类: 用于处理读取客户端的数据的请求和向客户端写入数据的请求
private class ReadAndWriteHandler
implements CompletionHandler<Integer, Object>
{
// 记录当前处理的客户端
private AsynchronousSocketChannel asc;

public ReadAndWriteHandler(AsynchronousSocketChannel asc)
{
this.asc = asc;
}

// 第一个参数是每次读取或者写入的字节数量,第二个参数依然是附件信息
@Override
public void completed(Integer result, Object attachment)
{
// 强制转换获取辅助信息
Map<String, Object> info = (Map<String, Object>) attachment;
// 获取缓冲区
ByteBuffer buffer = (ByteBuffer) info.get("buffer");
// 获取当前处理的是读取还是写入
if ("read".equals(info.get("type")))
{
// 读取客户端发送的数据并且显示在服务器端
buffer.flip();
String msg = new String(buffer.array());
// 依然省略捕获异常的过程
System.out.println(asc.getRemoteAddress() + ":\t" + msg + "\t字节数量[" + result +"]");
// 清空缓冲区
buffer.clear();
// 更新辅助信息
info.put("type", "write");
// 向客户端发送信息,原封不动的发送回去,当然你也可以发送其他内容,修改缓冲区就行
// 向客户端发送信息后,服务器端显然需要继续读取客户端发送的信息,所以不可以将回调函数置为 null,那样后续就接收不到信息了
asc.write(buffer, info, this);
}
else if ("write".equals(info.get("type")))
{
asc.read(buffer, info, this);
}
}

@Override
public void failed(Throwable exc, Object attachment){

}
}

AsynchronousSocketChannel:创建客户端

  • 前提:客户端的创建比较简单,所以采用未来时 Future 来处理
  • 创建线程池(推荐):客户端依然有对应的 AsynchronousChannelGroup 来处理回调函数,只不过我这里就不再演示了
  • 创建客户端:[connect]{.blue} 连接服务器端
public void  start()
{
try
{
// 创建客户端
asc = AsynchronousSocketChannel.open();
// 向服务器端发出连接请求
asc.connect(new InetSocketAddress(DEFAULT_LOCALHOST, DEFAULT_PORT));
}
catch (IOException | InterruptedException | ExecutionException e)
{
e.printStackTrace();
}
}

Future:利用未来时处理 IO 事件

  • 异步非阻塞式的方法都具有 Future 类型的返回值
public void  start()
{
try
{
// 创建客户端
asc = AsynchronousSocketChannel.open();
// 获取返回值
Future<Void> connect = asc.connect(new InetSocketAddress(DEFAULT_LOCALHOST, DEFAULT_PORT));
// 客户端将会等待连接建立完成: 如果连接没有建立完成,就会阻塞,如果建立完成,将继续执行
connect.get();
// 从控制台中读取信息
String msg;
while ((msg = reader.readLine()) != null)
{
// 向服务端发送信息
ByteBuffer buffer = StandardCharsets.UTF_8.encode(msg);
Future<Integer> write = asc.write(buffer);
// 数据发送完毕后才会继续执行
write.get();
// 读取服务器端返回的消息
buffer.clear();
Future<Integer> read = asc.read(buffer);
// 接收数据完成后才会继续执行
read.get();
System.out.println(new String(buffer.array()));
}

}
catch (IOException | InterruptedException | ExecutionException e)
{
e.printStackTrace();
}
}

实战:多人聊天室

package chatroom.v4;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

// 多人聊天室服务器端: 采用异步非阻塞式 IO 实现
@SuppressWarnings("unchecked")
public class ChatServer
{
// 服务器端口号: 默认使用的端口号
private static final int DEFAULT_PORT = 8888;
// 缓冲区固定分配的空间大小
private static final int BUFFER_SIZE = 1024;
// 退出消息标识符
private static final String EXIT = "退出";
// 调用者可以根据需要使用其他的端口号
private int port;
// 保存客户端的集合: 因为采用的是订阅-通知的模式, 所以没有方法可以直接获取到所有的客户端
private List<AsynchronousSocketChannel> clients;
// 服务器端通道: 调用者可以根据需要传入自定义的通道
private AsynchronousServerSocketChannel server;
// 线程池
private ExecutorService threadPool;
private AsynchronousChannelGroup acg;


// 处理客户端连接请求的回调函数
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object>
{
@Override
public void completed(AsynchronousSocketChannel client, Object attachment)
{
// 服务器端继续接收其他的客户端连接请求
if (server.isOpen())
{
server.accept(attachment, this);
}
// 提示客户端连接成功
System.out.println(getClientName(client) + "连接成功");
// 客户端加入集合中
addClient(client);
// 服务器端主要功能: 接收客户端发送的消息并且转发给其他客户端
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
buffer.put(getClientName(client).getBytes());
// 客户端通道实际只有读事件才需要使用回调函数, 写事件是不需要的
client.read(buffer, buffer, new ReadHandler(client));
}

// TODO 异常处理
@Override
public void failed(Throwable exc, Object attachment)
{
}
}

// 处理客户端读写请求的回调函数: 泛型中传入的是整型, 所以为了知道客户端的信息, 我们需要将其作为辅助信息传入
private class ReadHandler implements CompletionHandler<Integer, Object>
{
private AsynchronousSocketChannel client;

public ReadHandler(AsynchronousSocketChannel client)
{
this.client = client;
}

@Override
public void completed(Integer result, Object attachment)
{
ByteBuffer buffer = (ByteBuffer) attachment;
// 从写模式切换成读模式
buffer.flip();
// 判断客户端是否想要退出
if (isExit(buffer))
{
removeClient(client);
// 关闭该客户端, 不需要再继续读取该客户端的信息, 所以没必要传递回调函数
client.write(StandardCharsets.UTF_8.encode(EXIT));
return;
}
// 服务器端显示客户端发送的消息
String msg = String.valueOf(StandardCharsets.UTF_8.decode(buffer));
System.out.println(msg + "(" + result + "字节)");
// 转发消息
forward(client, msg);
// 读模式切换成写模式
buffer.clear();
// 更新辅助信息
buffer.put(getClientName(client).getBytes());
// 服务器端继续读取当前客户端发送的其他消息
client.read(buffer, buffer, this);
}

// TODO 异常处理
@Override
public void failed(Throwable exc, Object attachment)
{

}
}

public ChatServer()
{
this(DEFAULT_PORT, null);
}

public ChatServer(int port, AsynchronousServerSocketChannel server)
{
this.port = port;
this.server = server;
clients = new LinkedList<>();
}

// 默认使用的服务器端通道
private AsynchronousServerSocketChannel getAsynchronousServerSocketChannel() throws IOException
{
// 默认使用的线程池
threadPool = new ThreadPoolExecutor(
8,
10,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
acg = AsynchronousChannelGroup.withThreadPool(threadPool);
return AsynchronousServerSocketChannel.open(acg);
}

// 转发消息: 最为关键的修改
private void forward(AsynchronousSocketChannel client, String msg)
{
for (AsynchronousSocketChannel other : clients)
{
if (client != other)
// 每个客户端都应该拥有属于自己的缓冲区, 否则在多线程环境下共用一个, 真的不好检测会出现什么问题
// (其实主要问题还是他妈的 IDEA 调试异步函数进不去, 就离谱)
other.write(StandardCharsets.UTF_8.encode(msg));
}
}

// 判断客户端是否想要退出
private boolean isExit(ByteBuffer buffer)
{
return EXIT.equals(new String(buffer.array()));
}

// 客户端加入服务器端
private void addClient(AsynchronousSocketChannel client)
{
clients.add(client);
}

// 客户端退出服务器端
private void removeClient(AsynchronousSocketChannel client)
{
close(client);
clients.remove(client);
}

// 获取客户端标识符
private String getClientName(AsynchronousSocketChannel client)
{
int port = -1;
try
{
InetSocketAddress address = (InetSocketAddress) client.getRemoteAddress();
port = address.getPort();
}
catch (IOException e)
{
e.printStackTrace();
}

return "客户端[" + port + "]:";
}


// 关闭各种通道或者流
private void close(Closeable closeable)
{
try
{
if (closeable != null)
{
closeable.close();
}
}
catch (IOException e)
{
e.printStackTrace();
}
}

// 服务器端主方法
private void start()
{
try
{
// 服务器端初始化
if (server == null)
server = getAsynchronousServerSocketChannel();
// 绑定端口号
server.bind(new InetSocketAddress(port));
// 提示服务器启动成功
System.out.printf("服务器[%d]:%s\n", port, "启动成功");
// 接收客户端的连接请求:
while (true)
{
server.accept(null, new AcceptHandler());
// 阻塞服务器
System.in.read();
}
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
close(server);
}

}

public static void main(String[] args)
{
// 启动服务器
new ChatServer().start();
}

}

package chatroom.v4;


import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

// 多人聊天室: 客户端
@SuppressWarnings("unchecked")
public class ChatClient
{
private static final String DEFAULT_LOCALHOST = "127.0.0.1";
private static final int DEFAULT_PORT = 8888;
private static final int BUFFER_SIZE = 1024;
private static final String EXIT = "退出";
private int port;
private AsynchronousSocketChannel client;
private AsynchronousChannelGroup acg;
private ExecutorService threadPool;


public ChatClient()
{
this(DEFAULT_PORT, null);
}

public ChatClient(int port, AsynchronousSocketChannel client)
{
this.port = port;
this.client = client;
}

private AsynchronousSocketChannel getAsynchronousSocketChannel() throws IOException
{
// 默认使用的线程池
threadPool = new ThreadPoolExecutor(
15,
20,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
acg = AsynchronousChannelGroup.withThreadPool(threadPool);
return AsynchronousSocketChannel.open(acg);
}

// 获取客户端标识符
private String getClientName(AsynchronousSocketChannel client)
{
int port = -1;
try
{
InetSocketAddress address = (InetSocketAddress) client.getRemoteAddress();
port = address.getPort();
}
catch (IOException e)
{
e.printStackTrace();
}

return "客户端[" + port + "]:";
}

private void close(Closeable closeable)
{
try
{
if (closeable != null)
closeable.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}

private void start()
{
try
{
if (client == null)
client = getAsynchronousSocketChannel();
Future<Void> connect = client.connect(new InetSocketAddress(DEFAULT_LOCALHOST, port));
// 阻塞客户端
connect.get();
System.out.println(getClientName(client) + "启动成功");
// 发送消息
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
// 控制台输入必然阻塞: 只能够新开线程处理
new Thread(new ChatClientInputThread(client)).start();
while (true)
{
Future<Integer> read = client.read(buffer);
int res = read.get();
if (res <= 0)
{
close(client);
System.exit(1);
}
else
{
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
buffer.clear();
}

}

}
catch (IOException | InterruptedException | ExecutionException e)
{
e.printStackTrace();
}
}

public static void main(String[] args)
{
new ChatClient().start();
}
}

package chatroom.v4;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ChatClientInputThread implements Runnable
{
private static final String EXIT = "退出";
private static final BufferedReader READER = new BufferedReader(
new InputStreamReader(
System.in));
private AsynchronousSocketChannel client;

public ChatClientInputThread(AsynchronousSocketChannel client)
{
this.client = client;
}

@Override
public void run()
{
try
{
String msg;
while ((msg = READER.readLine()) != null)
{
Future<Integer> write = client.write(StandardCharsets.UTF_8.encode(msg));
write.get();
}

}
catch (IOException | InterruptedException | ExecutionException e)
{
e.printStackTrace();
}
}
}

Author: Fuyusakaiori
Link: http://example.com/2021/09/09/java/io/AIO/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.