NIO

NIO

:::primary

前提:熟练掌握传统 I/O 核心思想及其核心类,便于和新 I/O 进行相应的对比学习

:::

概述

什么是 NIO?

  • 名称:
    • NIOJDK 1.4 之后提供,所以官方定义为 New I/O,用于区别 JDK 1.1 提供的传统 I/O
    • NIO 具有 ++非阻塞++ 的特性,所以也通常被称为 No-Blocking I/O,用于区别传统的 Blocking I/O
  • 细节:[推出 NIO 之后,传统 I/O 的底层重新采用 NIO 实现了一次,从而提升传统 I/O 的速度]{.blue}

什么是阻塞式 I/O?什么又是非阻塞式 I/O(之前 I/O 概述中已经提到过再次重复一遍)

  • 阻塞与非阻塞:[读写方法是否为阻塞式调用]{.red}
    • 阻塞式:服务器的线程调用 传统 I/O,此线程就必须一直等待客户端发送消息,[线程在此期间什么都不能做]{.red}
    • 非阻塞式:服务器的线程调用 新 I/O,此线程只需要在客户端发送消息的时候才去接收,[线程在此期间可以完成其他任务]{.red}

为什么需要提供非阻塞式的 I/O?

  • 核心:[减少线程对于服务器资源的占用以及服务器上下文切换占用的时间]{.red}

    !!线程、线程池以及上下文切换的概念及其特性参考操作系统相关知识!!{.bulr}

  • 服务器初期设计(BIO):

    • 背景:服务器显然需要处理大量客户端发来的请求,每个线程因为采用传统 I/O,所以仅能够处理一个客户端请求

    • 核心:

      • [多线程处理用户请求]{.red}

      • 每个客户端发来请求时,服务器都 [创建相应的线程]{.red} 负责处理客户端的请求,处理结束后 [线程销毁]{.red}

        85bed0bbdb4705d0e11ebf6ebe860b23.png
    • 缺点:

      • [大量线程的创建和销毁需要占用非常多的服务器资源]{.green}
      • [大量线程的存在会导致服务器频繁的执行上下文切换,浪费线程的执行时间]{.green}
  • 服务器改进设计(伪异步 I/O):

    • 背景:没有推出 新 I/O 的情况下,显然想要避免大量线程的创建和销毁就只能采用 [线程池技术]{.blue}

    • 核心:

      • [线程池技术]{.red}

      • 服务器创建线程池,线程的管理全部交给线程池负责,服务器 [“可以完成其他的任务”]{.blue},这就是所谓的伪异步

      • 服务器 “可以完成其他任务”只是给人的一种错觉,因为只要建立的是 [TCP]{.red} 连接,那么服务器只能处于阻塞状态,等待客户端发来请求

        5ab06e96145063ffd4c734a80e1b4551.png

    • 优点:[避免服务器创建和销毁大量的线程造成的资源浪费]{.red}

    • 缺点:

      • [大量的线程依然会导致频繁的执行上下文切换]{.green}
      • [线程池的线程数量是固定的就会导致在大量请求同时发出时,必然存在客户端长时间等待的情况]{.green}
  • 服务器现有设计(NIO):

    • 背景:无论如何,大量的线程都会造成服务器资源消耗,所以考虑能不能 [每个线程处理多个客户端请求]{.blue},从而减少线程的数量

    • 核心:

      • [Channel 类使得线程可以采用非阻塞式的读写方法,而不需要一直等待客户端请求]{.red}

        那么线程如何才能在做其他事情的时候,知道有客户端发送请求了呢?

      • [Selector 类可以监视多个客户端是否发送消息]{.red}

      • 服务器只需要创建少量的线程,线程监视每个客户端

        1. 如果没有任何客户端发送请求,线程可以根据服务器的需要完成其他的事件
        2. 如果某个客户端突然发送请求,线程立刻就回去处理该客户端发送的请求
        3. 如果线程监视的多个客户端同时发送请求,线程需要按照一定的顺序依次处理
      1509fb72aeea89d73f037b7c9cada762.png
    • 优点:[减少服务器需要的线程数量,避免资源的大量占用和上下文切换占用的时间]{.red}

    • 缺点:[单个线程可能来不及同时处理多个客户端请求,依然存在客户端等待的情况]{.green}

核心类

新特性

  • 核心:[传统 I/O 是 面向流 设计的,新 I/O 是面向 缓冲区 设计的]{.red}

  • Channel

    • 名称:通常直译为通道
    • 新特性:
      • [Channel 取代传统 I/O 中的 InputStream 和 OutputStream,实现双向读写]{.blue}
      • [Channel 具有非阻塞的特性,不过需要手动设置,默认为阻塞]{.blue}
    • 细节:
      • [Channel 必须借助缓冲区 Buffer 才可以将数据交付给进程]{.blue}
      • [Channel 之间可以直接交换数据]{.blue}
    5689b4d52a045ff5ae72f35155029a89.png
  • Buffer

    • 名称:缓冲区
    • 新特性:[可以存储大量的数据,便于一次性读写完成,实现双向读写]{.blue}
  • Selector

    • 名称:选择器
    • 新特性:
      • [采用轮询的方式监视多个 Channel 上的事件,如果发生事件就会通知服务器进行处理,如果没有发生事件就阻塞服务器]{.blue}
      • [实现 I/O 多路复用模型中的 Reactor 模型]{.blue}
    bb0faea705a6c70db4c16fe1b510a1ba.png

Buffer

