目录
1. Buffer
1.1 ByteBufferAsCharBufferB 和 ByteBufferAsCharBufferL
1.2 DirectCharBufferU 和 DirectCharBufferS
1.3 HeapCharBuffer 和 HeapCharBufferR
2. Channel
2.1 NetworkChannel 接口
2.1.1 SocketChannel 类
2.1.2 ServerSocketChannel 类
3. Selector 多路选择器
3.1 AbstractSelector 抽象类
4. InetSocketAddress 类
5. SelectionKey 类
6. 简单的例子
6.1 客户端
6.2 服务端
NIO的三个核心:Buffer、Channel 和 Selector。
channel 与套接字相连接,接收套接字的数据和写数据进套接字都要经过 channel,而 channel 又注册在 selector 中,因此,selector 有能力获取到所有 channel 中准备好读写的那些 channel,以便我们进行读写;不管是读还是写,数据都需要被存储在某个地方,这个地方就是 buffer。可以看出,一个线程就完成了上述的事情,因此IO多路复用的效率很高。
下图是缓冲区系列的框架图,一共封装了7种基本数据类型的缓冲区(除了 boolean 类型的),但是都是抽象类,不能直接使用。
Buffer 类是所有缓冲区的根类,其中定义了缓冲区的基本属性和框架。提一下 address 这个变量,表示一个地址,可以是内核态的缓冲区地址,也可以是套接字缓冲区地址(其实也是内核态)。
public abstract class Buffer { static final int SPLITERATOR_CHARACTERISTICS = Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED; // 标记索引 private int mark = -1; // 下一个将要读取的位置 private int position = 0; // 限制索引 private int limit; // 缓冲区空间大小 private int capacity; long address; // 构造方法 Buffer(int mark, int pos, int lim, int cap) { if (cap < 0) throw new IllegalArgumentException("Negative capacity: " + cap); this.capacity = cap; limit(lim); position(pos); if (mark >= 0) { if (mark > pos) throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")"); this.mark = mark; } } public final int capacity() { return capacity; } public final int position() { return position; } public final int limit() { return limit; } final int markValue() { return mark; } // 设置新position public final Buffer position(int newPosition) { if ((newPosition > limit) || (newPosition < 0)) throw new IllegalArgumentException(); position = newPosition; if (mark > position) mark = -1; return this; } // 设置新limit public final Buffer limit(int newLimit) { if ((newLimit > capacity) || (newLimit < 0)) throw new IllegalArgumentException(); limit = newLimit; if (position > limit) position = limit; if (mark > limit) mark = -1; return this; } // 将当前的 position 做标记 public final Buffer mark() { mark = position; return this; } // 让 position 重回 mark 处 public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; } // 清空 buffer,但是空间还在 public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; } // 读写模式的切换,即从读模式转为写模式,或者从写模式转为读模式 public final Buffer flip() { limit = position; position = 0; mark = -1; return this; } // 从头开始,标记失效 public final Buffer rewind() { position = 0; mark = -1; return this; } // 还剩多少可读或者还剩多少空间可写入 public final int remaining() { return limit - position; } // 是否还有剩余 public final boolean hasRemaining() { return position < limit; } public abstract boolean isReadOnly(); public abstract boolean hasArray(); public abstract Object array(); public abstract int arrayOffset(); public abstract boolean isDirect(); // 往后跳一个位置 final int nextGetIndex() { if (position >= limit) throw new BufferUnderflowException(); return position++; } // 和上面一个样,不过意义不同而已 final int nextPutIndex() { if (position >= limit) throw new BufferOverflowException(); return position++; } // 往后跳nb个位置 final int nextGetIndex(int nb) { if (limit - position < nb) throw new BufferUnderflowException(); int p = position; position += nb; return p; } final int nextPutIndex(int nb) { if (limit - position < nb) throw new BufferOverflowException(); int p = position; position += nb; return p; } // 将缓冲区设置无效 final void truncate() { mark = -1; position = 0; limit = 0; capacity = 0; } // 是标记失效 final void discardMark() { mark = -1; } }对于每一类 buffer,都有三大类子系列。拿 CharBuffer 类举例说明:
ByteBufferAsCharBufferB 类是将 ByteBuffer 进行一层封装,提供的方法都是字符层面的,使用大端序列。
ByteBufferAsCharBufferL 类是将 ByteBuffer 进行一层封装,提供的方法都是字符层面的,使用小端序列。
ByteBufferAsCharBufferRB 继承 ByteBufferAsCharBufferB 类,是 ByteBufferAsCharBufferB 类的可读版本。
ByteBufferAsCharBufferRL 继承 ByteBufferAsCharBufferL 类,是 ByteBufferAsCharBufferL 类的可读版本。
DirectBuffer 系列是内核态的缓冲区,而不是用户态的缓冲区,JVM所管理的内存都是属于用户态,如果需要进行IO的话,比如将数据存入磁盘,数据需要经历用户态、内核态、磁盘,如果我们使用 DirectBuffer 系列的话,就直接将数据写入内核态,于是数据只需要经历内核态和磁盘,这种技术就叫零拷贝技术。
DirectCharBufferU 类是内核态字符缓冲区,U表示与平台字节序列保持一致。
DirectCharBufferS 类是内核态字符缓冲区,U表示与平台字节序列不一致。
DirectCharBufferRU 类继承 DirectCharBufferU,是 DirectCharBufferU 类的可读版本。但是目前JDK8中没有实现,因此需要自己去实现。
DirectCharBufferRS 类继承 DirectCharBufferS,是 DirectCharBufferS 类的可读版本。但是目前JDK8中没有实现,因此需要自己去实现。
HeapBuffer 系列是我们平常用的缓冲区,是JVM堆中的缓冲区。HeapCharBufferR 继承 HeapCharBuffer 类,是它的可读版本。
下图是 Channel 系列的框架图,根据功能,通道被分为5大类。
Channel 接口是定义通道基本规范的,就2个方法,如下。 Channel 通道就相当于鼻子一样,nio 下面的输入输出都需要靠通道来进行,读取数据(吸气),需要通过嘴巴,写出数据(呼气),也是需要通过嘴巴。
public interface Channel extends Closeable { // 查看通道是否开启 public boolean isOpen(); // 关闭通道 public void close() throws IOException; }这个是用于网络系列的,专用于 Socket。
然后看看 NetworkChannel 接口的定义。
public interface NetworkChannel extends Channel { // 将此通道与套接字进行绑定 NetworkChannel bind(SocketAddress local) throws IOException; // 获得套接字的信息 SocketAddress getLocalAddress() throws IOException; // 设置套接字的属性 <T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException; // 获得套接字的属性 <T> T getOption(SocketOption<T> name) throws IOException; // 获得此通道绑定的所有套接字的属性集 Set<SocketOption<?>> supportedOptions(); }选择器是实现IO多路复用的关键,selector 具体是如何实现的,不同的操作系统还不一样,一共有四种方式:select、poll、epoll、kqueue,因此,SelectorProvider 抽象类的作用(当然具体的实现还是要靠它的子实现类)就是创建平台默认使用的 selector,创建 socketChannel。
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel { // 构造方法,参数是一个选择器提供者 protected SocketChannel(SelectorProvider provider) { super(provider); } // 创建一个 socketChannel public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); } // 创建一个 socketChannel,并将它与指定的远程套接字连接 public static SocketChannel open(SocketAddress remote) throws IOException { SocketChannel sc = open(); try { sc.connect(remote); } catch (Throwable x) { try { sc.close(); } catch (Throwable suppressed) { x.addSuppressed(suppressed); } throw x; } assert sc.isConnected(); return sc; } // 返回此 channel 支持的操作集,每个操作是一个数字,因此多个数字 | 操作得到的值就可以看出有 // 哪些操作 public final int validOps() { return (SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT); } // 将通道与本地主机上的套接字绑定 @Override public abstract SocketChannel bind(SocketAddress local) throws IOException; // 设置通道的属性 @Override public abstract <T> SocketChannel setOption(SocketOption<T> name, T value) throws IOException; // 关闭通道的输入功能,无法从通道读取数据了 public abstract SocketChannel shutdownInput() throws IOException; // 关闭通道的输出功能,无法往通道里写入数据了 public abstract SocketChannel shutdownOutput() throws IOException; // 返回与此通道连接的套接字 public abstract Socket socket(); // 判断此通道是否还与套接字处于连接状态 public abstract boolean isConnected(); // 判断此通道是否正在连接一个套接字 public abstract boolean isConnectionPending(); // 连接一个远程的套接字 public abstract boolean connect(SocketAddress remote) throws IOException; // 断开与套接字的连接 public abstract boolean finishConnect() throws IOException; // 返回远程套接字 public abstract SocketAddress getRemoteAddress() throws IOException; public abstract int read(ByteBuffer dst) throws IOException; public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException; public final long read(ByteBuffer[] dsts) throws IOException { return read(dsts, 0, dsts.length); } public abstract int write(ByteBuffer src) throws IOException; // 向通道写入srcs的某段数据 public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException; // 向通道写入数据 public final long write(ByteBuffer[] srcs) throws IOException { return write(srcs, 0, srcs.length); } // 返回此 channel 绑定的套接字 @Override public abstract SocketAddress getLocalAddress() throws IOException; }这是监听套接字的通道,有几个方法是和上面一样的,剩下的方法如下。
public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel { protected ServerSocketChannel(SelectorProvider provider) { super(provider); } public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); } public final int validOps() { return SelectionKey.OP_ACCEPT; } public final ServerSocketChannel bind(SocketAddress local) throws IOException { return bind(local, 0); } // 绑定本机的套接字,backlog 表示接受的最大连接 public abstract ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException; // 设置属性 public abstract <T> ServerSocketChannel setOption(SocketOption<T> name, T value) throws IOException; // 返回与此channel连接的socket public abstract ServerSocket socket(); // 接收一个连接,并创建一个新的channel与该连接绑定,然后返回该channel public abstract SocketChannel accept() throws IOException; // 返回套接字的信息 @Override public abstract SocketAddress getLocalAddress() throws IOException; }选择器的实现就比较单一,如下图。
先看一下Selector抽象类中定义的基本规范。
public abstract class Selector implements Closeable { protected Selector() { } // 创建一个selector public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } // 判断selector是否开启 public abstract boolean isOpen(); // 返回创建这个selector 的 selectorProvider public abstract SelectorProvider provider(); // 返回selector管理的所有通道的标识信息 public abstract Set<SelectionKey> keys(); // 返回当前可读写的通道的标识信息 public abstract Set<SelectionKey> selectedKeys(); // 返回当前可读写的通道数量 public abstract int selectNow() throws IOException; // 返回当前时刻经过timeout时间过程中,可读写的通道数量 public abstract int select(long timeout) throws IOException; // 返回当前可读写的通道数量,如果没有可读写的通道,则阻塞 public abstract int select() throws IOException; // 唤醒处于阻塞状态中的select操作 public abstract Selector wakeup(); // 关闭这个selector public abstract void close() throws IOException; }接下来看看这个AbstractSelector 都实现了什么,扩展了哪些,下面给了一个简单的说明。
public abstract class AbstractSelector extends Selector { // 一个代表此selector是否处于开启的线程安全boolean变量 private AtomicBoolean selectorOpen = new AtomicBoolean(true); // 创建这个selector 的 selectorProvider private final SelectorProvider provider; protected AbstractSelector(SelectorProvider provider) { this.provider = provider; } // 用于存储要取消的通道 private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>(); void cancel(SelectionKey k) { synchronized (cancelledKeys) { cancelledKeys.add(k); } } // 关闭此selector public final void close() throws IOException { boolean open = selectorOpen.getAndSet(false); if (!open) return; implCloseSelector(); } protected abstract void implCloseSelector() throws IOException; // 判断selector是否开启 public final boolean isOpen() { return selectorOpen.get(); } public final SelectorProvider provider() { return provider; } protected final Set<SelectionKey> cancelledKeys() { return cancelledKeys; } // 将通道注册进selector protected abstract SelectionKey register(AbstractSelectableChannel ch, int ops, Object att); // 将通道从selector中注销 protected final void deregister(AbstractSelectionKey key) { ((AbstractSelectableChannel)key.channel()).removeKey(key); } private Interruptible interruptor = null; protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread ignore) { AbstractSelector.this.wakeup(); }}; } AbstractInterruptibleChannel.blockedOn(interruptor); Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); } protected final void end() { AbstractInterruptibleChannel.blockedOn(null); } }SocketAddress 是一个抽象类,内容为空,InetSocketAddress 是 SocketAddress 类的实现类,用于封装一个套接字的相关信息。
总共三个要点:主机名,地址,端口号。其中当指定主机名时,会对这个主机名进行DNS解析,判断这个主机名是否能够被解析。
public class InetSocketAddress extends SocketAddress { private static class InetSocketAddressHolder { // 套接字的名字 private String hostname; // 套接字IP private InetAddress addr; // 套接字端口号 private int port; private InetSocketAddressHolder(String hostname, InetAddress addr, int port) { this.hostname = hostname; this.addr = addr; this.port = port; } private int getPort() { return port; } private InetAddress getAddress() { return addr; } private String getHostName() { if (hostname != null) return hostname; if (addr != null) return addr.getHostName(); return null; } private String getHostString() { if (hostname != null) return hostname; if (addr != null) { if (addr.holder().getHostName() != null) return addr.holder().getHostName(); else return addr.getHostAddress(); } return null; } private boolean isUnresolved() { return addr == null; } @Override public String toString() { if (isUnresolved()) { return hostname + ":" + port; } else { return addr.toString() + ":" + port; } } @Override public final boolean equals(Object obj) { if (obj == null || !(obj instanceof InetSocketAddressHolder)) return false; InetSocketAddressHolder that = (InetSocketAddressHolder)obj; boolean sameIP; if (addr != null) sameIP = addr.equals(that.addr); else if (hostname != null) sameIP = (that.addr == null) && hostname.equalsIgnoreCase(that.hostname); else sameIP = (that.addr == null) && (that.hostname == null); return sameIP && (port == that.port); } @Override public final int hashCode() { if (addr != null) return addr.hashCode() + port; if (hostname != null) return hostname.toLowerCase().hashCode() + port; return port; } } private final transient InetSocketAddressHolder holder; private static final long serialVersionUID = 5076001401234631237L; private static int checkPort(int port) { if (port < 0 || port > 0xFFFF) throw new IllegalArgumentException("port out of range:" + port); return port; } private static String checkHost(String hostname) { if (hostname == null) throw new IllegalArgumentException("hostname can't be null"); return hostname; } // 创建一个套接字地址信息,指定端口号,至于IP的话,就看主机上有几个网卡了,任意选择一个IP public InetSocketAddress(int port) { this(InetAddress.anyLocalAddress(), port); } // 创建一个套接字地址信息,指定IP和端口号 public InetSocketAddress(InetAddress addr, int port) { holder = new InetSocketAddressHolder( null, addr == null ? InetAddress.anyLocalAddress() : addr, checkPort(port)); } // 给定主机名和端口号,创建一个套接字地址 public InetSocketAddress(String hostname, int port) { checkHost(hostname); InetAddress addr = null; String host = null; try { addr = InetAddress.getByName(hostname); } catch(UnknownHostException e) { host = hostname; } holder = new InetSocketAddressHolder(host, addr, checkPort(port)); } private InetSocketAddress(int port, String hostname) { holder = new InetSocketAddressHolder(hostname, null, port); } // 给定主机名和端口号,创建一个未被解析的套接字地址 public static InetSocketAddress createUnresolved(String host, int port) { return new InetSocketAddress(checkPort(port), checkHost(host)); } /** * @serialField hostname String * @serialField addr InetAddress * @serialField port int */ private static final ObjectStreamField[] serialPersistentFields = { new ObjectStreamField("hostname", String.class), new ObjectStreamField("addr", InetAddress.class), new ObjectStreamField("port", int.class)}; private void writeObject(ObjectOutputStream out) throws IOException { // Don't call defaultWriteObject() ObjectOutputStream.PutField pfields = out.putFields(); pfields.put("hostname", holder.hostname); pfields.put("addr", holder.addr); pfields.put("port", holder.port); out.writeFields(); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { // Don't call defaultReadObject() ObjectInputStream.GetField oisFields = in.readFields(); final String oisHostname = (String)oisFields.get("hostname", null); final InetAddress oisAddr = (InetAddress)oisFields.get("addr", null); final int oisPort = oisFields.get("port", -1); // Check that our invariants are satisfied checkPort(oisPort); if (oisHostname == null && oisAddr == null) throw new InvalidObjectException("hostname and addr " + "can't both be null"); InetSocketAddressHolder h = new InetSocketAddressHolder(oisHostname, oisAddr, oisPort); UNSAFE.putObject(this, FIELDS_OFFSET, h); } private void readObjectNoData() throws ObjectStreamException { throw new InvalidObjectException("Stream data required"); } private static final long FIELDS_OFFSET; private static final sun.misc.Unsafe UNSAFE; static { try { sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe(); FIELDS_OFFSET = unsafe.objectFieldOffset( InetSocketAddress.class.getDeclaredField("holder")); UNSAFE = unsafe; } catch (ReflectiveOperationException e) { throw new Error(e); } } // 返回端口 public final int getPort() { return holder.getPort(); } // 返回地址 public final InetAddress getAddress() { return holder.getAddress(); } // 返回主机名 public final String getHostName() { return holder.getHostName(); } // 返回主机名或者地址的字符串形式 public final String getHostString() { return holder.getHostString(); } // 检查地址是否已被解析。 public final boolean isUnresolved() { return holder.isUnresolved(); } @Override public String toString() { return holder.toString(); } @Override public final boolean equals(Object obj) { if (obj == null || !(obj instanceof InetSocketAddress)) return false; return holder.equals(((InetSocketAddress) obj).holder); } @Override public final int hashCode() { return holder.hashCode(); } }selectionKey 类是个抽象类,类似于通道的一个描述符,通过这个描述符,我们可以了解通道的实时状态。
public abstract class SelectionKey { protected SelectionKey() { } // 返回此key对应的通道 public abstract SelectableChannel channel(); // 返回此key对应的selector public abstract Selector selector(); // 判断此key是否有效,如果此key返回了,或者它的通道返回了,selector返回了,那key就无效 public abstract boolean isValid(); // 请求取消此键的通道及其选择器的注册。在返回时,该键将无效,并将被添加到其选择器的取消键集。 // 在下一次选择操作中,该键将从选择器的所有键集中删除。 public abstract void cancel(); // 返回此key的兴趣集合 public abstract int interestOps(); // 设置兴趣集为指定值 public abstract SelectionKey interestOps(int ops); // public abstract int readyOps(); public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4; // 测试此key对应的通道是否可读 public final boolean isReadable() { return (readyOps() & OP_READ) != 0; } // 测试此key对应的通道是否可写 public final boolean isWritable() { return (readyOps() & OP_WRITE) != 0; } // 测试此key对应的通道是否已经完成套接字的连接 public final boolean isConnectable() { return (readyOps() & OP_CONNECT) != 0; } // 判断此key对应的通道是否有accept事件 public final boolean isAcceptable() { return (readyOps() & OP_ACCEPT) != 0; } private volatile Object attachment = null; private static final AtomicReferenceFieldUpdater<SelectionKey,Object> attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater( SelectionKey.class, Object.class, "attachment" ); public final Object attach(Object ob) { return attachmentUpdater.getAndSet(this, ob); } public final Object attachment() { return attachment; } }