Netty使用记录

    技术2024-04-12  78

    1、注册服务

    服务端:监听8080端口,当有客户端连接到该端口时,为该连接建立一个Channel;

    @Autowired
    private CustTomServerInitializer custTomServerInitializer;

    /**
     * Configures the Netty server bootstrap and starts listening on port 8080.
     * Invoked once by Spring after dependency injection ({@code @PostConstruct}).
     *
     * <p>Binding and waiting for the server channel to close are blocking calls,
     * so they run on a separate thread instead of the Spring startup thread.
     * When that thread finishes (bind failure or server channel closed), both
     * event loop groups are shut down so their threads and selectors are released.
     */
    @PostConstruct
    public void start() {
        ServerBootstrap boot = new ServerBootstrap();
        EventLoopGroup bossGroup = new NioEventLoopGroup(4);
        EventLoopGroup workerGroup = new NioEventLoopGroup(4);
        boot.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(custTomServerInitializer)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        new Thread(() -> {
            try {
                startListener(boot, 8080);
            } finally {
                // startListener only returns once the server is no longer listening;
                // without this the event loop threads would stay alive forever.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }).start();
    }

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     * Failures are logged, not rethrown.
     *
     * @param serverBootstrap fully configured bootstrap to bind
     * @param port            local TCP port to listen on
     */
    private void startListener(ServerBootstrap serverBootstrap, int port) {
        try {
            serverBootstrap.bind(port).sync().channel().closeFuture().sync();
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the owning thread can observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("start listener err", ex);
        } catch (Exception ex) {
            LOGGER.error("start listener err", ex);
        }
    }

    客户端,连接某个IP的某个端口

    public class PlatformClient implements Client { private static final Logger LOGGER = LoggerFactory.getLogger(PlatformClient.class); private String host; private int port; Bootstrap boot = null; EventLoopGroup group = null; private boolean isActive = false; private static final int MAX_FRAME_LENGTH = 8192; public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public PlatformClient() { } public PlatformClient(String host, int port) { this.host = host; this.port = port; } @Override public void init() { boot = new Bootstrap(); group = new NioEventLoopGroup(4, Executors.newFixedThreadPool(4)); boot.group(group); boot.channel(NioSocketChannel.class); boot.remoteAddress(host, port); boot.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline channelPipeline = channel.pipeline(); channelPipeline.addLast("link", new LocalLinkHandler(PlatformClient.class)); channelPipeline.addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 22, 2, 1, 0)); channelPipeline.addLast("decoder", new ProtocolDecoder(LOGGER)); channelPipeline.addLast("encoder", new ProtocolEncoder(LOGGER)); channelPipeline.addLast("timeout", new WriteTimeoutHandler(60)); channelPipeline.addLast("business", new LocalBusinessHandler(PlatformClient.class)); } }); } @Override public void start() { try { try { ChannelFuture future = boot.connect().sync(); isActive = future.isSuccess(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); isActive = false; } } catch (Exception e) { LOGGER.error("ClientComm boot exception.", e); } } @Override public boolean isActive() { return isActive; } } @Override public void init() { if (CollectionUtils.isEmpty(HOST_PORT_MAP)) { HOST_PORT_MAP.put(host, port); } HOST_PORT_MAP.forEach((host, port) -> { 
hostPortClientMap.put(host + port, new PlatformClient(host, port)); }); connRobot = new ConnRobot(); connRobotExecutor = Executors.newSingleThreadScheduledExecutor(); } @Override public void start() { // 第一次5秒后执行,后续每5秒检测一次进行重连 connRobotExecutor.scheduleAtFixedRate(connRobot, 3, 3, TimeUnit.SECONDS); } /** * 负责心跳和重连 */ private class ConnRobot implements Runnable { @Override public void run() { hostPortClientMap.forEach((hostPort, client) -> { if (!client.isActive()) { new Thread(new Runnable() { @Override public void run() { client.init(); client.start(); } }).start(); } }); } }

    2、激活Channel

    服务端和客户端类似,都是实现ChannelHandler

    @Service public class CustTomServerInitializer extends ChannelInitializer<SocketChannel> { public static final Logger LOGGER = LoggerFactory.getLogger(JTTerminalServerInitializer.class); @Autowired private CustTomServerHandler custTomServerHandler; @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Unpooled.copiedBuffer(JT_BEGIN_STR.getBytes()), Unpooled.copiedBuffer(JT_BEGIN_STR2.getBytes()) )); if (LOGGER.isDebugEnabled()) { pipeline.addLast(new LoggingHandler(LogLevel.INFO)); } pipeline.addLast(new IdleStateHandler(100,0,0, TimeUnit.SECONDS)); pipeline.addLast(custTomServerHandler); } } /** *重写SimpleChannelInboundHandler方法 **/ @Component @ChannelHandler.Sharable public class CustTomServerHandler extends SimpleChannelInboundHandler<ByteBuf>{ }

    客户端

    /**
     * Client-side inbound handler, installed as the "business" stage of the
     * PlatformClient pipeline. Its members are omitted in these notes
     * (the client code constructs it with a {@code Class} argument, so at least
     * that constructor exists in the real class).
     */
    public class LocalBusinessHandler extends ChannelInboundHandlerAdapter { }

    addLast是把Handler追加到ChannelPipeline内部维护的双向链表的末尾;Pipeline保存链表的头节点(head)和尾节点(tail),每个节点(ChannelHandlerContext)同时持有前一个节点和后一个节点的引用;

    ChannelInboundHandler方法

    | 方法 | 描述 |
    | --- | --- |
    | channelRegistered | 当一个Channel注册到EventLoop上,可以处理I/O时被调用 |
    | channelUnregistered | 当一个Channel从它的EventLoop上解除注册,不再处理I/O时被调用 |
    | channelActive | 当Channel变成活跃状态时被调用;此时Channel已连接/绑定并且就绪 |
    | channelInactive | 当Channel离开活跃状态,不再连接到某个远端时被调用 |
    | channelReadComplete | 当Channel上的某个读操作完成时被调用 |
    | channelRead | 当从Channel中读数据时被调用 |

    ChannelOutboundHandler方法

    | 方法 | 描述 |
    | --- | --- |
    | bind(ChannelHandlerContext, SocketAddress, ChannelPromise) | 请求绑定 Channel 到一个本地地址 |
    | connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise) | 请求连接 Channel 到远端 |
    | disconnect(ChannelHandlerContext, ChannelPromise) | 请求从远端断开 Channel |
    | close(ChannelHandlerContext, ChannelPromise) | 请求关闭 Channel |
    | deregister(ChannelHandlerContext, ChannelPromise) | 请求 Channel 从它的 EventLoop 上解除注册 |
    | read(ChannelHandlerContext) | 请求从 Channel 中读更多的数据 |
    | flush(ChannelHandlerContext) | 请求通过 Channel 刷队列数据到远端 |
    | write(ChannelHandlerContext, Object, ChannelPromise) | 请求通过 Channel 写数据到远端 |

     

    Processed: 0.013, SQL: 9