缓冲区:基础知识

  • 定义:每个Channel 都需要先将 [数据写入缓冲区或者从缓冲区中读入数据]{.blue},然后客户端或者服务器从缓冲区中获取数据

  • 特点:① 抽象类 ② [底层采用数组实现]{.red} ③ [既可以读取也可以写入]{.red}

  • 子类

    • [核心子类]{.red}:ByteBuffer

    • 其余子类:IntBuffer、LongBuffer、CharBuffer、FloatBuffer、DoubleBuffer

      注:[ByteBuffer 用于传输二进制的数据,也就意味着可以传输任何类型的文件,用途相比于其他子类更加广泛]{.blue}

  • 内部结构

    • 写模式:每次向缓冲区中写入数据,position 指针都会相应的向前移动

      b8e02b3c24bb877d3e9d5a8f2039228b.png
    • 翻转:

      • 缓冲区读取数据是从起始指针开始读取的,而写模式下的 position 指针并没有指向写入的数据

      • [处于写模式下的缓冲区是无法直接读取到写入的数据,需要经过翻转操作后才可以读取数据]{.red}

      • position 指针重新指向缓冲区开头,limit 指针指向写入数据的末尾

        b81a2df08a90aca872acf6bff4326eca.png
    • 读模式:

      • 每次从缓冲区中读出数据,position 指针都会向前移动

      • 读取数据存在两种情况:

        1. 缓冲区中的数据已经被读取完毕:调用 [清空]{.red} 的方法重新返回写模式

        2. 缓冲区中的数据没有被全部读取:调用 [压缩]{.red} 的方法重新返回写模式

          95c63f409ed4df541187e6340ae63989.png
    • 清空:position 指针重新移动到缓冲区开头,limit 指针重新移动到缓冲区结尾

      6e570f500925a0f79dcc168c90cec5a7.png
    • 压缩:[将没有被读取的数据移动到缓冲区开头]{.red},position 指针移动到没有被读取的数据后面,limit 指针重新移动到缓冲区结尾

      af734a77fd91f2110f46b4f77e9ab622.png

      注:[无论是清空还是压缩方法,缓冲区中的数据都没有被实际清除,仅仅只是将指针重新移动到开始的位置而已]{.red}

  • 构造方法:[缓冲区抽象类创建对象采用单例模式,也就是说构造方法是私有的]{.red}

    Buffer(int mark, int pos, int lim, int cap) {       // package-private
    if (cap < 0)
    throw new IllegalArgumentException("Negative capacity: " + cap);
    this.capacity = cap;
    limit(lim);
    position(pos);
    if (mark >= 0) {
    if (mark > pos)
    throw new IllegalArgumentException("mark > position: ("
    + mark + " > " + pos + ")");
    this.mark = mark;
    }
    }
  • 方法

    • put() & get():[Buffer 抽象类没有提供读取和写入方法具体实现 -> 交付给子类自行实现读取和写入的方法]{.red}

    • compact():[Buffer 抽象类也没有提供压缩方法的具体实现]

    • flip():[缓冲区从写模式切换到读模式]{.red}

      /*不允许子类重写的方法*/
      public final Buffer flip() {
      // limit 指针移动到 position 指针处
      limit = position;
      // position 指针重新移动到缓冲区开始的位置
      position = 0;
      mark = -1;
      return this;
      }
    • clear():[清空缓冲区并且从读模式切换到写模式]{.red}

      public final Buffer clear() {
      position = 0;
      limit = capacity;
      mark = -1;
      return this;
      }
    • rewind():[重新将 position 指针移动到缓冲区起始位置]{.red}

      public final Buffer rewind() {
      position = 0;
      mark = -1;
      return this;
      }
    • hasremaning():[缓冲区中是否还存在数据]{.red}

      public final boolean hasRemaining() {
      return position < limit;
      }

ByteBuffer

字节缓冲区:基础知识

  • 特点:仅能够存放字节类型的数据
  • 子类:
    • MappedByteBuffer
    • DirectByteBuffer:[采用物理机中的真实内存作为缓冲区,读写效率高,创建效率低,容易出现内存泄露问题]{.red}
    • HeadByteBuffer:[采用虚拟机中的堆内存作为缓冲区,读写效率较低,创建效率较高]{.red}

