NIO学习笔记

    技术2022-07-13  78

    目录

    背景

    NIO与IO的区别和基本用法

    与IO的区别

    缓冲区的基本用法

    通道的使用

    直接缓存区和非直接缓存区

    分散与聚集

    网络通信

    阻塞式

    非阻塞式

    UDP

    管道

    结语

    背景

    整理对java异步io库的学习笔记,示例代码运行环境为jdk8

    NIO与IO的区别和基本用法

    与IO的区别

    1、NIO是New  IO或 Non-blocking IO的缩写,面向缓冲区,而IO面向流

    2、NIO非阻塞,采用选择器,而IO是阻塞式的

    选择器就是打开到IO设备的连接,负责传输,缓冲区负责存储

    缓冲区的基本用法

    缓冲区底层就是数组,它有七种基本数据类型的缓冲区(boolean除外),通过allocate()获取缓存区

    ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);

    存入数据使用put()方法,读取数据使用get()方法

    缓存区四个核心属性:

    1)、capacity:容量,声明后不能改变

    2)、limit:界限,表示缓存区中可操作数据的大小,也就是limit后的数据不能读写

    3)、position:位置,表示缓存区中正在操作数据的位置,显然position <= limit <= capacity

    4)、mark:标记,表示当前position的位置,通过reset()恢复到mark的位置

    各个方法使用如下:

    System.out.println(buffer.position()); // 0 System.out.println(buffer.limit()); // 10240 System.out.println(buffer.capacity()); // 10240 buffer.put("szc".getBytes()); // 写数据 System.out.println(buffer.position()); // 3 System.out.println(buffer.limit()); // 10240 System.out.println(buffer.capacity()); // 10240 buffer.put("szc".getBytes()); buffer.flip(); // 切换成读数据模式 System.out.println(buffer.position()); // 0 System.out.println(buffer.limit()); // 3,换成上一个position System.out.println(buffer.capacity()); // 10240 byte[] bytes = new byte[buffer.limit()]; buffer.get(bytes, 0, bytes.length); // 读数据 System.out.println(new String(bytes)); // szc System.out.println(buffer.position()); // 3 System.out.println(buffer.limit()); // 3 System.out.println(buffer.capacity()); // 10240 buffer.rewind(); // 重新读数据 System.out.println(buffer.position()); // 0 System.out.println(buffer.limit()); // 3 System.out.println(buffer.capacity()); // 10240 buffer.clear(); // 清空缓存区,数据依旧存在,但处于被遗忘状态,也就是所有的指标都回到了初始位置 System.out.println(buffer.position()); // 0 System.out.println(buffer.limit()); // 10240 System.out.println(buffer.capacity()); // 10240 buffer.get(bytes, 0, bytes.length); System.out.println(new String(bytes)); // szc

    对于mark()和reset()方法

    ByteBuffer buffer = ByteBuffer.allocate(10 * 1024); buffer.put("abcdefgh".getBytes()); buffer.flip(); byte[] bytes = new byte[2]; buffer.get(bytes, 0, bytes.length); System.out.println(new String(bytes)); System.out.println(buffer.position()); // 2 System.out.println(buffer.limit()); // 8 System.out.println(buffer.capacity()); // 10240 buffer.mark(); // 记录当前位置 2 buffer.get(bytes, 0, bytes.length); System.out.println(new String(bytes)); System.out.println(buffer.position()); // 4 System.out.println(buffer.limit()); // 8 System.out.println(buffer.capacity()); // 10240 buffer.reset(); // 位置记录重置,恢复到mark的位置 System.out.println(buffer.position()); // 2 System.out.println(buffer.limit()); // 8 System.out.println(buffer.capacity()); // 10240

    对于判断还有没有可操作的数据,可以先用hasRemaining()判断,为真后再使用remaining()获取还能操作数量

    ByteBuffer buffer = ByteBuffer.allocate(10 * 1024); buffer.put("abcdefgh".getBytes()); buffer.flip(); byte[] bytes = new byte[2]; buffer.get(bytes, 0, bytes.length); System.out.println(new String(bytes)); System.out.println(buffer.position()); // 2 System.out.println(buffer.limit()); // 8 System.out.println(buffer.capacity()); // 10240 buffer.mark(); // 记录当前位置 2 buffer.get(bytes, 0, bytes.length); System.out.println(new String(bytes)); System.out.println(buffer.position()); // 4 System.out.println(buffer.limit()); // 8 System.out.println(buffer.capacity()); // 10240 System.out.println("==============="); if (buffer.hasRemaining()) { System.out.println(buffer.remaining()); // 4 } System.out.println("===============");

    通道的使用

    通过用于源结点和目标结点的连接,负责传输缓冲区的数据。本地传输可以用FileChannel,tcp传输可以用SocketChannel和ServerSocketChannel,udp可以用DatagramSocket

    获取通道的方式:

    1)、本地IO:通过FileInputStream/FileOutputStream、RandomAccessFile的getChannel()方法获取

    2)、网络IO:通过Socket、ServerSocket、DatagramSocket的getChannel()方法获取

    3)、jdk1.7中针对各个通过提供了静态方法open()

    4)、jdk1.7的Files.newByteChannel()方法

    通道实现文件复制的方法如下所示

    public class ChannelTest { public static void main(String[] args) { try { FileInputStream inputStream = new FileInputStream("D:\\test.jpg"); FileOutputStream outputStream = new FileOutputStream("test.jpg"); FileChannel inChannel = inputStream.getChannel(); // 获取输入通道 FileChannel outChannel = outputStream.getChannel(); // 获取输出通道 ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配缓存区 while (inChannel.read(buffer) != -1) { buffer.flip(); outChannel.write(buffer); buffer.clear(); } inChannel.close(); outChannel.close(); inputStream.close(); outputStream.close(); } catch (Exception e) { e.printStackTrace(); } } }

    使用内存映射文件完成复制文件的代码如下所示,内存映射文件也是一种直接内存的获取方式

    try { FileChannel inChannel = FileChannel.open(Paths.get("D:/test.jpg"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("test.jpg"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE); MappedByteBuffer inBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size()); // 内存映射文件,仅支持byteBuffer,也是一种直接内存获取方式 MappedByteBuffer outBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size()); byte[] bytes = new byte[1024]; inBuffer.get(bytes); outBuffer.put(bytes); inChannel.close(); outChannel.close(); } catch (IOException e) { e.printStackTrace(); }

    使用通道间传输直接进行文件的复制,这也是一种直接内存方式

    try { FileChannel inChannel = FileChannel.open(Paths.get("D:/test.jpg"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("test.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); inChannel.transferTo(0, inChannel.size(), outChannel); // outChannel.transferFrom(inChannel, 0, inChannel.size()); inChannel.close(); outChannel.close(); } catch (Exception e) { e.printStackTrace(); }

    直接缓存区和非直接缓存区

    此部分介绍亦可参见文章JVM学习之直接内存

    1、非直接缓存区:通过allocate()方法分配的缓存区,将缓存区建立在JVM堆中

    2、直接缓存区:通过allocateDirect()方法分配的缓存区,建立在物理内存中,可参见JVM学习笔记的直接内存部分

    直接缓存效率高,但不受gc控制,照样会发生OOM

     

    直接和非直接缓存区用法相同,可以通过isDirect()来判断是否是直接缓存区

    System.out.println(buffer.isDirect());

    建议把直接缓存区用在能给程序性能带来明显好处的地方

    分散与聚集

    1、分散读:将通道中的数据依次分散到多个缓冲区中

    2、聚集写:将多个缓冲区的数据依次聚集到一个通道中

    用法如下

    try { RandomAccessFile inFile = new RandomAccessFile("D:/eula.1031.txt", "rw"); FileChannel inChannel = inFile.getChannel(); ByteBuffer buffer1 = ByteBuffer.allocate(6 * 1024); ByteBuffer buffer2 = ByteBuffer.allocate(6 * 1024); ByteBuffer buffer3 = ByteBuffer.allocate(7 * 1024); ByteBuffer[] inBuffers = {buffer1, buffer2, buffer3}; inChannel.read(inBuffers); // 分散读,传入缓存数组即可,要保证缓存数组总大小大于待读取文件大小 for (ByteBuffer inBuffer : inBuffers) { inBuffer.flip(); } RandomAccessFile outFile = new RandomAccessFile("out.txt", "rw"); FileChannel outChannel = outFile.getChannel(); outChannel.write(inBuffers); // 聚集写,也是传入缓存数组 inChannel.close(); outChannel.close(); inFile.close(); outFile.close(); } catch (Exception e) { e.printStackTrace(); }

    网络通信

    使用NIO完成网络通信的三个核心:通道、缓存和选择器

    选择器是一种多路复用器,用于监控SelectableChannel的IO状况,而SelectableChannel是网络通道的共同父类

    阻塞式

    客户端,连接、分配缓存、读文件即可

    public class BlockingTestClient { public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999)); ByteBuffer buffer = ByteBuffer.allocate(1024); FileChannel fileChannel = FileChannel.open(Paths.get("test.jpg"), StandardOpenOption.READ); while (fileChannel.read(buffer) != -1) { buffer.flip(); socketChannel.write(buffer); buffer.clear(); } fileChannel.close(); socketChannel.close(); } }

    服务器端,建立连接、监听套接字、接受连接、创建缓存、写文件即可

    public class BlockingTestServer { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(9999)); SocketChannel client = serverSocketChannel.accept(); ByteBuffer buffer = ByteBuffer.allocate(1024); FileChannel fileChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); while (client.read(buffer) != -1) { buffer.flip(); fileChannel.write(buffer); buffer.clear(); } fileChannel.close(); client.close(); serverSocketChannel.close(); } }

    如果客户端要接收返回值,则需要在完成文件传输后,调用通道的shutdownOutput()方法通知对方自己发送完毕了

    public class BlockingTestClient { public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999)); ByteBuffer buffer = ByteBuffer.allocate(1024); FileChannel fileChannel = FileChannel.open(Paths.get("test.jpg"), StandardOpenOption.READ); while (fileChannel.read(buffer) != -1) { buffer.flip(); socketChannel.write(buffer); buffer.clear(); } socketChannel.shutdownOutput(); int len = socketChannel.read(buffer); byte[] bytes = new byte[len]; buffer.flip(); buffer.get(bytes); System.out.println(new String(bytes, "utf-8")); fileChannel.close(); socketChannel.close(); } }

    对应服务端代码

    public class BlockingTestServer { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(9999)); SocketChannel client = serverSocketChannel.accept(); ByteBuffer buffer = ByteBuffer.allocate(1024); FileChannel fileChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); while (client.read(buffer) != -1) { buffer.flip(); fileChannel.write(buffer); buffer.clear(); } buffer.put("ok".getBytes()); buffer.flip(); client.write(buffer); fileChannel.close(); client.close(); serverSocketChannel.close(); } }

    非阻塞式

    客户端只需要把连接通道设置为非阻塞即可

    public class NonBlockingTestClient { public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999)); socketChannel.configureBlocking(false); // 切换成非阻塞通道 ByteBuffer buffer = ByteBuffer.allocate(1024); FileChannel outChannel = FileChannel.open(Paths.get("test.jpg"), StandardOpenOption.READ); while (outChannel.read(buffer) != -1) { buffer.flip(); socketChannel.write(buffer); buffer.clear(); } outChannel.close(); socketChannel.close(); } }

    服务端除了把连接通道设置为非阻塞,还要注册选择器,轮询选择器,根据选择器的状态执行不同的逻辑

    public class NonBlockingTestServer { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(9999)); Selector selector = Selector.open(); // 获取选择器 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 通道注册选择器,监听接收事件 while (selector.select() > 0) { // 轮询,如果select()返回值>0,说明已经有一个监听事件发生 Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectedKey : selectionKeys) { if (selectedKey.isAcceptable()) { // 如果接收就绪,创建连接套接字 SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); // 注册读事件 } else if (selectedKey.isReadable()) { // 如果读就绪,接收文件 SocketChannel clientChannel = (SocketChannel) selectedKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); FileChannel outChannel = FileChannel.open(Paths.get("3.jpg") , StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); while (clientChannel.read(buffer) != -1) { buffer.flip(); outChannel.write(buffer); buffer.clear(); } outChannel.close(); clientChannel.close(); } selectionKeys.remove(selectedKey); // 用完选择键,就要移除 } } } }

    UDP

    UDP一般不用来做文件传输,所以用它来实现一个聊天即可

    客户端,创建通道、设置非阻塞,然后发送包即可

    public class NonBlockingUDPSender { public static void main(String[] args) throws IOException { DatagramChannel datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { buffer.put(scanner.next().getBytes("utf-8")); buffer.flip(); datagramChannel.send(buffer, new InetSocketAddress("localhost", 9999)); // 发送 buffer.clear(); } datagramChannel.close(); } }

    服务端创建套接字、设置非阻塞、绑定端口号、注册选择器,然后轮询选择器,根据事件进行处理即可

    public class NonBlockingUPDReceiver { public static void main(String[] args) throws IOException { DatagramChannel datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); datagramChannel.bind(new InetSocketAddress(9999)); Selector selector = Selector.open(); datagramChannel.register(selector, SelectionKey.OP_READ); while (selector.select() > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { if (selectionKey.isReadable()) { DatagramChannel clientChannel = (DatagramChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); clientChannel.receive(buffer); buffer.flip(); System.out.println(new String(buffer.array(), 0, buffer.limit(), "utf-8")); buffer.clear(); } selectionKeys.remove(selectionKey); } } } }

    管道

    管道是两个线程间的单向数据连接

    管道的sink部分负责输出数据,source部分负责读取数据。在读写数据前,都要对用到的缓存区进行flip()

    public class PipeTest { public static void main(String[] args) throws IOException { Pipe pipe = Pipe.open(); // 打开管道 Pipe.SinkChannel sinkChannel = pipe.sink(); // 获取输出 ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("test".getBytes()); buffer.flip(); sinkChannel.write(buffer); // 向输出写数据 Pipe.SourceChannel sourceChannel = pipe.source(); // 获取源 buffer.flip(); int len = sourceChannel.read(buffer); // 从源头读数据 System.out.println(new String(buffer.array(), 0, len, "utf-8")); } }

    结语

    NIO笔记至此结束

    Processed: 0.012, SQL: 9