jdk源码解析八之Piped管道流

    技术2022-07-13  80

    文章目录

    demoPipedInputStreamPipedOutputStream

    demo

    package io; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; /** * Java IO中的管道为运行在同一个JVM中的两个线程提供了通信的能力。所以管道也可以作为数据源以及目标媒介 * * DataInputStream 读取的数据由大于一个字节的Java原语(如int,long,float,double等)组成 * SequenceInputStream 将两个或者多个输入流当成一个输入流依次读取 */ public class PipeExample { public static void main(String[] args) throws IOException { final PipedOutputStream pipedOutputStream = new PipedOutputStream(); final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream); //pipedInputStream.connect(pipedOutputStream); 也可以连接 Thread thread1 = new Thread(new Runnable() { public void run() { try { pipedOutputStream.write("hello".getBytes()); } catch (IOException e) { e.printStackTrace(); } try { pipedOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }); Thread thread = new Thread(new Runnable() { public void run() { try { int read; while ((read = pipedInputStream.read()) != -1) { System.out.println((char) read); } } catch (IOException e) { e.printStackTrace(); } try { pipedInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }); thread.start(); thread1.start(); } }

    PipedInputStream

    public class PipedInputStream extends InputStream { //分别标记当前读管道,写管道的状态 boolean closedByWriter = false; volatile boolean closedByReader = false; //标记是否连接到写管道 boolean connected = false; //读写2个线程 Thread readSide; Thread writeSide; //默认管道缓冲区大小 private static final int DEFAULT_PIPE_SIZE = 1024; protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; protected byte buffer[]; //下一个写入字节位置 in=out则说明满了 protected int in = -1; //下一个读取字节位置 protected int out = 0; public PipedInputStream(PipedOutputStream src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { //初始化缓冲区大小,默认大小1024 initPipe(pipeSize); //将当前对象传入PipedOutputStream connect(src); } public PipedInputStream() { //初始化缓冲区大小,默认大小1024 initPipe(DEFAULT_PIPE_SIZE); } public PipedInputStream(int pipeSize) { //初始化指定大小的缓冲区 initPipe(pipeSize); } private void initPipe(int pipeSize) { //初始化指定大小管道缓冲区 if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe Size <= 0"); } buffer = new byte[pipeSize]; } public void connect(PipedOutputStream src) throws IOException { //管道连接 src.connect(this); } //接受output发送过来的数据 protected synchronized void receive(int b) throws IOException { checkStateForReceive(); //获取当前线程 writeSide = Thread.currentThread(); //读满,则等待,通知其他读线程,我要开始写了 if (in == out) awaitSpace(); //读满或初次读取,则重置下标 if (in < 0) { in = 0; out = 0; } //写入数据 buffer[in++] = (byte)(b & 0xFF); //超过边界则从0开始 if (in >= buffer.length) { in = 0; } } synchronized void receive(byte b[], int off, int len) throws IOException { //检查读写管道状态是否正常开放 checkStateForReceive(); writeSide = Thread.currentThread(); int bytesToTransfer = len; while (bytesToTransfer > 0) { //读满,则等待,通知其他读线程,我要开始写了 if (in == out) awaitSpace(); int nextTransferAmount = 0; //如果还有空间可以读取,则获取最大可读取大小 if (out < in) { nextTransferAmount = buffer.length - in; } else if (in < out) { //读满或者初次读取则重置 if (in == -1) { in = out = 0; nextTransferAmount = buffer.length - in; } else { //读到这里说明in走了一圈,重置为0了 nextTransferAmount = out - in; } } //读取空间足够 if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); //写入范围数据到buffer System.arraycopy(b, off, buffer, in, nextTransferAmount); //记录剩下还需要写入的空间 bytesToTransfer -= nextTransferAmount; off += nextTransferAmount; in += nextTransferAmount; //写入超限,重置 if (in >= buffer.length) { in = 0; } } } private void checkStateForReceive() throws IOException { //当前是否连接 if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { //当前读写管道是否关闭了 throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { //当前线程死了 throw new IOException("Read end dead"); } } private void awaitSpace() throws IOException { while (in == out) { //检查读写管道状态是否正常开放 checkStateForReceive(); /* full: kick any waiting readers */ //通知 notifyAll(); try { //等待 wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } } synchronized void receivedLast() { //标记写管道流关闭 closedByWriter = true; //通知所有等待的线程 notifyAll(); } public synchronized int read() throws IOException { //校验当前线程是否正常连接 if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { //当前读管道是否关闭 throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { //写线程是否存活,以及是否关闭了写流 throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; //等待写入 while (in < 0) { //写入流关闭,则返回-1 if (closedByWriter) { /* closed by writer, return EOF */ return -1; } //写线程已经不存活了,则抛异常 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ //通知线程,开始读了 notifyAll(); try { //等待 wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } //读入数据 int ret = buffer[out++] & 0xFF; //读到缓冲区上限则重置 if (out >= buffer.length) { out = 0; } //读满则重置 if (in == out) { /* now empty */ in = -1; } return ret; } public synchronized int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ //尝试读取一个字节 int c = read(); //没数据则直接返回 if (c < 0) { return -1; } b[off] = (byte) c; int rlen = 1; while ((in >= 0) && (len > 1)) { int available; //获取可读取的范围 if (in > out) { available = Math.min((buffer.length - out), (in - out)); } else { //执行到这里,说明in写了一圈了. available = buffer.length - out; } // A byte is read beforehand outside the loop //可读取的空间足够,则直接读取len-1长度 if (available > (len - 1)) { available = len - 1; } System.arraycopy(buffer, out, b, off + rlen, available); out += available; rlen += available; len -= available; //读到缓冲区上限则重置 if (out >= buffer.length) { out = 0; } //读满则重置 if (in == out) { /* now empty */ in = -1; } } return rlen; } public synchronized int available() throws IOException { //读满,或者第一次读取,则暂无字节可读取 if(in < 0) return 0; else if(in == out) //读满,则重置范围 return buffer.length; else if (in > out) //还在一圈范围内 return in - out; else //读取一圈 return in + buffer.length - out; } public void close() throws IOException { //标记当前读流关闭 closedByReader = true; synchronized (this) { in = -1; } } }

    PipedOutputStream

    public class PipedOutputStream extends OutputStream { private PipedInputStream sink; public PipedOutputStream(PipedInputStream snk) throws IOException { //管道连接 connect(snk); } public PipedOutputStream() { } public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; //标记初次写入 snk.in = -1; //读取起始位置 snk.out = 0; //标记状态为连接 snk.connected = true; } public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } //往input写入数据 sink.receive(b); } public void write(byte b[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } //往input写入指定范围数据 sink.receive(b, off, len); } //强制唤醒所有阻塞状态 public synchronized void flush() throws IOException { if (sink != null) { synchronized (sink) { //通知其他 sink.notifyAll(); } } } public void close() throws IOException { //标记当前管道已经关闭 if (sink != null) { sink.receivedLast(); } } }
    Processed: 0.011, SQL: 9