字节缓冲区:方法

  • 创建缓冲区对象

    • 实例:

      // 创建缓冲区对象
      private static void createBuffer()
      {
      // 缓冲区使用堆内存: 缓冲区中的数据会受到虚拟机的垃圾回收机制的影响, 读写效率相对较低
      ByteBuffer heapBuffer = ByteBuffer.allocate(10);
      // 缓冲区采用直接内存(系统内存):
      // 缓冲区中的数据不会收到垃圾回收机制的影响, 读写效率高, 创建对象效率较低
      // 缓冲区中的数据如果没有合理释放,容易导致内存溢出或者泄露
      ByteBuffer directBuffer = ByteBuffer.allocateDirect(10);
      }
    • 方法:

      /*堆内存缓冲区*/
      public static ByteBuffer allocate(int capacity) {
      if (capacity < 0)
      throw new IllegalArgumentException();
      // 返回的是 HeapByteBuffer 对象,也就是 ByteBuffer 的子类
      return new HeapByteBuffer(capacity, capacity);
      }
      /*直接内存缓冲区*/
      public static ByteBuffer allocateDirect(int capacity) {
      // 返回的是 DirectByteBuffer 对象,也就是 ByteBuffer 的子类
      return new DirectByteBuffer(capacity);
      }
  • 读取和写入缓冲区:

    • 字符串转换为字节方式:[缓冲区只能够存放字节类型的数据,所以需要将发送的数据转换为字节类型]{.blue}

      // 字符串转换为字节类型
      public static void stringToBytes()
      {
      // 缓冲区
      ByteBuffer buffer1 = ByteBuffer.allocate(50);
      // 1.直接利用字符串类提供的方法
      byte[] bytes = "Hello,World!".getBytes();
      // 手动将字节数组放入缓冲区中
      buffer1.put(bytes);

      // 2.利用 NIO 提供的字符串转换方式:方法返回值就是缓冲区,不需要手动添加数据
      ByteBuffer buffer2 = StandardCharsets.UTF_16.encode("Hello,World");

      // 3. 利用缓冲区自身提供的方法实现字符串转换
      ByteBuffer buffer3 = ByteBuffer.wrap("Hello,World!".getBytes());
      }
    • 读取和写入方式

      // 读取和写入缓冲区的方法
      public static void readOrWriteBuffer()
      {
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      // 仅写入单个字符
      buffer.put((byte)'A');
      // 指定字符写入的位置
      buffer.put(1, (byte)'B');
      // 写入字符数组
      buffer.put("Hello,World".getBytes());
      // 写入其他的缓冲区
      buffer.put(StandardCharsets.UTF_16.encode("你好~"));

      // 缓冲区记得翻转
      buffer.flip();

      System.out.println("当前位置: " + buffer.position());
      // 读取单个字节:每次读取, position 指针是会向前移动的
      System.out.println((char) buffer.get());
      System.out.println("当前位置: " + buffer.position());
      // 读取指定位置的字节: 每次读取, position 指针都是不会移动的
      // 指定的位置:当前 position 指针 + 给出的 index
      System.out.println((char) buffer.get(2));
      System.out.println("当前位置: " + buffer.position());
      // 读取字节数组
      byte[] bytes = new byte[10];
      // 每次读取一个字节数组的大小,将字节存放在字节数组中,并且 position指针 也是会向前移动的
      // 返回值是 ByteBuffer:并不是新创建一个缓冲区,而是返回当前缓冲区在读取一个字节数组之后的状态
      ByteBuffer byteBuffer = buffer.get(bytes);
      System.out.println("当前位置: " + buffer.position());
      }
  • 读写文件:[需要提前用到 Channel ]{.blue}

    // 利用 NIO 核心类读取文件
    public static void readFileByNIO() throws IOException
    {
    // 利用文件流获取到相应的通道
    FileChannel channel = new FileInputStream(new File("file.txt")).getChannel();
    // 为缓冲区分配大小
    ByteBuffer buffer = ByteBuffer.allocate(5);
    // 从通道中读取数据 -> 将通道中读取的数据写入缓冲区 -> 服务器/客户端从缓冲区中接收数据
    // 读取次数
    int count = 0;
    // 循环读取通道中的数据: 这里缓冲区大小仅有 5 字节, 而文本含有 12 字节, 显然需要读 3 次
    while (channel.read(buffer) != -1)
    {
    // 将缓冲区中的数据读取出来: 先进行翻转后读取
    buffer.flip();
    // 循环读出缓冲区中的数据
    while (buffer.hasRemaining())
    {
    // 只能一个字节一个字节地读取
    System.out.print((char) buffer.get());
    }
    System.out.println("\n读取次数: " + ++count);
    // 需要再次向缓冲区中写入数据:先将缓冲区清空后写入
    buffer.clear();
    }
    // 关闭通道
    channel.close();
    }

字节缓冲区:分散读取 + 集中写入

  • 分散读取(ScatteringRead):[从通道中的读取的数据同时向多个缓冲区写入数据 -> 最后从多个缓冲区中读取数据]{.red}

    // 分散读取
    public static void scatteringRead() throws IOException
    {
    FileChannel fileChannel = new FileInputStream(new File("file.txt")).getChannel();
    // 多个缓冲区
    ByteBuffer buffer1 = ByteBuffer.allocate(3);
    ByteBuffer buffer2 = ByteBuffer.allocate(4);
    ByteBuffer buffer3 = ByteBuffer.allocate(5);
    // 通道中的数据同时写入到多个缓冲区中
    fileChannel.read(new ByteBuffer[]{buffer1, buffer2, buffer3});
    // 从多个缓冲区中分别读取数据:分散读取
    buffer1.flip();
    while (buffer1.hasRemaining())
    {
    System.out.print((char) buffer1.get());
    }
    }
  • 集中写入(GatheringWrites):[向缓冲区中写入数据 -> 同时从多个缓冲区中读取数据并且写入通道中]{.red}

    public static void gatheringWrites() throws IOException
    {
    FileChannel fileChannel = new FileOutputStream(new File("newfile.txt")).getChannel();
    // 多个缓冲区
    ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("Hello,World\n");
    ByteBuffer buffer2 = Charset.defaultCharset().encode("你好~\n");
    ByteBuffer buffer3 = ByteBuffer.wrap("I/O 模型是个啥啊\n".getBytes());
    // 同时从多个缓冲区中读取数据后写入通道中
    fileChannel.write(new ByteBuffer[]{buffer1, buffer2, buffer3});

    }

