netty ChannelInitlizer的作用

    技术2024-01-17  98

    ChannelInitializer一般用来初始化pipline,这不是废话吗 ! 但是很神奇的一点是ChannelInitializer也是一个ChannelHandler,甚至直接继承至ChannelInboundHandlerAdapter

    @Sharable public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class); // We use a Set as a ChannelInitializer is usually shared between all Channels in a Bootstrap / // ServerBootstrap. This way we can reduce the memory usage compared to use Attributes. private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap( new ConcurrentHashMap<ChannelHandlerContext, Boolean>()); /** * This method will be called once the {@link Channel} was registered. After the method returns this instance * will be removed from the {@link ChannelPipeline} of the {@link Channel}. * * @param ch the {@link Channel} which was registered. * @throws Exception is thrown if an error occurs. In that case it will be handled by * {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default close * the {@link Channel}. */ protected abstract void initChannel(C ch) throws Exception; ················· }

    典型的用法如下:在initChannel方法中加入自己的ChannelHandler

    import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by siiiriu on 2020/7/3. */ public class EchoServer2 { private int port; public EchoServer2(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { port = Integer.parseInt(args[0]); } new EchoServer2(port).run(); } } class EchoHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }

    既然他是用来初始化pipline的,但是如果我们只有一个handler的话,还需要他吗? 直接上代码。(示例修改自netty官网)

    import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by siiiriu on 2020/7/3. */ public class EchoServer { private int port; public EchoServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new EchoHandler()) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { port = Integer.parseInt(args[0]); } new EchoServer(port).run(); } } class EchoHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }

    运行,然后使用nc命令连接

    可以看到,能正常工作,我们发送过去的内容也全部正常返回了。 我们再起一个nc进行连接试试 连接上了,但是服务器控制台有报错输出,报错如下:

    21:32:43.818 [nioEventLoopGroup-2-1] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. io.netty.channel.ChannelPipelineException: com.netty.EchoHandler is not a @Sharable handler, so can't be added or removed multiple times. at io.netty.channel.DefaultChannelPipeline.checkMultiplicity(DefaultChannelPipeline.java:600) at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:202) at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:381) at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:370) at io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor.channelRead(ServerBootstrap.java:212) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe.read(AbstractNioMessageChannel.java:93) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)

    报错显示,我们的EchoHandler is not a @Sharable handler 好的,我们给EchoHandler加上@Sharable注解。这下可以正常工作了,这个注解的意思是就是这个处理器的同一个对象是可以被多个连接复用的,这个相当于是一种netty做的多线程保护,如果一个handler中有一些成员变量,如果不进行多线程安全的处理,那么该handler的同一个对象(也就是上面new EchoHandler()这个对象,因为就new了一次,所以是同一个对象被多个连接共享)被多个连接共享的时候可能会线程不安全。 因为这里只能传递一个对象进去,所以这里如果不进行@Sharable处理的话,就是无解的,但是框架不应该限制用户的这种行为,所以ChannelInitializer就出场了,可以看到,这个类上面也就加了@Sharable注解。可以看下它内部做了什么。

    @Sharable public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { ·········· protected abstract void initChannel(C ch) throws Exception; ········· /** * {@inheritDoc} If override this method ensure you call super! */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } ······· @SuppressWarnings("unchecked") private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; } ······ }

    在有新连接时,ChannelInitializer被加入到pipline中,channelAdded被调用,此时会调用InitChannel,将我们自己的handler加入,可以看到这时,虽然ChannelInitializer对象只有一个,但是我们自己的handler在每个连接中都会是一个新的对象,所以在这种情况下,我们就不需要在我们的处理类上使用@Sharable注解了。 注意一个小细节,在channelAdded方法执行完成以后,会将ChannelInitializer从pipline中删除,也就是这个handler只起到初始化handler执行链的过程,初始化完成后,就移除掉了。

    结论:ChannelInitializer起到了一个过桥的作用

    附:channel生命周期

    Processed: 0.010, SQL: 9