EventLoop可以看成只有一个线程的线程池。
每个EventLoop包含的线程Thread定义在父类SingleThreadEventExecutor中。每个EventLoop包含两个队列:taskQueue来自父类SingleThreadEventExecutor,保存各种任务(比如处理事件等);tailTasks来自父类SingleThreadEventLoop,用于每次事件循环结束后的后置任务处理。
下面的 run() 实现的是 SingleThreadEventExecutor 的抽象方法 run():
@Override protected void run() { for (;;) { try { //处理各种事件 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //3.2.1 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // fall through default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { //3.2.2 processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }跟 selectNow 和 select:
@Override public int selectNow() throws IOException { selectionKeys.reset(); NIO的selectNow实现 return delegate.selectNow(); } @Override public int select(long timeout) throws IOException { selectionKeys.reset(); //NIO的select实现 return delegate.select(timeout); }类 NioEventLoop
/**
 * Dispatches selected-key processing: uses the plain path over
 * {@code selector.selectedKeys()} when the {@code selectedKeys} field is null,
 * otherwise the optimized path.
 */
private void processSelectedKeys() {
    if (selectedKeys == null) {
        processSelectedKeysPlain(selector.selectedKeys());
        return;
    }
    processSelectedKeysOptimized();
}
跟 processSelectedKeysPlain:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { //跟这里 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }跟 processSelectedKey:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop //3.2.2.1 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }从中看可以看到 NIO 代码里的各种事件了
跟 read: 类 NioMessageUnsafe in AbstractNioMessageChannel
private final class NioMessageUnsafe extends AbstractNioUnsafe { //这里的Object就是NioSocketChannel private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //accept阻塞,往readBuf里添加新的读channel 见3.2.2.1.1 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //见3.2.2.1.2 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }类 NioServerSocketChannel
@Override protected int doReadMessages(List<Object> buf) throws Exception { //jdk的SocketChannel 调用了jdk的ServerSocketChannel的accept方法,阻塞 SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //包装成netty的NioSocketChannel,关注OP_READ事件,放入集合 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }fireChannelRead 会调用所有入站 handler 的 channelRead 方法,主要看 ServerBootstrapAcceptor 的 channelRead 方法:
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //child是处理读事件的handler child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //轮询一个EventLoop去register见4.1.3 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }父类 SingleThreadEventExecutor
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //正在调用execute的线程和SingleThreadEventExecutor里的变量Thread是不是同一个 boolean inEventLoop = inEventLoop(); //添加到taskQueue队列,默认大小16,默认拒绝策略:抛出异常 addTask(task); if (!inEventLoop) { //新建线程死循环执行队列里的任务 见3.3.1 startThread(); if (isShutdown() && removeTask(task)) { reject(); } } //addTaskWakesUp: 添加任务后,任务是否会自动导致线程唤醒 //wakesUpForTask(task): return !(task instanceof NonWakeupRunnable); if (!addTaskWakesUp && wakesUpForTask(task)) { //唤醒线程 见3.3.2 wakeup(inEventLoop); } }跟 doStartThread:
private void doStartThread() { assert thread == null; //ThreadPerTaskExecutor,新建线程执行Runnable 见(2) executor.execute(new Runnable() { @Override public void run() { //SingleThreadEventExecutor的thread变量赋值 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; // 更新最后执行时间 updateLastExecutionTime(); try { //见 3.2 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { if (logger.isErrorEnabled()) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); } } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { if (logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } } terminationFuture.setSuccess(null); } } } } }); }类 NioEventLoop
/**
 * Wakes the selector up when called from outside the event loop, but only if this
 * call is the one that flips {@code wakenUp} from false to true.
 *
 * @param inEventLoop whether the caller is running on the event-loop thread;
 *                    when true nothing is done
 */
@Override
protected void wakeup(boolean inEventLoop) {
    if (inEventLoop) {
        return;
    }
    final boolean firstToWake = wakenUp.compareAndSet(false, true);
    if (firstToWake) {
        selector.wakeup();
    }
}