字节缓冲区:黏包半包分析

  • 问题:

    • 客户端向服务器发送多条消息,每条消息采用换行符分割开来

      // 发送的消息: 
      "Hello,World!\n"
      "Persona5\n"
      "Tohou\n"
    • 通道能够写入的数据是 [有限]{.red} 的,缓冲区可能一次无法将所有的数据全部写入通道中

      导致缓冲区中遗留下来的数据和下次写入缓冲区中的数据进行重新组合,接收方接收到混乱的数据

      // 黏包半包现象
      // 半包现象:仅有部分数据被接收到
      "Hello"
      // 黏包想象:多个数据被连接起来形成一条数据
      ",World!\nPersona5\nTohou"
    • 简单的解决方案

      // 简单解决黏包半包现象
      public static void split(ByteBuffer buffer)
      {
      // 缓冲区切换成读模式
      buffer.flip();
      // 存放完整消息的缓冲区
      ByteBuffer destination;
      for (int i = 0; i < buffer.limit(); i++)
      {
      // 如果读取到换行符就把前面所有字节读取出来
      if (buffer.get(i) == '\n')
      {
      byte[] bytes = new byte[i + 1 - buffer.position()];
      buffer.get(bytes);
      for (byte b : bytes)
      {
      System.out.print((char)b);
      }
      }
      }
      // 最后如果缓冲区中仍然存在不完整的消息:那么将剩余数据压缩到缓冲区头部
      buffer.compact();
      }

Channel

  • 特点:① 抽象类 ② 可读可写 ③ 通道中的数据必须经过缓冲区后才可以输出 ④ 通道提供的方法默认为阻塞式调用,可以选择设置为非阻塞式调用
  • 子类
    • 文件通道:FileChannel
      • FileChannel 仅提供阻塞式方法,并且不能够使用 Selector 优化传输过程
      • FileChannel 可以创建单向的通道对象,也可以创建双向的通道对象
    • 网络通道:
      • UDP 连接:DatagramChannel

      • TCP 连接:ServerSocketChannel、SocketChannel

FileChannel

文件通道:基础知识

  • 定义:文件之间交互数据的通道
  • 特点:
    • 可以利用流对象和随机流对象创建文件通道:流对象创建的通道是单向的,随机流对象创建的通道可以双向可以单向
    • 可以利用静态方法创建对象(单例模式)
    • [文件通道仅能够采用阻塞式方法进行数据的传输]{.red}
    • [文件通道之间可以不借助缓冲区直接交互数据、其余通道子类必须借助缓冲区交互数据]{.red}
    • [写入文件通道的数据并不会立刻被操作系统写入文件,而是会先写入内存的缓冲区,等待缓冲区充满之后才会全部写入]{.red}

