梳理一下Netty反应器模式中各个组件之间的关系:
1.反应器EventLoop和通道Channel:一对多,一个EventLoop可以绑定多个Channel
2.通道Channel和处理器handler:多对多(一个channel的IO事件被多个handler处理,一个handler可以处理多个channel的相同IO事件)
那么如何把处理器绑定到通道中呢?
特殊组件:ChannelPipline(通道流水线):将多个处理器实例绑定到同一个通道,串起来管理,是一个双向链表,一个handler作为一个节点。一个channel拥有一个pipline。(netty为了减少锁等待的消耗,使用串行来提高效率)
为什么要称为流水线:因为这些处理器被串在一起,一个IO事件发生,是根据这个串起来的处理器,依次进行处理的,所以就如同工厂的流水线一样。Netty规定,入站处理,是从前到后的,而出战出来是从后到前的。也就是说,inbound永远都是从左到右执行,而outBound是从右向左执行。
ChannelPipline的默认实现类DefaultChannelPipline,我们看这个执行的过程,就可以推断,里面是一个双向链表在处理handler(链表,非集合), 所以我们看DefaultChannelPipline的字段:
//为当前线程存放类型和名字的映射,避免重名 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() { @Override protected Map<Class<?>, String> initialValue() { return new WeakHashMap<Class<?>, String>(); } }; //消息大小写估算器更新 private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR = AtomicReferenceFieldUpdater.newUpdater( DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle"); final AbstractChannelHandlerContext head; //头处理器上下文 final AbstractChannelHandlerContext tail; //尾处理器上下文 private final Channel channel; //通道 private final ChannelFuture succeededFuture; //通道异步结果 private final VoidChannelPromise voidPromise; //任意类型的一步结果 private final boolean touch = ResourceLeakDetector.isEnabled(); //是否要资源泄漏检测 //待回调链路 private PendingHandlerCallback pendingHandlerCallbackHead;而Pipline的初始化,是在NioServerSocketChannel初始化的时候,四个super()到顶端的AbstractChannel中的构造方法,进行初始化的,在上一节中,我们有源码。
1.初始化
只有一个构造方法
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); //创建尾节点 head = new HeadContext(this); //创建头节点 //双向链表 head.next = tail; tail.prev = head; }在构造方法中,初始化了头节点和尾节点,并且使用链表,连接了两个节点,而我们手动添加的节点,必然是存在于头尾节点中间的。
对于TailContext,HeadContext都是内部类,看一下继承图,
首先我们看一下父类AbstractChanelHandlerContext的字段
volatile AbstractChannelHandlerContext next; //后节点 volatile AbstractChannelHandlerContext prev; //前节点 private final DefaultChannelPipeline pipeline; //所属pipline private final String name; //节点名称 final EventExecutor executor; //事件执行器 private ChannelFuture succeededFuture; //回调在这里,我们没有发现Handler,handler的封装,是在子类DefaultChannelHandlerContext中,它是唯一字段,该类很简单,那么重要功能也都在父类中
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; ... }对于TailContext类来说,它没有字段,也就是不存在Handler,HeadContext同样如此,但是它具有channle的unsafe对象
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); }可以调用unsafe.bind()方法,还要connect()方法
所以,我们的pipline并没有直接存储handler,而是使用ChannelContext上下文,对handler进行了封装,还添加了两个头尾节点,这两个节点没有handler处理功能。
2.常用方法
2.1 addLast()添加handler
这个方法是最常用的,我们看它的源码,在类中,有很多个重载的方法,基本套路,对参数的设置,最后流入这个方法
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //检测handler是否重复添加 checkMultiplicity(handler); //包装 handler newCtx = newContext(group, filterName(name, handler), handler); //添加到双向链表中 addLast0(newCtx); if (!registered) { //channel还没有注册 newCtx.setAddPending(); //设置为待添加状态,一共有三个状态,可见该类中的顶部字段 callHandlerCallbackLater(newCtx, true); //添加回调 return this; } //从通道获取执行器 EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { //执行线程不是EventLoop线程 callHandlerAddedInEventLoop(newCtx, executor); //添加任务到执行器 return this; } } callHandlerAdded0(newCtx); //触发handlerAdd回调 return this; }2.2 包装Handler,调用ChannelContet的构造方法
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, handler.getClass()); this.handler = handler; } //父类 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.executionMask = mask(handlerClass); ordered = executor == null || executor instanceof OrderedEventExecutor; }2.3 添加到双向链表中
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; //找到尾节点的 前节点 newCtx.prev = prev; //新节点,对接前节点 newCtx.next = tail; //对接 后节点 prev.next = newCtx; //更改原有链表 tail.prev = newCtx; }添加handler还存在头添加,指定位置添加,对于addFirst,addBefore(),addAfter()逻辑基本都是差不多的,所以我们这里分析源码中使用的addLast()添加。
2.4 callHandlerCallbackLater()回调
channel注册到selector和 pipline.addHandler不是一个线程上的,基本逻辑是 先注册,再添加handler,如果我们添加handler的时候,还没有注册成功,那么就需要把handler 添加到待注册的链表(本类字段:pendingHandlerCallbackHead)中,等待注册成功了回调添加方法。(但是我们之前已经把handler放入了pipline中了)
将context封装成为PendingHandlerAddedTask,它是一个内部类,专门用来回调的,而且还是一个链表。
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; //added=true, 然后把ctx包装成task 添加 or 删除 handler from pipline PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); PendingHandlerCallback pending = pendingHandlerCallbackHead; if (pending == null) { pendingHandlerCallbackHead = task; //链表是空的,则将task设置为第一个 } else { // Find the tail of the linked-list. while (pending.next != null) { //不为空,放到尾部,这里先遍历到尾部 pending = pending.next; } pending.next = task; //保存到尾部 } }封装完成之后,就channel被注册道selector的时候,触发回调,比如在addLast()中也可能触发
在AddedTask类中,也可能
private final class PendingHandlerAddedTask extends PendingHandlerCallback { ... public void run() { callHandlerAdded0(ctx); } @Override void execute() { //NioEventLoop 实现了EventExecutor EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { //防止多线程,如果在事件循环线程 callHandlerAdded0(ctx); // 直接执行 } else { try { executor.execute(this); //让执行器处理 } c..... }反正最后都流向了callHandlerAdded0()方法,
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.callHandlerAdded(); //就这一个方法,下面全是异常,委托给了context } catch (Throwable t) { .... }这里就是调用context的方法, 最后又跑到了handler中了,所以对于Handler部分的内容,我们在handler章节中分析,添加handler节点就到此结束。
3 删除节点
逻辑就和添加一样了,同样也是走的上面的回调方法,只不过创建的task变成了removeTask,它也是内部类。
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { assert ctx != head && ctx != tail; synchronized (this) { atomicRemoveFromHandlerList(ctx); //原子操作 if (!registered) { callHandlerCallbackLater(ctx, false); //未注册,也要回调 return ctx; } EventExecutor executor = ctx.executor(); if (!executor.inEventLoop()) { executor.execute(new Runnable() { @Override public void run() { callHandlerRemoved0(ctx); } }); return ctx; } } callHandlerRemoved0(ctx); return ctx; }4.register过程调用的方法
当我们init()完毕之后,回调用regist,将channel绑定到selector,当把本地nio channel绑定到selector之后,会使用pipline进行一些处理
位于AbstracChannel内部类AbstractUnsafe.register0()方法中
doRegister(); //由第二层子类 实现 【入】 //注册 neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); //1. 有待处理的任务 safeSetSuccess(promise); //向管道发送 注册事件 pipeline.fireChannelRegistered(); //2. 触发并传递注册事件 if (isActive()) { //如果管道可用 if (firstRegistration) { //向管道发送channel可用事件 pipeline.fireChannelActive(); //3.4.1 invokerHandlerAddedIfNeeded()
现在channel已经成功注册道selector中了,我们一次性处理待添加的handler。所以这里只处理一次。!
final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { //只处理一次 firstRegistration = false; callHandlerAddedForAllHandlers(); //【入】 } }进入实现功能方法,执行的就是内部类,我们上面展示的,PendingHandlerAddedTask.execute()方法
private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; registered = true; //取得 本类 回调字段 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; // Null out so it can be GC'ed. 将本类字段设置为Null this.pendingHandlerCallbackHead = null; } // 遍历该链表,依次执行回调任务 PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { task.execute(); //依次执行 task = task.next; } }对于后面的fireChannelRegistered()方法和fireChannelActive()方法,他们的实现逻辑差不多,都是一种传播行为,这里涉及了迭代,所以代码稍显一点跳转的复杂,具体的逻辑呢,是没有很大复杂度,主要就是在各种方法之间跳过去调过来,中间就是有一个skipContext有点意思,就是inbound类型的,就不需要走outboun类型的地方,所以需要skip,在下面,我们列举了read是如何传播的。
5.事件传播
当有可读事件的时候,触发channelRead()方法
然后就是走的和上面一样的fireChannelRead()方法,开始遍历handler链表。
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }来到AbstractChannelHandlerContext类中的,主要是判断当前的执行器所在的线从,然后调用下一个节点的invokeChannelRead()方法,那么自然就进入了下一个节点中,这里就形成了传播!
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); //获得下一个节点的执行器 if (executor.inEventLoop()) { next.invokeChannelRead(m); //调用下一个节点的方法,这里就进行了下一个节点 } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }如果走的是第一个if,那么就会来到本类的invokeChannelRead()中,它的内容,就是调用我们handler实现的具体业务逻辑。并且把context包装类自己传入进去了,方便在使用完毕之后,能够继续传播事件。
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } ... } else { fireChannelRead(msg); } }而具体的Handler要实现这个方法,如果要传递,就会在最后调用ctx.fireChannelRead()方法
public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg); }fireChannelRead()一出,这里就出现循环了。
参考连接:大神版:https://blog.csdn.net/wangwei19871103/article/details/104168322(系列文章,含多篇)
详细版:https://www.jianshu.com/p/0e15165714fc