AIO
概述
什么是 AIO?
什么是异步非阻塞式调用?
:::info
此前的 IO 概述中已经提到过了,这里再简单的描述下
:::
异步非阻塞式调用
定义:
进程向操作系统注册,[编写回调函数后将 IO 操作完全交付给操作系统实现]{.blue},进程继续执行自己其他的任务
回调函数:名字听起来很高端,实际就是进程告诉操作系统在 IO 事件发生后该如何处理,毕竟进程自己不再管了
操作系统在 IO 事件发生后,选择 [线程池中的某个线程执行进程提供的回调函数]{.blue} 处理发生的 IO 事件
回调函数处理 IO 事件完成后,向进程发出信号通知 IO 事件已经完成,[进程不需要再使用任何系统调用来完成 IO 事件]{.blue}

区别:
- 不同于同步非阻塞式中的 [Selector 采用轮询机制处理事件]{.red},异步非阻塞式采用 [订阅-通知]{.red} 模式
- [无论是 NIO 的哪个版本最终都需要进程自己调用方法处理事件,AIO 则是将编写好的方法完全交给操作系统去执行,自己则不再关心]{.blue}
为什么要使用 AIO?
核心类
AIO 的核心类有哪些?
- [AsynchronousSocketChannel]{.red}
- [AsynchronousServerSocketChannel]{.red}
- AsynchronousFileChannel:文件操作并不常用,所以不会编写案例
- [AsynchronousChannelGroup]{.blue}
- [CompletionHandler<V,A>]{.red}
- Future<V.>:[Future 类并不是 NIO 包下的类,而是 JUC 包中提供的类]{.green}
AIO 核心类如何使用?
- 概述中提到了 AIO 本质是对 NIO 的完善,所以新增的核心类在使用方法上和 NIO 没有太大的区别
- 主要区别在于新增的 CompletionHandler 类,该类是用于编写回调函数处理 IO 事件的,其余类没有太大区别
AsynchronousServerSocketChannel:创建服务器端
public void start() { try { ExecutorService threadPool = new ThreadPoolExecutor( 2, 3, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); AsynchronousChannelGroup acg = AsynchronousChannelGroup.withThreadPool(threadPool); assc = AsynchronousServerSocketChannel.open(acg); assc.bind(new InetSocketAddress(DEFAULT_PORT)); System.out.printf("服务器[%d]:%s\n", DEFAULT_PORT, "启动成功"); while (true) { assc.accept(null, new AcceptHandler()); assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){ }); System.in.read();
} } catch (IOException e) { e.printStackTrace(); } }
|
CompletionHandler<V,A>:编写回调函数处理 IO 事件
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> { @Override public void completed(AsynchronousSocketChannel asc, Object attachment) { if (asc != null && asc.isOpen()) { System.out.println("客户端" + asc.getRemoteAddress() + ":连接成功"); ByteBuffer buffer = ByteBuffer.allocate(1024); Map<String, Object> info = new HashMap<>(); info.put("type", "read"); info.put("buffer", buffer); asc.read(buffer, info, new ReadAndWriteHandler(asc)); }
if (assc.isOpen()) { assc.accept(null, this); } }
@Override public void failed(Throwable exc, Object attachment){
} }
private class ReadAndWriteHandler implements CompletionHandler<Integer, Object> { private AsynchronousSocketChannel asc;
public ReadAndWriteHandler(AsynchronousSocketChannel asc) { this.asc = asc; }
@Override public void completed(Integer result, Object attachment) { Map<String, Object> info = (Map<String, Object>) attachment; ByteBuffer buffer = (ByteBuffer) info.get("buffer"); if ("read".equals(info.get("type"))) { buffer.flip(); String msg = new String(buffer.array()); System.out.println(asc.getRemoteAddress() + ":\t" + msg + "\t字节数量[" + result +"]"); buffer.clear(); info.put("type", "write"); asc.write(buffer, info, this); } else if ("write".equals(info.get("type"))) { asc.read(buffer, info, this); } }
@Override public void failed(Throwable exc, Object attachment){
} }
|
AsynchronousSocketChannel:创建客户端
- 前提:客户端的创建比较简单,所以采用未来时 Future 来处理
- 创建线程池(推荐):客户端依然有对应的 AsynchronousChannelGroup 来处理回调函数,只不过我这里就不再演示了
- 创建客户端:[connect]{.blue} 连接服务器端
public void start() { try { asc = AsynchronousSocketChannel.open(); asc.connect(new InetSocketAddress(DEFAULT_LOCALHOST, DEFAULT_PORT)); } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } }
|
Future:利用未来时处理 IO 事件
- 异步非阻塞式的方法都具有 Future 类型的返回值
public void start() { try { asc = AsynchronousSocketChannel.open(); Future<Void> connect = asc.connect(new InetSocketAddress(DEFAULT_LOCALHOST, DEFAULT_PORT)); connect.get(); String msg; while ((msg = reader.readLine()) != null) { ByteBuffer buffer = StandardCharsets.UTF_8.encode(msg); Future<Integer> write = asc.write(buffer); write.get(); buffer.clear(); Future<Integer> read = asc.read(buffer); read.get(); System.out.println(new String(buffer.array())); }
} catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } }
|
实战:多人聊天室
package chatroom.v4;
import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.*;
@SuppressWarnings("unchecked") public class ChatServer { private static final int DEFAULT_PORT = 8888; private static final int BUFFER_SIZE = 1024; private static final String EXIT = "退出"; private int port; private List<AsynchronousSocketChannel> clients; private AsynchronousServerSocketChannel server; private ExecutorService threadPool; private AsynchronousChannelGroup acg;
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> { @Override public void completed(AsynchronousSocketChannel client, Object attachment) { if (server.isOpen()) { server.accept(attachment, this); } System.out.println(getClientName(client) + "连接成功"); addClient(client); ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); buffer.put(getClientName(client).getBytes()); client.read(buffer, buffer, new ReadHandler(client)); }
@Override public void failed(Throwable exc, Object attachment) { } }
private class ReadHandler implements CompletionHandler<Integer, Object> { private AsynchronousSocketChannel client;
public ReadHandler(AsynchronousSocketChannel client) { this.client = client; }
@Override public void completed(Integer result, Object attachment) { ByteBuffer buffer = (ByteBuffer) attachment; buffer.flip(); if (isExit(buffer)) { removeClient(client); client.write(StandardCharsets.UTF_8.encode(EXIT)); return; } String msg = String.valueOf(StandardCharsets.UTF_8.decode(buffer)); System.out.println(msg + "(" + result + "字节)"); forward(client, msg); buffer.clear(); buffer.put(getClientName(client).getBytes()); client.read(buffer, buffer, this); }
@Override public void failed(Throwable exc, Object attachment) {
} }
public ChatServer() { this(DEFAULT_PORT, null); }
public ChatServer(int port, AsynchronousServerSocketChannel server) { this.port = port; this.server = server; clients = new LinkedList<>(); }
private AsynchronousServerSocketChannel getAsynchronousServerSocketChannel() throws IOException { threadPool = new ThreadPoolExecutor( 8, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); acg = AsynchronousChannelGroup.withThreadPool(threadPool); return AsynchronousServerSocketChannel.open(acg); }
private void forward(AsynchronousSocketChannel client, String msg) { for (AsynchronousSocketChannel other : clients) { if (client != other) other.write(StandardCharsets.UTF_8.encode(msg)); } }
private boolean isExit(ByteBuffer buffer) { return EXIT.equals(new String(buffer.array())); }
private void addClient(AsynchronousSocketChannel client) { clients.add(client); }
private void removeClient(AsynchronousSocketChannel client) { close(client); clients.remove(client); }
private String getClientName(AsynchronousSocketChannel client) { int port = -1; try { InetSocketAddress address = (InetSocketAddress) client.getRemoteAddress(); port = address.getPort(); } catch (IOException e) { e.printStackTrace(); }
return "客户端[" + port + "]:"; }
private void close(Closeable closeable) { try { if (closeable != null) { closeable.close(); } } catch (IOException e) { e.printStackTrace(); } }
private void start() { try { if (server == null) server = getAsynchronousServerSocketChannel(); server.bind(new InetSocketAddress(port)); System.out.printf("服务器[%d]:%s\n", port, "启动成功"); while (true) { server.accept(null, new AcceptHandler()); System.in.read(); } } catch (IOException e) { e.printStackTrace(); } finally { close(server); }
}
public static void main(String[] args) { new ChatServer().start(); }
}
|
package chatroom.v4;
import java.io.BufferedReader; import java.io.Closeable; import java.io.IOException; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.*;
@SuppressWarnings("unchecked") public class ChatClient { private static final String DEFAULT_LOCALHOST = "127.0.0.1"; private static final int DEFAULT_PORT = 8888; private static final int BUFFER_SIZE = 1024; private static final String EXIT = "退出"; private int port; private AsynchronousSocketChannel client; private AsynchronousChannelGroup acg; private ExecutorService threadPool;
public ChatClient() { this(DEFAULT_PORT, null); }
public ChatClient(int port, AsynchronousSocketChannel client) { this.port = port; this.client = client; }
private AsynchronousSocketChannel getAsynchronousSocketChannel() throws IOException { threadPool = new ThreadPoolExecutor( 15, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); acg = AsynchronousChannelGroup.withThreadPool(threadPool); return AsynchronousSocketChannel.open(acg); }
private String getClientName(AsynchronousSocketChannel client) { int port = -1; try { InetSocketAddress address = (InetSocketAddress) client.getRemoteAddress(); port = address.getPort(); } catch (IOException e) { e.printStackTrace(); }
return "客户端[" + port + "]:"; }
private void close(Closeable closeable) { try { if (closeable != null) closeable.close(); } catch (IOException e) { e.printStackTrace(); } }
private void start() { try { if (client == null) client = getAsynchronousSocketChannel(); Future<Void> connect = client.connect(new InetSocketAddress(DEFAULT_LOCALHOST, port)); connect.get(); System.out.println(getClientName(client) + "启动成功"); ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); new Thread(new ChatClientInputThread(client)).start(); while (true) { Future<Integer> read = client.read(buffer); int res = read.get(); if (res <= 0) { close(client); System.exit(1); } else { buffer.flip(); System.out.println(StandardCharsets.UTF_8.decode(buffer)); buffer.clear(); }
}
} catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } }
public static void main(String[] args) { new ChatClient().start(); } }
|
package chatroom.v4;
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.channels.AsynchronousSocketChannel; import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;
public class ChatClientInputThread implements Runnable { private static final String EXIT = "退出"; private static final BufferedReader READER = new BufferedReader( new InputStreamReader( System.in)); private AsynchronousSocketChannel client;
public ChatClientInputThread(AsynchronousSocketChannel client) { this.client = client; }
@Override public void run() { try { String msg; while ((msg = READER.readLine()) != null) { Future<Integer> write = client.write(StandardCharsets.UTF_8.encode(msg)); write.get(); }
} catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
|