文件通道:方法

  • 创建文件通道

    • 利用文件流获取通道:FileInputStream 获取通道;FileOutputStream 获取通道

      • [通道本身具有双向性,但是通过文件流获取的通道依然只能够单向传输]{.red}

      • [单向通道不可以调用反向的方法,否则会抛出异常]{.red}(NonWritableChannelException、NonReadableChannelException

        // 获取文件通道
        public static void getFileChannel() throws FileNotFoundException
        {
        FileChannel fileInputChannel = new FileInputStream(new File("file.txt")).getChannel();
        FileChannel fileOutputChannel = new FileOutputStream(new File("file.txt")).getChannel();
        }
    • 利用随机文件流获取通道:[指定随机文件流为可读可写时,获取的通道才具有双向性]{.red}

      // 获取文件通道
      public static void getFileChannel() throws FileNotFoundException
      {
      FileChannel fileChannel = new RandomAccessFile(new File("file.txt"), "rw").getChannel();
      // 既可以读取也可以写入
      filechannel.read(...);
      filechannel.write(...);
      }
    • 利用工具类创建获取通道

      // 创建双向的文件通道
      public static void getDoubleChannel() throws IOException
      {
      // 指定当前的通道类型:只能够指定一个
      FileChannel fileChannel1 = FileChannel.open(Paths.get("file.txt"), StandardOpenOption.READ);
      // 指定当前通道的类型:可以指定多个,也就意味着通道可以实现双向性
      Set<OpenOption> openOptions = new HashSet<>();
      Collections.addAll(openOptions, StandardOpenOption.WRITE, StandardOpenOption.READ);
      FileChannel fileChannel2 = FileChannel.open(Paths.get("file.txt"), openOptions);
      }
  • 文件通道读写方法

    // 通道的读写方法
    public static void channelReadAndWrite() throws IOException
    {
    // 使用双向的文件通道
    Set<OpenOption> openOptions = new HashSet<>();
    Collections.addAll(openOptions, StandardOpenOption.WRITE, StandardOpenOption.READ);
    FileChannel fileChannel = FileChannel.open(Paths.get("file.txt"), openOptions);
    ByteBuffer buffer1 = ByteBuffer.allocate(12);
    ByteBuffer buffer2 = ByteBuffer.allocate(12);
    ByteBuffer buffer3 = ByteBuffer.allocate(12);
    // 写入单个缓冲区
    fileChannel.read(buffer1);
    // 写入多个缓冲区
    fileChannel.read(new ByteBuffer[]{buffer1, buffer2, buffer3});

    // 读取单个缓冲区
    fileChannel.write(buffer1);
    // 读取多个缓冲区
    fileChannel.write(new ByteBuffer[]{buffer1, buffer2, buffer3});
    }
  • 文件通道相互传递数据:[通道借助缓冲区可以交互数据,也可以通道之间直接交互数据,效率更高 -> 用于实现文件拷贝]{.red}

    • 通道借助缓冲区交互数据

      // 通道间通信
      public static void channelTransfer() throws IOException
      {
      ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("Hello,World!");
      FileChannel fic = new FileInputStream(new File("file.txt")).getChannel();
      FileChannel foc = new FileOutputStream(new File("newfile.txt")).getChannel();
      // 借助缓冲区交互数据
      fic.read(byteBuffer);
      byteBuffer.flip();
      foc.write(byteBuffer);
      byteBuffer.clear();
      }
    • 通道之间直接交互数据

      • [每次通道仅能够交互 2G 的数据,超出传输限制后就不能够一次传输完成]{.red}
      • 实际测试每次传输最大为 2079MB 的大小
      // 通道间通信
      public static void channelTransfer() throws IOException
      {
      ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("Hello,World!");
      FileChannel fic = new FileInputStream(new File("file.txt")).getChannel();
      FileChannel foc = new FileOutputStream(new File("newfile.txt")).getChannel();
      // 直接交互大量数据: 文件通道超过 2G 后就没有办法一次性交互完成了
      int count = 0;
      long length = fic.size();
      for (long i = 0; i < length;)
      {
      // transferTo 会返回写入的字节数量
      i = fic.transferTo(i , length - i, foc);
      System.out.println(i + "\t" + ++count);
      }

      }

文件通道:工具类

  • 前提:JDK 1.7 之后提供了新的文件工具类 Path & Files

  • Path & Paths

    • 定义:Path 采用文件路径的表示文件的 [接口]{.red};Paths 类是用来构建 Path 对象的工具类

    • 归属:package java.nio.file;

      注:为什么提到这个呢?因为 package javafx.scene.shape; 包中同样提供了 Path 类

    • 方法:

      • Path:提供的方法和 File 文件类相近,不再加以赘述

      • Paths:创建实现了 Path 接口的对象

        // 创建 Path 文件对象
        public static void createPath()
        {
        Path path = Paths.get("file.txt");
        }
  • Files

    • 定义:用于控制 Path 文件路径表示的文件

    • 归属:package java.nio.file;

    • 新增目录

      // 使用 Files 工具类
      public static void filesToAdd() throws IOException
      {
      Path path = Paths.get("first/second/file.txt");
      // 检验路径表示的文件是否存在
      Files.exists(path);
      // 创建单级目录: 如果创建的是多级目录的话就会报错
      Files.createDirectory(path);
      // 创建多级目录
      Files.createDirectories(path);
      }
    • 删除文件/目录

      public static void filesToDel() throws IOException
      {
      Path path = Paths.get("file.txt");
      Path directory = Paths.get("src/main/java/network/");
      // 删除文件: 如果文件不存在会报错
      Files.delete(path);
      // 删除目录:如果目录不为空会报错
      Files.delete(directory);
      }
    • 拷贝/移动文件

      public static void filesToUpdate() throws IOException
      {
      Path source = Paths.get("newfile.txt");
      // 移动的位置的文件可以是不存在的,会自动帮你生成
      Path destination = Paths.get("src/main/java/mynio/newfile.txt");
      // 将文件复制到新的位置: 如果文件已经存在默认报错,可以设置选项强制覆盖已经存在的文件
      Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING);
      // 将文件移动到新的位置: 保证文件移动的原子性
      Files.move(source, destination, StandardCopyOption.ATOMIC_MOVE);
      }
    • 遍历目录:JDK 1.7 以前遍历文件目录是需要采用递归手动实现的

      // 采用 walktree 方法实现的目录遍历
      public static void filesToList() throws IOException
      {
      Path directory = Paths.get("D:\\动画\\魔法禁书目录");
      // 计算文件数量: 使用原子整型类
      // 因为匿名内部类中所有变量都默认是 final, 所以不可以使用普通变量
      AtomicInteger count = new AtomicInteger();
      Files.walkFileTree(directory, new SimpleFileVisitor<Path>(){
      // SimpleFileVisitor 的构造方法是受保护的,所以我们不可以直接构造其对象
      // 这里采用匿名内部类的方式,实际上是创建了继承 SimpleFileVisitor 的类
      // 理所应当地可以创建其子类的对象
      // 第一个重写方法:访问目录前需要完成的事情
      @Override
      public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException
      {
      System.out.println("-->"+dir);
      return super.preVisitDirectory(dir, attrs);
      }
      // 第二个重写的方法:访问到文件时需要完成的事情
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
      {
      System.out.println(file);
      count.getAndIncrement();
      return super.visitFile(file, attrs);
      }
      // 第三个需要重写的方法:退出目录时完成的事情
      @Override
      public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException
      {
      System.out.println("<--"+dir);
      return super.postVisitDirectory(dir, exc);
      }
      });
      // 输出文件数量
      System.out.println(count);
      }

      // 采用 walk 实现方法实现目录遍历
      public static void filesToList() throws IOException
      {
      Path directory = Paths.get("D:\\动画\\魔法禁书目录");
      AtomicInteger count = new AtomicInteger();
      // 指定遍历的深度: 即可以向下遍历多少层级的目录
      Files.walk(directory, 4).forEach(path -> {
      if (Files.isDirectory(path))
      {
      System.out.println("目录: " + path);
      }
      else if (Files.isRegularFile(path))
      {
      System.out.println("文件: " + path);
      count.getAndIncrement();
      }
      });
      System.out.println(count);
      }

