jdk源码解析八之NIO

    技术2022-07-11  87

    文章目录

    Buffer / ByteBuffer / MappedByteBuffer / DirectByteBuffer | Channel / AbstractInterruptibleChannel / FileChannel / FileChannelImpl / transfer / map / Scatter&Gather / 直接传输,mmap,零拷贝复制在transferTo()应用 | socket通道 | 其他 / ByteOrder / Cleaner / Bits / Unsafe / FileLock / FileLockImpl

    Buffer

    /**
     * Base class for typed buffers. It tracks four indices that always satisfy
     * {@code mark <= position <= limit <= capacity} and offers the index
     * bookkeeping (flip, clear, rewind, mark/reset) shared by all buffer kinds.
     */
    public abstract class Buffer {

        /** Characteristics flag set made available to buffer spliterators. */
        static final int SPLITERATOR_CHARACTERISTICS =
            Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;

        // Invariant: mark <= position <= limit <= capacity

        /** Index restored by {@link #reset()}; -1 while no mark is set. */
        private int mark = -1;

        /** Index handed out by the next relative get/put. */
        private int position = 0;

        /** First index that relative and checked absolute accesses reject. */
        private int limit;

        /** Total number of elements; only {@link #truncate()} changes it after construction. */
        private int capacity;

        // NOTE(review): presumably the off-heap base address used by direct
        // buffers; nothing in this class reads or writes it.
        long address;

        /**
         * Creates a buffer with the given mark, position, limit and capacity.
         *
         * @throws IllegalArgumentException if {@code cap} is negative, if the
         *         limit or position is out of range, or if a non-negative
         *         mark exceeds the position
         */
        Buffer(int mark, int pos, int lim, int cap) {       // package-private
            if (cap < 0) {
                throw new IllegalArgumentException("Negative capacity: " + cap);
            }
            this.capacity = cap;
            // The setters validate lim against cap and pos against lim.
            limit(lim);
            position(pos);
            if (mark < 0) {
                return;                 // no mark requested
            }
            if (mark > pos) {
                throw new IllegalArgumentException("mark > position: ("
                                                   + mark + " > " + pos + ")");
            }
            this.mark = mark;
        }

        /** Returns this buffer's capacity. */
        public final int capacity() {
            return capacity;
        }

        /** Returns this buffer's position. */
        public final int position() {
            return position;
        }

        /**
         * Sets the position; a mark that lies beyond the new position is discarded.
         *
         * @throws IllegalArgumentException if {@code newPosition} is negative
         *         or greater than the limit
         */
        public final Buffer position(int newPosition) {
            boolean inRange = (newPosition >= 0) && (newPosition <= limit);
            if (!inRange) {
                throw new IllegalArgumentException();
            }
            position = newPosition;
            if (mark > newPosition) {
                mark = -1;
            }
            return this;
        }

        /** Returns this buffer's limit. */
        public final int limit() {
            return limit;
        }

        /**
         * Sets the limit. The position is pulled back to the new limit when it
         * lies beyond it, and a mark beyond the new limit is discarded.
         *
         * @throws IllegalArgumentException if {@code newLimit} is negative or
         *         greater than the capacity
         */
        public final Buffer limit(int newLimit) {
            boolean inRange = (newLimit >= 0) && (newLimit <= capacity);
            if (!inRange) {
                throw new IllegalArgumentException();
            }
            limit = newLimit;
            position = Math.min(position, newLimit);
            if (mark > newLimit) {
                mark = -1;
            }
            return this;
        }

        /** Records the current position as the mark. */
        public final Buffer mark() {
            mark = position;
            return this;
        }

        /**
         * Moves the position back to the mark.
         *
         * @throws InvalidMarkException if no mark is set
         */
        public final Buffer reset() {
            int marked = mark;
            if (marked < 0) {
                throw new InvalidMarkException();
            }
            position = marked;
            return this;
        }

        /** Resets the indices to position 0 / limit = capacity and drops the mark; no other state is touched. */
        public final Buffer clear() {
            mark = -1;
            limit = capacity;
            position = 0;
            return this;
        }

        /** Sets the limit to the current position, rewinds the position to 0 and drops the mark. */
        public final Buffer flip() {
            limit = position;
            position = 0;
            mark = -1;
            return this;
        }

        /** Rewinds the position to 0 and drops the mark; the limit is unchanged. */
        public final Buffer rewind() {
            mark = -1;
            position = 0;
            return this;
        }

        /** Returns {@code limit - position}. */
        public final int remaining() {
            return limit - position;
        }

        /** Tells whether the position is still below the limit. */
        public final boolean hasRemaining() {
            return position < limit;
        }

        /** Tells whether this buffer is read-only. */
        public abstract boolean isReadOnly();

        /** Tells whether this buffer is backed by an accessible array. */
        public abstract boolean hasArray();

        /** Returns the backing array. */
        public abstract Object array();

        /** Returns the offset of this buffer's first element within the backing array. */
        public abstract int arrayOffset();

        /** Tells whether this buffer is direct. */
        public abstract boolean isDirect();

        /**
         * Returns the current position and advances it by one.
         *
         * @throws BufferUnderflowException if the position has reached the limit
         */
        final int nextGetIndex() {                          // package-private
            int current = position;
            if (current >= limit) {
                throw new BufferUnderflowException();
            }
            position = current + 1;
            return current;
        }

        /**
         * Returns the current position and advances it by {@code nb}.
         *
         * @throws BufferUnderflowException if fewer than {@code nb} elements remain
         */
        final int nextGetIndex(int nb) {                    // package-private
            int current = position;
            if (limit - current < nb) {
                throw new BufferUnderflowException();
            }
            position = current + nb;
            return current;
        }

        /**
         * Returns the current position and advances it by one.
         *
         * @throws BufferOverflowException if the position has reached the limit
         */
        final int nextPutIndex() {                          // package-private
            int current = position;
            if (current >= limit) {
                throw new BufferOverflowException();
            }
            position = current + 1;
            return current;
        }

        /**
         * Returns the current position and advances it by {@code nb}.
         *
         * @throws BufferOverflowException if fewer than {@code nb} elements remain
         */
        final int nextPutIndex(int nb) {                    // package-private
            int current = position;
            if (limit - current < nb) {
                throw new BufferOverflowException();
            }
            position = current + nb;
            return current;
        }

        /**
         * Validates that {@code i} lies in {@code [0, limit)} and returns it.
         *
         * @throws IndexOutOfBoundsException otherwise
         */
        final int checkIndex(int i) {                       // package-private
            boolean valid = (i >= 0) && (i < limit);
            if (!valid) {
                throw new IndexOutOfBoundsException();
            }
            return i;
        }

        /**
         * Validates that {@code nb} elements starting at {@code i} fit below
         * the limit and returns {@code i}.
         *
         * @throws IndexOutOfBoundsException otherwise
         */
        final int checkIndex(int i, int nb) {               // package-private
            boolean valid = (i >= 0) && (nb <= limit - i);
            if (!valid) {
                throw new IndexOutOfBoundsException();
            }
            return i;
        }

        /** Returns the raw mark value (-1 when unset). */
        final int markValue() {                             // package-private
            return mark;
        }

        /** Zeroes position, limit and capacity and drops the mark. */
        final void truncate() {                             // package-private
            mark = -1;
            position = 0;
            limit = 0;
            capacity = 0;
        }

        /** Drops the mark. */
        final void discardMark() {                          // package-private
            mark = -1;
        }

        /**
         * Validates that {@code off}, {@code len}, {@code off + len} and
         * {@code size - (off + len)} are all non-negative.
         *
         * @throws IndexOutOfBoundsException otherwise
         */
        static void checkBounds(int off, int len, int size) { // package-private
            int end = off + len;
            // OR-ing the values lets one sign test cover all four conditions.
            if ((off | len | end | (size - end)) < 0) {
                throw new IndexOutOfBoundsException();
            }
        }
    }

    ByteBuffer

    public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> { //缓冲数组 final byte[] hb; // Non-null only for heap buffers //偏移位置 final int offset; // boolean isReadOnly; // Valid only for heap buffers ByteBuffer(int mark, int pos, int lim, int cap, // package-private byte[] hb, int offset) { super(mark, pos, lim, cap); //记录缓冲数组 this.hb = hb; //记录偏移位置 this.offset = offset; } ByteBuffer(int mark, int pos, int lim, int cap) { // package-private this(mark, pos, lim, cap, null, 0); } //通过 allocateDirect() 方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率* public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } //分配基于堆指定容量的缓冲区 //对象用于写入,通道可能会在每次调用中隐含地进行下面的操作: // 1.创建一个临时的直接ByteBuffer对象。 // 2.将非直接缓冲区的内容复制到临时缓冲中。 // 3.使用临时缓冲区执行低层次I/O操作。 // 4.临时缓冲区对象离开作用域,并最终成为被回收的无用数据。 public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); } //使用提供的数组存储缓冲区数据 public static ByteBuffer wrap(byte[] array, int offset, int length) { try { return new HeapByteBuffer(array, offset, length); } catch (IllegalArgumentException x) { throw new IndexOutOfBoundsException(); } } public static ByteBuffer wrap(byte[] array) { return wrap(array, 0, array.length); } //分割缓冲区,创建一个从原始缓冲区的当前位置开始到剩余元素数量容量的新缓冲区,也会继承只读和直接属性 public abstract ByteBuffer slice(); //创建与原始视图相似的新缓冲区 public abstract ByteBuffer duplicate(); //生成只读视图 public abstract ByteBuffer asReadOnlyBuffer(); public abstract byte get(); public abstract ByteBuffer put(byte b); public abstract byte get(int index); public abstract ByteBuffer put(int index, byte b); public ByteBuffer get(byte[] dst, int offset, int length) { checkBounds(offset, length, dst.length); if (length > remaining()) throw new BufferUnderflowException(); int end = offset + length; for (int i = offset; i < end; i++) dst[i] = get(); return this; } public ByteBuffer get(byte[] dst) { return get(dst, 0, dst.length); } public ByteBuffer put(ByteBuffer 
src) { if (src == this) throw new IllegalArgumentException(); if (isReadOnly()) throw new ReadOnlyBufferException(); int n = src.remaining(); if (n > remaining()) throw new BufferOverflowException(); for (int i = 0; i < n; i++) put(src.get()); return this; } public ByteBuffer put(byte[] src, int offset, int length) { checkBounds(offset, length, src.length); if (length > remaining()) throw new BufferOverflowException(); int end = offset + length; for (int i = offset; i < end; i++) this.put(src[i]); return this; } public final ByteBuffer put(byte[] src) { return put(src, 0, src.length); } //是否是一个数组同时可读可写 public final boolean hasArray() { return (hb != null) && !isReadOnly; } //返回缓冲区数组 public final byte[] array() { if (hb == null) throw new UnsupportedOperationException(); if (isReadOnly) throw new ReadOnlyBufferException(); return hb; } //返回数组偏移 public final int arrayOffset() { if (hb == null) throw new UnsupportedOperationException(); if (isReadOnly) throw new ReadOnlyBufferException(); return offset; } //丢弃已经读取的数据,保留未读取的数据 public abstract ByteBuffer compact(); //是否是直接缓冲区 public abstract boolean isDirect(); public String toString() { StringBuffer sb = new StringBuffer(); sb.append(getClass().getName()); sb.append("[pos="); sb.append(position()); sb.append(" lim="); sb.append(limit()); sb.append(" cap="); sb.append(capacity()); sb.append("]"); return sb.toString(); } public boolean equals(Object ob) { //判断对象是否一致 if (this == ob) return true; //是否继承同一抽象类 if (!(ob instanceof ByteBuffer)) return false; ByteBuffer that = (ByteBuffer)ob; //可读个数是否一致 if (this.remaining() != that.remaining()) return false; int p = this.position(); //比较pos~lim区间值是否一致,也就是说只要区间value一致就判断一样,pos和lim不一致也没问题 for (int i = this.limit() - 1, j = that.limit() - 1; i >= p; i--, j--) if (!equals(this.get(i), that.get(j))) return false; return true; } private static boolean equals(byte x, byte y) { return x == y; } public int compareTo(ByteBuffer that) { //比较pos-lim区间大小 int n = this.position() + 
Math.min(this.remaining(), that.remaining()); for (int i = this.position(), j = that.position(); i < n; i++, j++) { int cmp = compare(this.get(i), that.get(j)); if (cmp != 0) return cmp; } return this.remaining() - that.remaining(); } private static int compare(byte x, byte y) { return Byte.compare(x, y); } // -- Other char stuff -- //排序方式,默认大端字节存储 boolean bigEndian // package-private = true; //操作系统默认存储方式 boolean nativeByteOrder // package-private = (Bits.byteOrder() == ByteOrder.BIG_ENDIAN); public final ByteOrder order() { //默认大端字节存储 return bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN; } public final ByteBuffer order(ByteOrder bo) { //是否设置的大端存储 bigEndian = (bo == ByteOrder.BIG_ENDIAN); //设置的排序方式 nativeByteOrder = (bigEndian == (Bits.byteOrder() == ByteOrder.BIG_ENDIAN)); return this; } //操作堆外内存获取指定下标数据 abstract byte _get(int i); //操作堆外内存存储指定下标数据 abstract void _put(int i, byte b); //封装2个字节返回 public abstract char getChar(); //将缓冲区映射为字符视图的缓冲区 public abstract CharBuffer asCharBuffer(); // ........ . }

    MappedByteBuffer

    public abstract class MappedByteBuffer extends ByteBuffer { private final FileDescriptor fd; // This should only be invoked by the DirectByteBuffer constructors // MappedByteBuffer(int mark, int pos, int lim, int cap, // package-private FileDescriptor fd) { super(mark, pos, lim, cap); //赋值文件描述符 this.fd = fd; } MappedByteBuffer(int mark, int pos, int lim, int cap) { // package-private super(mark, pos, lim, cap); this.fd = null; } private void checkMapped() { //文件描述符为空则说明没映射 if (fd == null) // Can only happen if a luser explicitly casts a direct byte buffer throw new UnsupportedOperationException(); } //返回缓冲区到页对齐距离 private long mappingOffset() { int ps = Bits.pageSize(); long offset = address % ps; return (offset >= 0) ? offset : (ps + offset); } private long mappingAddress(long mappingOffset) { return address - mappingOffset; } private long mappingLength(long mappingOffset) { return (long)capacity() + mappingOffset; } //被映射的文件是否完全常驻物理内存 public final boolean isLoaded() { //校验是否已经映射 checkMapped(); if ((address == 0) || (capacity() == 0)) return true; //分别算出对齐后的内存起始值 long offset = mappingOffset(); long length = mappingLength(offset); //根据页对齐之后的物理内存地址查询数据是否load return isLoaded0(mappingAddress(offset), length, Bits.pageCount(length)); } // not used, but a potential target for a store, see load() for details. private static byte unused; //加载整个文件以使它常驻内存 public final MappedByteBuffer load() { //校验是否有来源可以映射,也就是判断是否有某个文件可以映射物理内存 checkMapped(); if ((address == 0) || (capacity() == 0)) return this; long offset = mappingOffset(); long length = mappingLength(offset); //加载页对齐后的数据 load0(mappingAddress(offset), length); // Read a byte from each page to bring it into memory. A checksum // is computed as we go along to prevent the compiler from otherwise // considering the loop as dead code. 
Unsafe unsafe = Unsafe.getUnsafe(); //获取每页大小 int ps = Bits.pageSize(); //计算占用页面数 int count = Bits.pageCount(length); //获取对齐后的地址 long a = mappingAddress(offset); byte x = 0; //获取每页的第一个数据,然后与X进行^操作 //防止编译器将循环视为死代码??? for (int i=0; i<count; i++) { x ^= unsafe.getByte(a); a += ps; } if (unused != 0) unused = x; return this; } //强制将映射缓冲区上的更改应用到永久磁盘存储器上 //只在MapMode.READ_WRITE有效 public final MappedByteBuffer force() { checkMapped(); if ((address != 0) && (capacity() != 0)) { long offset = mappingOffset(); force0(fd, mappingAddress(offset), mappingLength(offset)); } return this; } private native boolean isLoaded0(long address, long length, int pageCount); private native void load0(long address, long length); private native void force0(FileDescriptor fd, long address, long length); }

    DirectByteBuffer

    class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer { // Cached unsafe-access object protected static final Unsafe unsafe = Bits.unsafe(); // Cached array base offset //可以获取数组的在内容中的基本偏移量(arrayBaseOffset),获取数组内元素的间隔(比例) private static final long arrayBaseOffset = (long)unsafe.arrayBaseOffset(byte[].class); // Cached unaligned-access capability //是否对齐处理 protected static final boolean unaligned = Bits.unaligned(); //对缓冲区的引用 private final Object att; public Object attachment() { return att; } private static class Deallocator implements Runnable { private static Unsafe unsafe = Unsafe.getUnsafe(); private long address; private long size; private int capacity; private Deallocator(long address, long size, int capacity) { assert (address != 0); this.address = address; this.size = size; this.capacity = capacity; } public void run() { if (address == 0) { // Paranoia return; } //释放内存 unsafe.freeMemory(address); address = 0; //标记该内存已经释放 Bits.unreserveMemory(size, capacity); } } private final Cleaner cleaner; public Cleaner cleaner() { return cleaner; } // Primary constructor // DirectByteBuffer(int cap) { // package-private super(-1, 0, cap, cap); //内存是否按页分配对齐 boolean pa = VM.isDirectMemoryPageAligned(); //获取每页内存大小 int ps = Bits.pageSize(); //分配内存的大小,如果是按页对齐方式,需要再加一页内存的容量 long size = Math.max(1L, (long)cap + (pa ? ps : 0)); //用Bits类保存总分配内存(按页分配)的大小和实际内存的大小 Bits.reserveMemory(size, cap); long base = 0; try { //分配size大小字节内存,返回地址 base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { //内存不够再减去 Bits.unreserveMemory(size, cap); throw x; } //初始化内存填充0 unsafe.setMemory(base, size, (byte) 0); //计算堆外内存的基地址 if (//如果页对齐 pa && //且取模不尽 (base % ps != 0)) { // Round up to page boundary //重新确定对齐之后的地址,确定为ps的倍数 address = base + ps - (base & (ps - 1)); } else { address = base; } //用来清除堆外内存的 cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; } // Invoked to construct a direct ByteBuffer referring to the block of // memory. 
A given arbitrary object may also be attached to the buffer. // DirectByteBuffer(long addr, int cap, Object ob) { super(-1, 0, cap, cap); address = addr; cleaner = null; att = ob; } // Invoked only by JNI: NewDirectByteBuffer(void*, long) // private DirectByteBuffer(long addr, int cap) { super(-1, 0, cap, cap); address = addr; cleaner = null; att = null; } // For memory-mapped buffers -- invoked by FileChannelImpl via reflection // protected DirectByteBuffer(int cap, long addr, FileDescriptor fd, Runnable unmapper) { super(-1, 0, cap, cap, fd); address = addr; cleaner = Cleaner.create(this, unmapper); att = null; } // For duplicates and slices // DirectByteBuffer(DirectBuffer db, // package-private int mark, int pos, int lim, int cap, int off) { super(mark, pos, lim, cap); address = db.address() + off; cleaner = null; att = db; } //分割缓冲区,创建一个从原始缓冲区的当前位置开始到剩余元素数量容量的新缓冲区,也会继承只读和直接属性 public ByteBuffer slice() { //创建lim-pos容量的缓冲区,操作的还是同一片内存区域 int pos = this.position(); int lim = this.limit(); assert (pos <= lim); int rem = (pos <= lim ? 
lim - pos : 0); int off = (pos << 0); assert (off >= 0); return new DirectByteBuffer(this, -1, 0, rem, rem, off); } //创建与原始视图相似的新缓冲区 public ByteBuffer duplicate() { return new DirectByteBuffer(this, this.markValue(), this.position(), this.limit(), this.capacity(), 0); } //生成只读视图 public ByteBuffer asReadOnlyBuffer() { //put操作全部抛异常 return new DirectByteBufferR(this, this.markValue(), this.position(), this.limit(), this.capacity(), 0); } //返回堆外分配地址 public long address() { return address; } //计算内存存储地址 private long ix(int i) { return address + ((long)i << 0); } public byte get() { return ((unsafe.getByte(ix(nextGetIndex())))); } public byte get(int i) { return ((unsafe.getByte(ix(checkIndex(i))))); } public ByteBuffer get(byte[] dst, int offset, int length) { //当copy数据长度>6则走本地批量复制,否则一个个复制 if (((long)length << 0) > Bits.JNI_COPY_TO_ARRAY_THRESHOLD) { checkBounds(offset, length, dst.length); //获取位置和上限,计算可读个数 int pos = position(); int lim = limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); if (length > rem) throw new BufferUnderflowException(); //copy Bits.copyToArray(ix(pos), dst, arrayBaseOffset, (long)offset << 0, (long)length << 0); //重新计算位置 position(pos + length); } else { super.get(dst, offset, length); } return this; } public ByteBuffer put(byte x) { unsafe.putByte(ix(nextPutIndex()), ((x))); return this; } public ByteBuffer put(int i, byte x) { unsafe.putByte(ix(checkIndex(i)), ((x))); return this; } public ByteBuffer put(ByteBuffer src) { //对直接内存处理 if (src instanceof DirectByteBuffer) { if (src == this) throw new IllegalArgumentException(); DirectByteBuffer sb = (DirectByteBuffer)src; //获取写入缓冲区的位置和下标,从而计算可读次数 int spos = sb.position(); int slim = sb.limit(); assert (spos <= slim); int srem = (spos <= slim ? slim - spos : 0); int pos = position(); int lim = limit(); assert (pos <= lim); int rem = (pos <= lim ? 
lim - pos : 0); if (srem > rem) throw new BufferOverflowException(); //从pos开始复制数据到当前缓冲区直接内存中 unsafe.copyMemory(sb.ix(spos), ix(pos), (long)srem << 0); //重新计算读取开始索引 sb.position(spos + srem); position(pos + srem); } else if (src.hb != null) { //针对缓冲区数组的处理 int spos = src.position(); int slim = src.limit(); assert (spos <= slim); int srem = (spos <= slim ? slim - spos : 0); put(src.hb, src.offset + spos, srem); src.position(spos + srem); } else { //调用父类方法一个个的读取,写入 super.put(src); } return this; } public ByteBuffer put(byte[] src, int offset, int length) { //当写入数组长度>6,则直接操作unsafe if (((long)length << 0) > Bits.JNI_COPY_FROM_ARRAY_THRESHOLD) { checkBounds(offset, length, src.length); int pos = position(); int lim = limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); if (length > rem) throw new BufferOverflowException(); Bits.copyFromArray(src, arrayBaseOffset, (long)offset << 0, ix(pos), (long)length << 0); position(pos + length); } else { //一个个写入 super.put(src, offset, length); } return this; } //丢弃已经读取的数据,保留未读取的数据 public ByteBuffer compact() { int pos = position(); int lim = limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); //将pos~limt数据从索引0挪动,然后pos索引重置为limit-pos,limit为数组容量 unsafe.copyMemory(ix(pos), ix(0), (long)rem << 0); //设置位置和上限 position(rem); limit(capacity()); //标记 discardMark(); return this; } //是直接缓冲区 public boolean isDirect() { return true; } //可读可写 public boolean isReadOnly() { return false; } //操作堆外内存,获取指定索引数据 byte _get(int i) { // package-private return unsafe.getByte(address + i); } void _put(int i, byte b) { // package-private unsafe.putByte(address + i, b); } private char getChar(long a) { //针对没对齐的处理 if (unaligned) { char x = unsafe.getChar(a); //针对大端和小端排序处理 return (nativeByteOrder ? 
x : Bits.swap(x)); } //字节对齐处理 return Bits.getChar(a, bigEndian); } public char getChar() { return getChar(ix(nextGetIndex((1 << 1)))); } //指定位置读取2字节数据,封装成char返回 public char getChar(int i) { return getChar(ix(checkIndex(i, (1 << 1)))); } private ByteBuffer putChar(long a, char x) { //字节没对齐处理 if (unaligned) { char y = (x); unsafe.putChar(a, (nativeByteOrder ? y : Bits.swap(y))); } else { //字节对齐处理 Bits.putChar(a, x, bigEndian); } return this; } public ByteBuffer putChar(char x) { putChar(ix(nextPutIndex((1 << 1))), x); return this; } public ByteBuffer putChar(int i, char x) { putChar(ix(checkIndex(i, (1 << 1))), x); return this; } //将缓冲区映射为字符视图的缓冲区 public CharBuffer asCharBuffer() { //获取位置和上限,从而计算可读容量 int off = this.position(); int lim = this.limit(); assert (off <= lim); int rem = (off <= lim ? lim - off : 0); //因为byte转成char所以容量/2 int size = rem >> 1; if ( //对齐处理 !unaligned //同时没对齐 && ((address + off) % (1 << 1) != 0)) { //因为需要对齐处理,所以这里底层操作的还是byteBuffer //针对大小端排序创建不同缓冲区对象 return (bigEndian ? (CharBuffer)(new ByteBufferAsCharBufferB(this, -1, 0, size, size, off)) : (CharBuffer)(new ByteBufferAsCharBufferL(this, -1, 0, size, size, off))); } else { //因为不需要对齐处理,所以直接操作堆外内存地址 //当需要对齐处理,但是%2除尽了,又因为char存储的绝对是2个字节数据,所以也就不需要再次进行对齐处理了 return (nativeByteOrder ? (CharBuffer)(new DirectCharBufferU(this, -1, 0, size, size, off)) : (CharBuffer)(new DirectCharBufferS(this, -1, 0, size, size, off))); } } }

    Channel

    /**
     * A nexus for I/O operations that is either open or closed.
     */
    public interface Channel extends Closeable {

        /**
         * Tells whether this channel is open.
         *
         * @return {@code true} if the channel is open
         */
        boolean isOpen();

        /**
         * Closes this channel.
         *
         * @throws IOException if an I/O error occurs
         */
        @Override
        void close() throws IOException;
    }

    AbstractInterruptibleChannel

    public abstract class AbstractInterruptibleChannel implements Channel, InterruptibleChannel { private final Object closeLock = new Object(); //通道状态,默认开启 private volatile boolean open = true; protected AbstractInterruptibleChannel() { } public final void close() throws IOException { synchronized (closeLock) { if (!open) return; //标记关闭 open = false; //调用子类实现 implCloseChannel(); } } //子类实现关闭通道扩展方法 protected abstract void implCloseChannel() throws IOException; //默认打开 public final boolean isOpen() { return open; } // -- Interruption machinery -- //中断线程方法 private Interruptible interruptor; //记录中断的线程 private volatile Thread interrupted; protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread target) { synchronized (closeLock) { //文件关闭,则直接返回 if (!open) return; //标记关闭 open = false; //中断线程 interrupted = target; try { //子类实现,关闭通道 AbstractInterruptibleChannel.this.implCloseChannel(); } catch (IOException x) { } } }}; } //将中断方法加入当前线程,然后中断的时候,执行 interruptor.interrupt() blockedOn(interruptor); //获取当前线程 Thread me = Thread.currentThread(); //如果当前线程已中断,则执行中断方法 if (me.isInterrupted()) interruptor.interrupt(me); } protected final void end(boolean completed) throws AsynchronousCloseException { //清空当前线程中断方法 blockedOn(null); Thread interrupted = this.interrupted; if ( //说明执行过中断方法 interrupted != null //如果是当前线程执行的中断方法,则抛出异常. && interrupted == Thread.currentThread()) { //清空记录当前线程,然后抛出异常 interrupted = null; throw new ClosedByInterruptException(); } //IO操作没完成同时已经关闭,则异步关闭异常 if (!completed && !open) throw new AsynchronousCloseException(); } // -- sun.misc.SharedSecrets -- static void blockedOn(Interruptible intr) { // package-private //仅仅是将中断方法加入当前线程 sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr); } }

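    当调用 blockedOn 的时候,会通过 System 类中注册的 JavaLangAccess,将中断回调(Interruptible)设置到当前线程上。当线程被中断的时候,会判断是否设置了中断回调,如果设置了则调用它的 interrupt 方法,接着会走到 begin() 中创建的 interruptor.interrupt(),由它关闭通道。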

    FileChannel

    public abstract class FileChannel extends AbstractInterruptibleChannel implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel { protected FileChannel() { } public static FileChannel open(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException { FileSystemProvider provider = path.getFileSystem().provider(); return provider.newFileChannel(path, options, attrs); } @SuppressWarnings({"unchecked", "rawtypes"}) // generic array construction private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0]; public static FileChannel open(Path path, OpenOption... options) throws IOException { Set<OpenOption> set = new HashSet<OpenOption>(options.length); Collections.addAll(set, options); return open(path, set, NO_ATTRIBUTES); } public abstract int read(ByteBuffer dst) throws IOException; //读到多个缓冲区,支持Scatter,从数组第offset~length public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException; public final long read(ByteBuffer[] dsts) throws IOException { return read(dsts, 0, dsts.length); } public abstract int write(ByteBuffer src) throws IOException; //写到多个缓冲区,支持Gather public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException; public final long write(ByteBuffer[] srcs) throws IOException { return write(srcs, 0, srcs.length); } // 表示文件中的当前字节位置 public abstract long position() throws IOException; //将通道的position设置为指定值 public abstract FileChannel position(long newPosition) throws IOException; public abstract long size() throws IOException; //砍掉指定size之外数据 public abstract FileChannel truncate(long size) throws IOException; //强制写到磁盘 //metaData是否更新元数据,如文件所有者,访问权限,lastModifyTime等 public abstract void force(boolean metaData) throws IOException; // this -> target public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException; //将一个通道交叉到另一个通道,从指定位置开始 //src - 》 this public abstract long transferFrom(ReadableByteChannel 
src, long position, long count) throws IOException; //读取dst,从pos位置开始 public abstract int read(ByteBuffer dst, long position) throws IOException; //写入src,从pos开始 public abstract int write(ByteBuffer src, long position) throws IOException; //映射模式 public static class MapMode { /** * Mode for a read-only mapping. */ //只读 public static final MapMode READ_ONLY = new MapMode("READ_ONLY"); /** * Mode for a read/write mapping. */ //可读写 public static final MapMode READ_WRITE = new MapMode("READ_WRITE"); /** * Mode for a private (copy-on-write) mapping. */ //写时拷贝 // 通过put( )方法所做的任何修改都会导致产生一个私有的数据拷贝并且该拷贝中的数据只有MappedByteBuffer实例可以看到。 // 该过程不会对底层文件做任何修改,而且一旦缓冲区被施以垃圾收集动作(garbage collected),那些修改都会丢失 public static final MapMode PRIVATE = new MapMode("PRIVATE"); private final String name; private MapMode(String name) { this.name = name; } public String toString() { return name; } } //在一个打开的文件和一个特殊类型的ByteBuffer之间建立一个虚拟内存映射 //映射position~size范围字节数据 //MapMode:映射模式,可读/可写/写时拷贝 public abstract MappedByteBuffer map(MapMode mode, long position, long size) throws IOException; //指定文件内部锁定区域的开始position以及锁定区域的size,以及是否共享锁 public abstract FileLock lock(long position, long size, boolean shared) throws IOException; //锁最终是由操作系统或文件系统来判优的并且几乎总是在进程级而非线程级上判优。 // 锁都是与一个文件关联的,而不是与单个的文件句柄或通道关联。 //也就是在jvm中,可以多线程访问,但是垮jvm,因为涉及到进程,而进程是由操作系统处理的,将阻塞获取锁 public final FileLock lock() throws IOException { return lock(0L, Long.MAX_VALUE, false); } //尝试获取锁,获取不到,直接返回 public abstract FileLock tryLock(long position, long size, boolean shared) throws IOException; public final FileLock tryLock() throws IOException { return tryLock(0L, Long.MAX_VALUE, false); } }

    FileChannelImpl

    transfer

    将一个通道的数据直接传输到另一个通道(transferTo:this -> target;transferFrom:src -> this)

    map

    映射到直接内存

    Scatter&Gather

    Scatter:将一次读取的数据依次分散存入多个缓冲区;Gather:将多个缓冲区中的数据块合并为一个整体,一次写出

    public class FileChannelImpl extends FileChannel { // Memory allocation size for mapping buffers //映射缓冲区的内存分配大小 private static final long allocationGranularity; // Used to make native read and write calls //用来本地读写回调 private final FileDispatcher nd; // File descriptor //文件描述器 private final FileDescriptor fd; // File access mode (immutable) //标记可读/可写 private final boolean writable; private final boolean readable; private final boolean append; // Required to prevent finalization of creating stream (immutable) //记录调用open的对象 private final Object parent; // The path of the referenced file // (null if the parent stream is created with a file descriptor) //文件路径 private final String path; // Thread-safe set of IDs of native threads, for signalling private final NativeThreadSet threads = new NativeThreadSet(2); // Lock for operations involving position and size //锁 private final Object positionLock = new Object(); private FileChannelImpl(FileDescriptor fd, String path, boolean readable, boolean writable, boolean append, Object parent) { //文件描述符 this.fd = fd; //可读 this.readable = readable; //是否可写 this.writable = writable; this.append = append; //传递的父对象,FileInputStream/RandomAccessFile this.parent = parent; //文件路径 this.path = path; this.nd = new FileDispatcherImpl(append); } // Used by FileInputStream.getChannel() and RandomAccessFile.getChannel() //打开一个文件管道 public static FileChannel open(FileDescriptor fd, String path, boolean readable, boolean writable, Object parent) { return new FileChannelImpl(fd, path, readable, writable, false, parent); } // Used by FileOutputStream.getChannel public static FileChannel open(FileDescriptor fd, String path, boolean readable, boolean writable, boolean append, Object parent) { return new FileChannelImpl(fd, path, readable, writable, append, parent); } //是否打开 private void ensureOpen() throws IOException { if (!isOpen()) throw new ClosedChannelException(); } // -- Standard channel operations -- //父类关闭时调用 protected void implCloseChannel() 
throws IOException { // Release and invalidate any locks that we still hold if (fileLockTable != null) { //释放所有文件锁 for (FileLock fl: fileLockTable.removeAll()) { synchronized (fl) { if (fl.isValid()) { nd.release(fd, fl.position(), fl.size()); ((FileLockImpl)fl).invalidate(); } } } } // signal any threads blocked on this channel //通知当前通道所有被阻塞的线程 threads.signalAndWait(); //关闭调用open的对象 if (parent != null) { // Close the fd via the parent stream's close method. The parent // will reinvoke our close method, which is defined in the // superclass AbstractInterruptibleChannel, but the isOpen logic in // that method will prevent this method from being reinvoked. // ((java.io.Closeable)parent).close(); } else { //关闭文件描述器 nd.close(fd); } } public int read(ByteBuffer dst) throws IOException { //是否打开 ensureOpen(); //是否可读 if (!readable) throw new NonReadableChannelException(); synchronized (positionLock) { int n = 0; int ti = -1; try { begin(); ti = threads.add(); //再次判断是否打开 if (!isOpen()) return 0; do { n = IOUtil.read(fd, dst, -1, nd); } while ( //IO没中断 (n == IOStatus.INTERRUPTED) && //打开状态 isOpen()); return IOStatus.normalize(n); } finally { threads.remove(ti); end(n > 0); assert IOStatus.check(n); } } } public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { //上下界校验 if ((offset < 0) || (length < 0) || (offset > dsts.length - length)) throw new IndexOutOfBoundsException(); //通道打开校验 ensureOpen(); //是否可读校验 if (!readable) throw new NonReadableChannelException(); synchronized (positionLock) { long n = 0; int ti = -1; try { begin(); //将当前线程加入到线程集合中,当Channel关闭时,可以发送信号给线程,避免线程被I/O阻塞住 ti = threads.add(); if (!isOpen()) return 0; do { //fd经过nd数据写入dsts n = IOUtil.read(fd, dsts, offset, length, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { //I/O完成移除线程 threads.remove(ti); end(n > 0); assert IOStatus.check(n); } } } public int write(ByteBuffer src) throws IOException { ensureOpen(); if (!writable) throw new 
NonWritableChannelException(); synchronized (positionLock) { int n = 0; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return 0; do { n = IOUtil.write(fd, src, -1, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { threads.remove(ti); end(n > 0); assert IOStatus.check(n); } } } public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) throw new IndexOutOfBoundsException(); ensureOpen(); if (!writable) throw new NonWritableChannelException(); synchronized (positionLock) { long n = 0; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return 0; do { n = IOUtil.write(fd, srcs, offset, length, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { threads.remove(ti); end(n > 0); assert IOStatus.check(n); } } } // -- Other operations -- // 表示文件中的当前字节位置 public long position() throws IOException { ensureOpen(); synchronized (positionLock) { long p = -1; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return 0; do { // in append-mode then position is advanced to end before writing //在append-mode模式下,则提前结束位置,返回最后的位置 //否则返回当前位置 p = (append) ? 
nd.size(fd) : position0(fd, -1); } while ((p == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(p); } finally { threads.remove(ti); end(p > -1); assert IOStatus.check(p); } } } public FileChannel position(long newPosition) throws IOException { ensureOpen(); if (newPosition < 0) throw new IllegalArgumentException(); synchronized (positionLock) { long p = -1; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return null; do { //移动到新的位置 p = position0(fd, newPosition); } while ((p == IOStatus.INTERRUPTED) && isOpen()); return this; } finally { threads.remove(ti); end(p > -1); assert IOStatus.check(p); } } } //返回管道的文件大小 public long size() throws IOException { ensureOpen(); synchronized (positionLock) { long s = -1; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return -1; do { s = nd.size(fd); } while ((s == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(s); } finally { threads.remove(ti); end(s > -1); assert IOStatus.check(s); } } } //砍掉指定size之外数据 public FileChannel truncate(long newSize) throws IOException { ensureOpen(); if (newSize < 0) throw new IllegalArgumentException("Negative size"); if (!writable) throw new NonWritableChannelException(); synchronized (positionLock) { int rv = -1; //记录读取位置 long p = -1; int ti = -1; long rp = -1; try { begin(); ti = threads.add(); if (!isOpen()) return null; // get current size long size; do { size = nd.size(fd); } while ((size == IOStatus.INTERRUPTED) && isOpen()); if (!isOpen()) return null; // get current position do { p = position0(fd, -1); } while ((p == IOStatus.INTERRUPTED) && isOpen()); if (!isOpen()) return null; assert p >= 0; // truncate file if given size is less than the current size //当设置的大小<文件大小,则截取文件大小到新的大小 if (newSize < size) { do { rv = nd.truncate(fd, newSize); } while ((rv == IOStatus.INTERRUPTED) && isOpen()); if (!isOpen()) return null; } // if position is beyond new size then adjust it //也就是说最小也得设置>=postion的大小位置 if (p > newSize) p = newSize; do { 
//重新设置位置从p开始 rp = position0(fd, p); } while ((rp == IOStatus.INTERRUPTED) && isOpen()); return this; } finally { threads.remove(ti); end(rv > -1); assert IOStatus.check(rv); } } } public void force(boolean metaData) throws IOException { ensureOpen(); int rv = -1; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return; do { rv = nd.force(fd, metaData); } while ((rv == IOStatus.INTERRUPTED) && isOpen()); } finally { threads.remove(ti); end(rv > -1); assert IOStatus.check(rv); } } // Assume at first that the underlying kernel supports sendfile(); // set this to false if we find out later that it doesn't // //是否支持sendfile private static volatile boolean transferSupported = true; // Assume that the underlying kernel sendfile() will work if the target // fd is a pipe; set this to false if we find out later that it doesn't // private static volatile boolean pipeSupported = true; // Assume that the underlying kernel sendfile() will work if the target // fd is a file; set this to false if we find out later that it doesn't // private static volatile boolean fileSupported = true; private long transferToDirectlyInternal(long position, int icount, WritableByteChannel target, FileDescriptor targetFD) throws IOException { assert !nd.transferToDirectlyNeedsPositionLock() || Thread.holdsLock(positionLock); long n = -1; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return -1; do { //直接操作文件描述符传输 n = transferTo0(fd, position, icount, targetFD); } while ((n == IOStatus.INTERRUPTED) && isOpen()); if (n == IOStatus.UNSUPPORTED_CASE) { if (target instanceof SinkChannelImpl) pipeSupported = false; if (target instanceof FileChannelImpl) fileSupported = false; return IOStatus.UNSUPPORTED_CASE; } if (n == IOStatus.UNSUPPORTED) { // Don't bother trying again transferSupported = false; return IOStatus.UNSUPPORTED; } return IOStatus.normalize(n); } finally { threads.remove(ti); end (n > -1); } } // Maximum size to map when using a mapped buffer private static 
final long MAPPED_TRANSFER_SIZE = 8L*1024L*1024L; private long transferFromFileChannel(FileChannelImpl src, long position, long count) throws IOException { if (!src.readable) throw new NonReadableChannelException(); synchronized (src.positionLock) { long pos = src.position(); long max = Math.min(count, src.size() - pos); long remaining = max; long p = pos; while (remaining > 0L) { long size = Math.min(remaining, MAPPED_TRANSFER_SIZE); // ## Bug: Closing this channel will not terminate the write //创建映射缓冲区 MappedByteBuffer bb = src.map(MapMode.READ_ONLY, p, size); try { //写入映射缓冲区 long n = write(bb, position); assert n > 0; p += n; position += n; remaining -= n; } catch (IOException ioe) { // Only throw exception if no bytes have been written if (remaining == max) throw ioe; break; } finally { //释放映射缓冲区 unmap(bb); } } long nwritten = max - remaining; src.position(pos + nwritten); return nwritten; } } private static final int TRANSFER_SIZE = 8192; //常规传输需要多次内存拷贝以及在用户模式和内核模式切换。 private long transferFromArbitraryChannel(ReadableByteChannel src, long position, long count) throws IOException { // Untrusted target: Use a newly-erased buffer int c = (int)Math.min(count, TRANSFER_SIZE); //创建临时缓冲区 ByteBuffer bb = Util.getTemporaryDirectBuffer(c); long tw = 0; // Total bytes written long pos = position; try { Util.erase(bb); while (tw < count) { bb.limit((int)Math.min((count - tw), (long)TRANSFER_SIZE)); // ## Bug: Will block reading src if this channel // ## is asynchronously closed //读入临时缓冲区 int nr = src.read(bb); if (nr <= 0) break; bb.flip(); //写入目标 int nw = write(bb, pos); tw += nw; if (nw != nr) break; pos += nw; bb.clear(); } return tw; } catch (IOException x) { if (tw > 0) return tw; throw x; } finally { //释放临时缓冲区 Util.releaseTemporaryDirectBuffer(bb); } } public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException { //判断通道状态 ensureOpen(); if (!src.isOpen()) throw new ClosedChannelException(); if (!writable) throw new 
NonWritableChannelException(); if ((position < 0) || (count < 0)) throw new IllegalArgumentException(); if (position > size()) return 0; //如果是写入当前通道的是file通道,则采用内存映射方式传输 if (src instanceof FileChannelImpl) return transferFromFileChannel((FileChannelImpl)src, position, count); //普通方式传输 return transferFromArbitraryChannel(src, position, count); } public int read(ByteBuffer dst, long position) throws IOException { if (dst == null) throw new NullPointerException(); if (position < 0) throw new IllegalArgumentException("Negative position"); if (!readable) throw new NonReadableChannelException(); ensureOpen(); if (nd.needsPositionLock()) { synchronized (positionLock) { return readInternal(dst, position); } } else { return readInternal(dst, position); } } private int readInternal(ByteBuffer dst, long position) throws IOException { assert !nd.needsPositionLock() || Thread.holdsLock(positionLock); int n = 0; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return -1; do { n = IOUtil.read(fd, dst, position, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { threads.remove(ti); end(n > 0); assert IOStatus.check(n); } } public int write(ByteBuffer src, long position) throws IOException { if (src == null) throw new NullPointerException(); if (position < 0) throw new IllegalArgumentException("Negative position"); if (!writable) throw new NonWritableChannelException(); ensureOpen(); if (nd.needsPositionLock()) { synchronized (positionLock) { return writeInternal(src, position); } } else { return writeInternal(src, position); } } private int writeInternal(ByteBuffer src, long position) throws IOException { assert !nd.needsPositionLock() || Thread.holdsLock(positionLock); int n = 0; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return -1; do { n = IOUtil.write(fd, src, position, nd); } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { threads.remove(ti); end(n > 0); 
assert IOStatus.check(n); } } // -- Memory-mapped buffers -- private static class Unmapper implements Runnable { // may be required to close file private static final NativeDispatcher nd = new FileDispatcherImpl(); // keep track of mapped buffer usage static volatile int count; static volatile long totalSize; static volatile long totalCapacity; private volatile long address; private final long size; private final int cap; private final FileDescriptor fd; private Unmapper(long address, long size, int cap, FileDescriptor fd) { assert (address != 0); this.address = address; this.size = size; this.cap = cap; this.fd = fd; synchronized (Unmapper.class) { count++; totalSize += size; totalCapacity += cap; } } public void run() { if (address == 0) return; //移除内存映射 unmap0(address, size); address = 0; // if this mapping has a valid file descriptor then we close it if (fd.valid()) { try { //关闭文件描述符 nd.close(fd); } catch (IOException ignore) { // nothing we can do } } synchronized (Unmapper.class) { count--; totalSize -= size; totalCapacity -= cap; } } } private static void unmap(MappedByteBuffer bb) { Cleaner cl = ((DirectBuffer)bb).cleaner(); if (cl != null) cl.clean(); } private static final int MAP_RO = 0; private static final int MAP_RW = 1; private static final int MAP_PV = 2; public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { //判断是否打开 ensureOpen(); //mode空判断 if (mode == null) throw new NullPointerException("Mode is null"); if (position < 0L) throw new IllegalArgumentException("Negative position"); if (size < 0L) throw new IllegalArgumentException("Negative size"); if (position + size < 0) throw new IllegalArgumentException("Position + size overflow"); if (size > Integer.MAX_VALUE) throw new IllegalArgumentException("Size exceeds Integer.MAX_VALUE"); //获取映射模式 int imode = -1; if (mode == MapMode.READ_ONLY) imode = MAP_RO; else if (mode == MapMode.READ_WRITE) imode = MAP_RW; else if (mode == MapMode.PRIVATE) imode = MAP_PV; assert 
(imode >= 0); //如果该文件不能写,但是模式设置可写或者private,则冲突异常 if ((mode != MapMode.READ_ONLY) && !writable) throw new NonWritableChannelException(); //文件不能读,但模式设置可读或private,则冲突异常 if (!readable) throw new NonReadableChannelException(); long addr = -1; int ti = -1; try { //调用父类 begin(); ti = threads.add(); //再次判断通道是否打开 if (!isOpen()) return null; long filesize; do { filesize = nd.size(fd); //channel打开且IO没中断 } while ((filesize == IOStatus.INTERRUPTED) && isOpen()); if (!isOpen()) return null; //当文件大小<映射的大小,则扩展文件大小 if (filesize < position + size) { // Extend file size //不能写直接异常 if (!writable) { throw new IOException("Channel not open for writing " + "- cannot extend file to required size"); } int rv; do { //扩展大小 rv = nd.truncate(fd, position + size); } while ((rv == IOStatus.INTERRUPTED) && isOpen()); //再次判断管道是否关闭 if (!isOpen()) return null; } //size=0,返回一个DirectByteBufferR if (size == 0) { addr = 0; // a valid file descriptor is not required FileDescriptor dummy = new FileDescriptor(); if ((!writable) || (imode == MAP_RO)) return Util.newMappedByteBufferR(0, 0, dummy, null); else return Util.newMappedByteBuffer(0, 0, dummy, null); } //重新计算位置,是映射缓冲区的倍数 int pagePosition = (int)(position % allocationGranularity); long mapPosition = position - pagePosition; long mapSize = size + pagePosition; try { // If no exception was thrown from map0, the address is valid //获取映射地址 addr = map0(imode, mapPosition, mapSize); } catch (OutOfMemoryError x) { // An OutOfMemoryError may indicate that we've exhausted memory // so force gc and re-attempt map //OOM,则释放内存 System.gc(); try { Thread.sleep(100); } catch (InterruptedException y) { Thread.currentThread().interrupt(); } try { //再次分配地址 addr = map0(imode, mapPosition, mapSize); } catch (OutOfMemoryError y) { // After a second OOME, fail throw new IOException("Map failed", y); } } // On Windows, and potentially other platforms, we need an open // file descriptor for some mapping operations. 
FileDescriptor mfd; try { //创建一个内存映射的文件描述符,指向当前的native文件描述符 mfd = nd.duplicateForMapping(fd); } catch (IOException ioe) { unmap0(addr, mapSize); throw ioe; } //校验IO状态 assert (IOStatus.checkAll(addr)); //校验地址是否allocationGranularity整数倍 assert (addr % allocationGranularity == 0); //获取低32位 int isize = (int)size; //回收器 Unmapper um = new Unmapper(addr, mapSize, isize, mfd); //不能写或者模式为只读,则创建只读Buffer,所有写操作都抛异常 if ((!writable) || (imode == MAP_RO)) { return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um); } else { return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um); } } finally { threads.remove(ti); end(IOStatus.checkAll(addr)); } } /** * Invoked by sun.management.ManagementFactoryHelper to create the management * interface for mapped buffers. */ public static sun.misc.JavaNioAccess.BufferPool getMappedBufferPool() { return new sun.misc.JavaNioAccess.BufferPool() { @Override public String getName() { return "mapped"; } @Override public long getCount() { return Unmapper.count; } @Override public long getTotalCapacity() { return Unmapper.totalCapacity; } @Override public long getMemoryUsed() { return Unmapper.totalSize; } }; } // -- Locks -- // keeps track of locks on this file private volatile FileLockTable fileLockTable; // indicates if file locks are maintained system-wide (as per spec) private static boolean isSharedFileLockTable; // indicates if the disableSystemWideOverlappingFileLockCheck property // has been checked private static volatile boolean propertyChecked; // The lock list in J2SE 1.4/5.0 was local to each FileChannel instance so // the overlap check wasn't system wide when there were multiple channels to // the same file. This property is used to get 1.4/5.0 behavior if desired. 
private static boolean isSharedFileLockTable() { if (!propertyChecked) { synchronized (FileChannelImpl.class) { if (!propertyChecked) { String value = AccessController.doPrivileged( new GetPropertyAction( "sun.nio.ch.disableSystemWideOverlappingFileLockCheck")); isSharedFileLockTable = ((value == null) || value.equals("false")); propertyChecked = true; } } } return isSharedFileLockTable; } private FileLockTable fileLockTable() throws IOException { if (fileLockTable == null) { synchronized (this) { if (fileLockTable == null) { //根据独占和共享模式,建立不同锁表 if (isSharedFileLockTable()) { int ti = threads.add(); try { ensureOpen(); fileLockTable = FileLockTable.newSharedFileLockTable(this, fd); } finally { threads.remove(ti); } } else { fileLockTable = new SimpleFileLockTable(); } } } } return fileLockTable; } public FileLock lock(long position, long size, boolean shared) throws IOException { ensureOpen(); //写模式不能共享锁 if (shared && !readable) throw new NonReadableChannelException(); //读模式不能独占锁 if (!shared && !writable) throw new NonWritableChannelException(); //新建一个锁对象 FileLockImpl fli = new FileLockImpl(this, position, size, shared); //获取文件锁表 FileLockTable flt = fileLockTable(); //添加文件锁表 flt.add(fli); //标记是否执行完毕 boolean completed = false; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return null; int n; do { //调用native方法加锁 n = nd.lock(fd, true, position, size, shared); } while ((n == FileDispatcher.INTERRUPTED) && isOpen()); if (isOpen()) { //部分操作系统不支持共享锁,若获取到的是独占锁,则更新当前FileLockImpl为独占锁 if (n == FileDispatcher.RET_EX_LOCK) { assert shared; FileLockImpl fli2 = new FileLockImpl(this, position, size, false); flt.replace(fli, fli2); fli = fli2; } completed = true; } } finally { //加锁失败,移除锁 if (!completed) flt.remove(fli); threads.remove(ti); try { end(completed); } catch (ClosedByInterruptException e) { throw new FileLockInterruptionException(); } } return fli; } public FileLock tryLock(long position, long size, boolean shared) throws IOException { ensureOpen(); if 
(shared && !readable) throw new NonReadableChannelException(); if (!shared && !writable) throw new NonWritableChannelException(); FileLockImpl fli = new FileLockImpl(this, position, size, shared); FileLockTable flt = fileLockTable(); flt.add(fli); int result; int ti = threads.add(); try { try { ensureOpen(); result = nd.lock(fd, false, position, size, shared); } catch (IOException e) { flt.remove(fli); throw e; } if (result == FileDispatcher.NO_LOCK) { flt.remove(fli); return null; } if (result == FileDispatcher.RET_EX_LOCK) { assert shared; FileLockImpl fli2 = new FileLockImpl(this, position, size, false); flt.replace(fli, fli2); return fli2; } return fli; } finally { threads.remove(ti); } } void release(FileLockImpl fli) throws IOException { int ti = threads.add(); try { ensureOpen(); //释放锁 nd.release(fd, fli.position(), fli.size()); } finally { threads.remove(ti); } assert fileLockTable != null; //从锁表中移除 fileLockTable.remove(fli); } // -- File lock support -- /** * A simple file lock table that maintains a list of FileLocks obtained by a * FileChannel. Use to get 1.4/5.0 behaviour. 
*/ private static class SimpleFileLockTable extends FileLockTable { // synchronize on list for access private final List<FileLock> lockList = new ArrayList<FileLock>(2); public SimpleFileLockTable() { } private void checkList(long position, long size) throws OverlappingFileLockException { assert Thread.holdsLock(lockList); for (FileLock fl: lockList) { if (fl.overlaps(position, size)) { throw new OverlappingFileLockException(); } } } public void add(FileLock fl) throws OverlappingFileLockException { synchronized (lockList) { checkList(fl.position(), fl.size()); lockList.add(fl); } } public void remove(FileLock fl) { synchronized (lockList) { lockList.remove(fl); } } public List<FileLock> removeAll() { synchronized(lockList) { List<FileLock> result = new ArrayList<FileLock>(lockList); lockList.clear(); return result; } } public void replace(FileLock fl1, FileLock fl2) { synchronized (lockList) { lockList.remove(fl1); lockList.add(fl2); } } } // -- Native methods -- // Creates a new mapping private native long map0(int prot, long position, long length) throws IOException; // Removes an existing mapping private static native int unmap0(long address, long length); // Transfers from src to dst, or returns -2 if kernel can't do that private native long transferTo0(FileDescriptor src, long position, long count, FileDescriptor dst); // Sets or reports this file's position // If offset is -1, the current position is returned // otherwise the position is set to offset //返回当前文件位置,offset-1则返回当前位置 private native long position0(FileDescriptor fd, long offset); // Caches fieldIDs private static native long initIDs(); static { IOUtil.load(); allocationGranularity = initIDs(); } }
    直接传输,mmap,零拷贝复制在transferTo()应用
    // this -> target public long transferTo(long position, long count, WritableByteChannel target) throws IOException { ensureOpen(); if (!target.isOpen()) throw new ClosedChannelException(); if (!readable) throw new NonReadableChannelException(); if (target instanceof FileChannelImpl && !((FileChannelImpl)target).writable) throw new NonWritableChannelException(); if ((position < 0) || (count < 0)) throw new IllegalArgumentException(); long sz = size(); if (position > sz) return 0; //确定读取个数不能超过最大可读个数 int icount = (int)Math.min(count, Integer.MAX_VALUE); if ((sz - position) < icount) icount = (int)(sz - position); long n; // Attempt a direct transfer, if the kernel supports it //内核支持,则直接传输 //文件描述符写文件描述符 if ((n = transferToDirectly(position, icount, target)) >= 0) return n; // Attempt a mapped transfer, but only to trusted channel types //内存映射传输 //创建缓冲区映射,然后写入管道 if ((n = transferToTrustedChannel(position, icount, target)) >= 0) return n; // Slow path for untrusted targets //慢速传输 //先写到缓冲区,在从缓冲区写入管道 return transferToArbitraryChannel(position, icount, target); }

    private long transferToDirectly(long position, int icount, WritableByteChannel target) throws IOException { //内核不支持,直接返回 if (!transferSupported) return IOStatus.UNSUPPORTED; FileDescriptor targetFD = null; if (target instanceof FileChannelImpl) { if (!fileSupported) return IOStatus.UNSUPPORTED_CASE; //获取文件描述器 targetFD = ((FileChannelImpl)target).fd; } else if (target instanceof SelChImpl) { // Direct transfer to pipe causes EINVAL on some configurations if ((target instanceof SinkChannelImpl) && !pipeSupported) return IOStatus.UNSUPPORTED_CASE; // Platform-specific restrictions. Now there is only one: // Direct transfer to non-blocking channel could be forbidden SelectableChannel sc = (SelectableChannel)target; if (!nd.canTransferToDirectly(sc)) return IOStatus.UNSUPPORTED_CASE; targetFD = ((SelChImpl)target).getFD(); } if (targetFD == null) return IOStatus.UNSUPPORTED; int thisFDVal = IOUtil.fdVal(fd); int targetFDVal = IOUtil.fdVal(targetFD); if (thisFDVal == targetFDVal) // Not supported on some configurations return IOStatus.UNSUPPORTED; if (nd.transferToDirectlyNeedsPositionLock()) { synchronized (positionLock) { long pos = position(); try { //直接操作文件描述符传输 return transferToDirectlyInternal(position, icount, target, targetFD); } finally { position(pos); } } } else { return transferToDirectlyInternal(position, icount, target, targetFD); } }

    private long transferToTrustedChannel(long position, long count, WritableByteChannel target) throws IOException { boolean isSelChImpl = (target instanceof SelChImpl); if (!((target instanceof FileChannelImpl) || isSelChImpl)) return IOStatus.UNSUPPORTED; // Trusted target: Use a mapped buffer long remaining = count; while (remaining > 0L) { //计算边界 long size = Math.min(remaining, MAPPED_TRANSFER_SIZE); try { //获取映射的缓冲区 MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size); try { // ## Bug: Closing this channel will not terminate the write int n = target.write(dbb); assert n >= 0; remaining -= n; if (isSelChImpl) { // one attempt to write to selectable channel break; } assert n > 0; position += n; } finally { //释放缓冲区内存 unmap(dbb); } } catch (ClosedByInterruptException e) { // target closed by interrupt as ClosedByInterruptException needs // to be thrown after closing this channel. assert !target.isOpen(); try { close(); } catch (Throwable suppressed) { e.addSuppressed(suppressed); } throw e; } catch (IOException ioe) { // Only throw exception if no bytes have been written if (remaining == count) throw ioe; break; } } return count - remaining; }

    private long transferToArbitraryChannel(long position, int icount, WritableByteChannel target) throws IOException { // Untrusted target: Use a newly-erased buffer //最大只能传传8192 int c = Math.min(icount, TRANSFER_SIZE); //创建固定大小缓存 ByteBuffer bb = Util.getTemporaryDirectBuffer(c); long tw = 0; // Total bytes written long pos = position; try { Util.erase(bb); while (tw < icount) { //设置可读边界 bb.limit(Math.min((int)(icount - tw), TRANSFER_SIZE)); //将数据读取到临时缓冲区 int nr = read(bb, pos); if (nr <= 0) break; bb.flip(); // ## Bug: Will block writing target if this channel // ## is asynchronously closed //数据写入目标缓冲区 int nw = target.write(bb); //重新计算读取索引 tw += nw; //说明目标写满,直接退出 if (nw != nr) break; pos += nw; //重置位置和可读个数 bb.clear(); } return tw; } catch (IOException x) { if (tw > 0) return tw; throw x; } finally { //释放缓冲区内存 Util.releaseTemporaryDirectBuffer(bb); } }

    socket通道

    其他

    ByteOrder

    public final class ByteOrder { private String name; private ByteOrder(String name) { this.name = name; } //大端字节存储,也就是按顺序存储 public static final ByteOrder BIG_ENDIAN = new ByteOrder("BIG_ENDIAN"); //小端字节存储,也就是硬件存储地址逆序存储 public static final ByteOrder LITTLE_ENDIAN = new ByteOrder("LITTLE_ENDIAN"); //返回jvm运行平台的固有字节顺序 public static ByteOrder nativeOrder() { return Bits.byteOrder(); } public String toString() { return name; } }

    Cleaner

    Bits

    Unsafe

    FileLock

    /**
     * A token representing a lock on a region of a file.
     *
     * <p>The locked region is described by a non-negative {@code position}
     * and a non-negative {@code size}; their sum is required not to overflow
     * {@code long}.
     */
    public abstract class FileLock implements AutoCloseable {

        /** Channel through which this lock was created. */
        private final Channel channel;

        /** Position of the first byte of the locked region. */
        private final long position;

        /** Length of the locked region in bytes. */
        private final long size;

        /** Whether the lock is shared (the actual mode depends on the OS). */
        private final boolean shared;

        /**
         * Initializes a lock obtained through a {@link FileChannel}.
         *
         * @param channel  the channel upon whose file the lock is held
         * @param position position at which the locked region starts
         * @param size     size of the locked region
         * @param shared   {@code true} for a shared lock
         * @throws IllegalArgumentException if {@code position} or
         *         {@code size} is negative, or their sum overflows
         */
        protected FileLock(FileChannel channel,
                           long position, long size, boolean shared)
        {
            checkRegion(position, size);
            this.channel = channel;
            this.position = position;
            this.size = size;
            this.shared = shared;
        }

        /**
         * Initializes a lock obtained through an
         * {@link AsynchronousFileChannel}.
         *
         * @param channel  the channel upon whose file the lock is held
         * @param position position at which the locked region starts
         * @param size     size of the locked region
         * @param shared   {@code true} for a shared lock
         * @throws IllegalArgumentException if {@code position} or
         *         {@code size} is negative, or their sum overflows
         */
        protected FileLock(AsynchronousFileChannel channel,
                           long position, long size, boolean shared)
        {
            checkRegion(position, size);
            this.channel = channel;
            this.position = position;
            this.size = size;
            this.shared = shared;
        }

        /** Validation shared by both constructors; messages are part of the contract. */
        private static void checkRegion(long position, long size) {
            if (position < 0)
                throw new IllegalArgumentException("Negative position");
            if (size < 0)
                throw new IllegalArgumentException("Negative size");
            if (position + size < 0)
                throw new IllegalArgumentException("Negative position + size");
        }

        /**
         * Returns the file channel that created this lock.
         *
         * @return the channel, or {@code null} if the lock was not acquired
         *         through a {@link FileChannel}
         */
        public final FileChannel channel() {
            return (channel instanceof FileChannel) ? (FileChannel)channel : null;
        }

        /**
         * Returns the channel upon whose file this lock was acquired.
         *
         * @return the channel passed to the constructor
         */
        public Channel acquiredBy() {
            return channel;
        }

        /**
         * Returns the position within the file of the first byte of the
         * locked region.
         *
         * @return the position
         */
        public final long position() {
            return position;
        }

        /**
         * Returns the size of the locked region in bytes.
         *
         * @return the size
         */
        public final long size() {
            return size;
        }

        /**
         * Tells whether this lock is shared.
         *
         * @return {@code true} if shared, {@code false} if exclusive
         */
        public final boolean isShared() {
            return shared;
        }

        /**
         * Tells whether this lock's region overlaps the given region.
         *
         * <p>The end of the queried region is {@code position + size}. When
         * both operands are non-negative and that sum exceeds
         * {@code Long.MAX_VALUE}, the region extends past every valid lock
         * start, so it can never lie "below" this lock; the wrapped-around
         * negative sum must not be used for that comparison.
         *
         * @param position start of the region to test
         * @param size     size of the region to test
         * @return {@code true} if the regions overlap
         */
        public final boolean overlaps(long position, long size) {
            long thatEnd = position + size;
            // Two non-negative operands producing a negative sum means the
            // addition overflowed.
            boolean thatEndOverflowed =
                (position >= 0) && (size >= 0) && (thatEnd < 0);
            if (!thatEndOverflowed && thatEnd <= this.position)
                return false;               // That is below this
            // Cannot overflow: the constructors reject such regions.
            if (this.position + this.size <= position)
                return false;               // This is below that
            return true;
        }

        /**
         * Tells whether this lock is valid.
         *
         * @return {@code true} if the lock is valid
         */
        public abstract boolean isValid();

        /**
         * Releases this lock.
         *
         * @throws IOException if an I/O error occurs
         */
        public abstract void release() throws IOException;

        /**
         * Releases the lock by delegating to {@link #release()}, which makes
         * the lock usable in try-with-resources.
         *
         * @throws IOException if {@link #release()} fails
         */
        @Override
        public final void close() throws IOException {
            release();
        }

        /**
         * Returns a description of the form
         * {@code ClassName[position:size shared|exclusive valid|invalid]}.
         *
         * @return a descriptive string
         */
        @Override
        public final String toString() {
            return (this.getClass().getName()
                    + "[" + position
                    + ":" + size
                    + " " + (shared ? "shared" : "exclusive")
                    + " " + (isValid() ? "valid" : "invalid")
                    + "]");
        }
    }

    FileLockImpl

    public class FileLockImpl extends FileLock { private volatile boolean valid = true; FileLockImpl(FileChannel channel, long position, long size, boolean shared) { super(channel, position, size, shared); } FileLockImpl(AsynchronousFileChannel channel, long position, long size, boolean shared) { super(channel, position, size, shared); } public boolean isValid() { return valid; } void invalidate() { //如果当且仅当当前线程拥有某个具体对象的锁,则返回true assert Thread.holdsLock(this); valid = false; } public synchronized void release() throws IOException { Channel ch = acquiredBy(); //通道已经关闭,则抛异常 if (!ch.isOpen()) throw new ClosedChannelException(); //锁有效 if (valid) { if (ch instanceof FileChannelImpl) ((FileChannelImpl)ch).release(this); else if (ch instanceof AsynchronousFileChannelImpl) ((AsynchronousFileChannelImpl)ch).release(this); else throw new AssertionError(); valid = false; } } }
    Processed: 0.013, SQL: 9