SocketChannel

  • 定义:客户端使用的通道端口

  • 特点:

    • 仅能够利用静态方法创建对象(构造方法是受到保护的)
    • 默认提供阻塞式的方法,可以设置为非阻塞式方法:[设置读写方法为非阻塞]{.red}
  • 方法

    • 创建客户端端口对象:[客户端可以在创建端口对象的同时指定连接的服务器端口号]{.red}

      // 创建对象时指定端口号并且连接该端口号:不需要手动连接服务器端口
      public static void main(String[] args) throws IOException
      {
      SocketChannel sc = SocketChannel.open(new InetSocketAddress("127.0.0.1", 4396));
      System.out.println("客户端:启动成功" );
      }

      // 手动连接服务器端口号
      public static viod main(String[] args) throws IOException
      {
      SocketChannel sc = SocketChannel.open();
      // 客户端连接服务器: 调用 connect 方法连接不是 bind
      sc.connect(new InetSocketAddress("127.0.0.1", 4396));
      }
    • 注册监听器(涉及到 Selector 类)

      		
  • 客户端实例

    public static void main(String[] args) throws IOException
    {
    SocketChannel sc = SocketChannel.open();
    // 客户端连接服务器: 调用 connect 方法连接不是 bind
    sc.connect(new InetSocketAddress("127.0.0.1", 4396));
    System.out.println("客户端:启动成功" );
    }
  • 调试技巧

    • 可以不用代码编写输入流或者写入固定的数据用以测试

    • 可以在客户端最后一行代码旁打上 [断点]{.red}

    • 启动服务器端后以调试(Debug)的方式启动客户端

    • 在客户端的调试界面右击端口对象后,点击 Evaluate Expression 后直接编写代码调试

      19c7119a8d00be28477bff5632bc05e4.png
    • 编写代码调试,这样测试的代码就不用遗留下来

      4cb5269b20102ec2c36264e49ea32e44.png

ServerSocketChannel

  • 定义:服务器端使用的端口

  • 特点:

    • 仅能够利用静态方法创建对象(构造方法是受到保护的)
    • 默认提供阻塞式的方法,可以设置为非阻塞式方法:[设置接收请求的方法为非阻塞]{.red}
  • 方法:

    • 创建服务器端口对象

      // NIO 提供的端口对象不能够在创建的时候传入端口号:需要手动调用方法绑定
      ServerSocketChannel ssc = ServerSocketChannel.open();
    • 绑定端口号

      // IO 提供的端口对象实际上内部就是调用了 bind 方法:只不过现在需要手动调用了
      ssc.bind(new InetSocketAddress(4396));
    • 注册监听器(涉及到 Selector 类)

      // Selector 类中详细解释:设置当前通道关心哪些事件,只要事件被监听到,服务器就不再阻塞
      Selector selector = Selector.open();
      ssc.register(selector, SelectionKey.OP_ACCEPT);
  • 服务器实例

    • 传统 IO 实现的 BIO 通信模型需要使用多线程来满足多个客户端同时发出的请求

    • 新 IO 实现的 NIO 通信模型只需要使用单线程就可以完全满足多个客户端同时发出请求

      public static void main(String[] args) throws IOException
      {
      // 构造器时受到保护的: 通过静态方法创建对象
      ServerSocketChannel ssc = ServerSocketChannel.open();
      // 绑定端口号
      ssc.bind(new InetSocketAddress(4396));
      // 改进: 手动设置为非阻塞 -> accept 方法会非阻塞式调用
      ssc.configureBlocking(false);
      // 创建缓冲区
      ByteBuffer buffer = ByteBuffer.allocate(36);
      // 记录当前在线的客户端
      List<SocketChannel> clients = new LinkedList<>();
      // 服务器长时间运行
      while (true)
      {
      SocketChannel sc = ssc.accept();
      if (sc != null)
      {
      // 只要服务器接收到客户端相应的请求就将其添加到集合汇总
      clients.add(sc);
      System.out.println(sc);
      // 改进:从客户端通道读取数据时也设置为非阻塞 -> 读写方法变为非阻塞方法
      sc.configureBlocking(false);
      }
      // 读取客户端发送的消息
      for (SocketChannel client : clients)
      {
      // 只有在读取到字节后才可以输出: read 方法没有读取到字节返回 0, 抛出异常返回 -1
      if (client.read(buffer) > 0)
      {
      // 输出缓冲区中的内容
      readBuffer(buffer);
      System.out.print("\t" + client + "\n");
      }
      }
      }

      }
  • 问题:思考一下,现在的服务器代码是否存在问题?如果存在问题,那么应该怎样解决?

Selector

Selector:基础知识

  • 前情提要:

    • 问题:测试过之前的服务器代码就能够发现问题,服务器即使在没有客户端连接的情况下都在运行,非常占用服务器资源

      !!经过自己电脑测试之后发现,每开一个客户端进程,CPU 的资源利用率就会上涨将近 15%!!

    • 目的:

      我们显然不希望在没有客户端请求到来的时候服务器也在不停地运行

      [我们需要的是服务器仅在客户端发来请求的时候才开始处理,其余时候阻塞就行]{.blue}

    • 解决:[Selector 类]{.red}

  • 名称:直译为选择器

  • 定义:Selector 监视 [事件]{.red} 的发生:只要事件发生就通知相应的通道来处理该事件,否则就阻塞服务器

  • 核心:[将轮询的非阻塞机制优化成复用式的阻塞机制]{.red}

    注:Selector 并不是将阻塞变成非阻塞,而是对非阻塞机制进行优化,优化的结果就是阻塞机制

  • 事件:[主要分为四种事件]{.red}

    • SelectionKey.OP_ACCEPT:服务器接收客户端连接请求时该事件发生
    • SelectionKey.OP_READ:客户端发送数据时该事件发生
    • SelectionKey.OP_WRITE:服务器端发送数据时该事件发生
    • SelectionKey.OP_CONNECT:客户端向服务器发送连接请求时该事件发生
  • 特点:

    • [Selector 可以被多个通道共同使用]{.red}
    • [Selector 可以监视同一个通道关注的多个事件]{.red}
  • 创建并注册 Selector

    • 创建:Selector 构造方法依然是受到保护的,仅能够通过静态方法创建实例对象
    • 注册:
      • 概述:[ServerSocketChannel 对象]{.blue} 调用 register 方法
      • 第一个参数:传入通道想要注册的监视器(注:是通道向监视器注册)
      • 第二个参数:传入通道关注的事件([注:通道可以同时关注多个事件]{.red})
      • 第三个参数:传入通道独占的缓冲区,主要在服务器发生 [写事件]{.red} 时会使用到
      • 返回值:Selector 会为通道分配相应的管理员,用来管理该通道关注的所有事件
    public static void main(String[] args) throws IOException
    {
    // 服务器端口
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.bind(new InetSocketAddress(4396));
    ssc.configureBlocking(false);
    // 创建监视器(选择器)
    Selector selector = Selector.open();
    // 服务器端注册监视器: 指定当前通道关注的事件
    // 返回值为监视器分配的管理员: 用于管理该通道关注的所有事件
    SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT, null);
    ...
    }
  • 阻塞服务器:Selector 调用 select 方法

    public static void main(String[] args) throws IOException
    {
    ...
    while (true)
    {
    // 只要没有事件发生,监视器就会阻塞服务器; 只要事件产生,就让服务器开始处理事件
    selector.select();
    ...
    }
    ...
    }
  • 获取并且遍历事件集合

    public static void main(String[] args) throws IOException
    {
    ...
    while (true)
    {
    // 获取当前发生的事件集合: 因为可能多个客户端同时触发事件,所以是集合
    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    // 采用迭代器遍历集合: 后续涉及到移除事件
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // 管理员判断当前是哪种事件发生了
    if (key.isAcceptable())
    {
    // 遍历事件集合处理事件
    while (iterator.hasNext())
    {
    // 获取当前的事件
    SelectionKey sk = iterator.next();
    ...
    }
    }
    }
    ...
    }
  • 处理事件

    • 处理接收事件
    • 处理读事件
    • 处理写事件
  • 移除事件

Selector:相关问题

  • 处理客户端断开连接问题
  • 处理消息边界问题
  • 处理容量超出问题

实例: 多人聊天室

服务器端

package chatroom.v5;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

// 多人聊天室: 采用 NIO + Selector 实现
public class ChatServer
{
// 服务器端口号: 默认使用的端口号
private static final int DEFAULT_PORT = 7777;
// 缓冲区固定分配的空间大小
private static final int BUFFER_SIZE = 1024;
// 退出消息标识符
private static final String EXIT = "退出";
// 调用者可以根据需要使用其他的端口号
private int port;
// 服务器端
private ServerSocketChannel server;
// 监听器
private Selector selector;

public ChatServer()
{
this(DEFAULT_PORT);
}

public ChatServer(int port)
{
this.port = port;
}

// 获取客户端相关信息
private String getClientName(SocketChannel client)
{
return "客户端[" + client.socket().getPort() + "]:";
}

// 处理连接事件
private void acceptHandler(SelectionKey selectionKey) throws IOException
{
// 通知相应的通道来处理事件: 服务器端开始处理
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
// 获取客户端连接
SocketChannel client = server.accept();
// 打印提示信息
System.out.println(getClientName(client) + "连接成功");
// 客户端同样设置为非阻塞
client.configureBlocking(false);
// 客户端向选择器注册: 每个客户端都有属于自己的专属缓冲区
client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(BUFFER_SIZE));
}

// 转发消息
private void forward(Set<SelectionKey> selectionKeys, SocketChannel client, ByteBuffer buffer) throws IOException
{
// 开始处理事件: 获取所有注册在监听器上的通道
Set<SelectionKey> keys = selector.keys();
// 开始遍历
for (SelectionKey receiver : keys)
{
// 因为获取的是所有的事件而不是发生的所有事件, 所以这里面包含服务器端要处理的事件
// 我们只需要向客户端转发消息, 所有只需要处理对应通道为客户端的事件, 而不需要服务器端的
if (receiver.channel() instanceof SocketChannel)
{
SocketChannel channel = (SocketChannel) receiver.channel();
if (channel != client)
{
buffer.rewind();
channel.write(buffer);
}
}
}
}

// 判断客户端是否退出
private boolean isExit(ByteBuffer buffer)
{
return String.valueOf(StandardCharsets.UTF_8.decode(buffer)).contains(EXIT);
}

// 处理读事件
private void readHandler(SelectionKey selectionKey) throws IOException
{
// 缓冲区
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
// 从发送消息的客户端中读取数据
SocketChannel client = (SocketChannel) selectionKey.channel();
buffer.put(getClientName(client).getBytes());
client.read(buffer);
buffer.flip();
if (isExit(buffer))
{
System.out.println(getClientName(client) + "断开连接");
client.write(StandardCharsets.UTF_8.encode(EXIT));
// 取消监听该事件
selectionKey.cancel();
// 关闭客户端
client.close();
// 结束执行该方法
return;
}

// 写模式切换成读模式
buffer.flip();
// 打印提示信息: decode 方法会将 position 指针移动, 所以使用这个方法之后想要再次读取, 就要使用 clear 或者 rewind 方法
System.out.println(StandardCharsets.UTF_8.decode(buffer));
// 转发消息
forward(selector.keys(), client, buffer);
}

// 处理发生的事件
private void selectionKeyHandler(Set<SelectionKey> selectionKeys) throws IOException
{
// 采用迭代器遍历事件集合更好: 因为有可能部分事件发生了但没有处理
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext())
{
// 遍历集合事件
SelectionKey selectionKey = iterator.next();
// 根据事件类型分别处理
if (selectionKey.isAcceptable())
{
acceptHandler(selectionKey);
}
else if (selectionKey.isReadable())
{
readHandler(selectionKey);
}
else if (selectionKey.isWritable())
{
// TODO 服务器端向客户端发送消息, 一次没有发送完成才会触发写事件
}
// 事件处理完成后移除
iterator.remove();
}
}

private void close(Closeable close) throws IOException
{
if (close != null)
{
close.close();
}
}

private void start()
{
try
{
server = ServerSocketChannel.open();
// 服务器端设置为非阻塞
server.configureBlocking(false);
// 绑定端口号
server.bind(new InetSocketAddress(port));
// 创建选择器
selector = Selector.open();
// 服务器端注册: 仅需要关注客户端连接的事件
server.register(selector, SelectionKey.OP_ACCEPT, null);
// 打印提示信息
System.out.printf("服务器[%d]:%s\n", port, "启动成功");
// 服务器开始等待客户端连接并且处理事件
while (true)
{
// 阻塞服务器: 防止服务器空转
selector.select();
// 获取此时所有发生的事件并处理
selectionKeyHandler(selector.selectedKeys());

}
}
catch (IOException e)
{
e.printStackTrace();
}
}

public static void main(String[] args)
{
new ChatServer().start();
}

}

客户端

package chatroom.v5;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class ChatClient
{
private static final String DEFAULT_LOCALHOST = "127.0.0.1";
private static final int DEFAULT_PORT = 7777;
private static final int BUFFER_SIZE = 1024;
private static final String EXIT = "退出";
private int port;
private String localhost;
private SocketChannel client;
private Selector selector;

public ChatClient()
{
this(DEFAULT_PORT, DEFAULT_LOCALHOST);
}

public ChatClient(int port, String localhost)
{
this.port = port;
this.localhost = localhost;
}

// 获取客户端相关信息
private String getClientName(SocketChannel client) throws IOException
{
InetSocketAddress address = (InetSocketAddress) client.getLocalAddress();
return "客户端[" + address.getPort() + "]:";
}

// 处理连接事件
@Deprecated
private void connectHandler(SelectionKey selectionKey) throws IOException
{
// 获取处理事件的通道
SocketChannel channel = (SocketChannel) selectionKey.channel();
// 连接正在建立时返回 true
if (channel.isConnectionPending())
{
// 手动确认连接建立完成
channel.finishConnect();

}
}

// 处理读取事件
private void readHandler(SelectionKey selectionKey) throws IOException
{
// 获取事件对应的通道
SocketChannel client = (SocketChannel) selectionKey.channel();
// 获取事件对应的附件(缓冲区)
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
client.read(buffer);
// 写模式切换成读模式
buffer.flip();
String msg = String.valueOf(StandardCharsets.UTF_8.decode(buffer));
if (EXIT.equals(msg))
{
System.exit(1);
}
// 打印提示信息
System.out.println();
// 读模式切换成写模式
buffer.clear();
}

// 处理事件
private void selectionKeysHandler(Set<SelectionKey> selectionKeys) throws IOException
{
// 遍历事件集合
for (SelectionKey selectionKey : selectionKeys)
{
readHandler(selectionKey);
}
selectionKeys.clear();
}

private void start()
{
try
{
// 创建客户端
client = SocketChannel.open();
// 客户端绑定端口号
client.connect(new InetSocketAddress(localhost, port));
// 客户端设置为非阻塞
client.configureBlocking(false);
// 开启输入线程
new Thread(new ChatClientInputThread(client)).start();
// 打印提示信息
System.out.println(getClientName(client) + "启动成功");
// 监听器
selector = Selector.open();
// 注册监听事件
client.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ,
ByteBuffer.allocate(BUFFER_SIZE));

while (true)
{
// 阻塞客户端
selector.select();
// 获取发生的事件集合
selectionKeysHandler(selector.selectedKeys());
}

}
catch (IOException e)
{
e.printStackTrace();
}
}

public static void main(String[] args)
{
new ChatClient().start();
}
}

package chatroom.v5;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class ChatClientInputThread implements Runnable
{

private static final String EXIT = "退出";
private static final int BUFFER_SIZE = 1024;
private static final BufferedReader READER = new BufferedReader(
new InputStreamReader(
System.in));
private SocketChannel client;

public ChatClientInputThread(SocketChannel client)
{
this.client = client;
}

@Override
public void run()
{
try
{
String msg;
while ((msg = READER.readLine()) != null)
{
ByteBuffer buffer = StandardCharsets.UTF_8.encode(msg);
client.write(buffer);
}
}
catch (IOException e)
{
e.printStackTrace();
}
}
}

Author: Fuyusakaiori
Link: http://example.com/2021/08/18/java/io/NIO/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.