Netty实战二 | Netty应用程序快速开发

    技术2024-10-07  56

    免责声明:本人最近在研读《Netty实战》书籍,对于里面内容颇感兴趣,本文旨在于技术学习交流,不存在盈利性目的。

    在文中, 我们将展示如何构建一个基于 Netty 的客户端和服务器。 应用程序很简单:客户端将消息发送给服务器,而服务器再将消息回送给客户端。

    Netty 客户端/服务器概览

    下图展示了一个将要编写的 Echo 客户端和服务器应用程序的总体结构图,本文主要的目的是编写基于 Web 的用于被浏览器访问的应用程序:

    Echo 客户端和服务器

    该结构支持多个客户端同时连接到一台服务器。所能够支持的客户端数量,在理论上,仅受限于系统的可用资源(以及所使用的 JDK 版本可能会施加的限制)。Echo 客户端和服务器之间的交互是非常简单的;在客户端建立一个连接之后,它会向服务 器发送一个或多个消息,反过来,服务器又会将每个消息回送给客户端。虽然它本身看起来好像用处不大,但它充分地体现了客户端/服务器系统中典型的请求-响应交互模式。

    编写 Echo 服务器

    所有的 Netty 服务器都需要以下两部分。

    至少一个 ChannelHandler—该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。引导—这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口上(相当于程序入口)。

    ChannelHandler 和业务逻辑

    ChannelHandler是一个接口族的父接口,它的实现负责接收并响应事件通知。在 Netty 应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。Echo 服务器会响应传入的消息,它需要实现 ChannelInboundHandler 接口, 用来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承 ChannelInboundHandlerAdapter 类也就足够了, 它提供了ChannelInboundHandler 的默认实现。

    ChannelHandler 实现类代码如下:

    @Sharable //该注解表示Channel可以被多个进程共享,否则只能连接一个客户端 public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override //从Channel中读取消息 public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println( "Server received: " + in.toString(CharsetUtil.UTF_8)); ctx.write(in); //将消息写给写给发送者(缓冲区中,而不是发送出去),而不冲刷出站消息 } @Override //读消息完成之后的事件 public void channelReadComplete(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) //将缓冲区中剩余消息都刷出去 .addListener(ChannelFutureListener.CLOSE); } @Override //异常处理方法 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

    该业务逻辑类主要实现了三个方法:

    channelRead()—对于每个传入的消息都要调用;channelReadComplete()—通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息;exceptionCaught()—在读取操作期间, 有异常抛出时会调用。

    ChannelInboundHandlerAdapter 有一个直观的 API,并且它的每个方法都可以被重写以挂钩到事件生命周期的恰当点上。因为需要处理所有接收到的数据,所以重写了 channelRead()方法。在这个服务器应用程序中,将数据简单地回送给了远程节点。重写 exceptionCaught()方法允许你对 Throwable 的任何子类型做出反应, 在这里记录了异常并关闭了连接。虽然一个更加完善的应用程序也许会尝试从异常中恢复,但在这个场景下,只是通过简单地关闭连接来通知远程节点发生了错误。

    如果不捕获异常,会发生什么呢?

    每个 Channel 都拥有一个与之相关联的 ChannelPipeline,其持有一个 ChannelHandler 的实例链。在默认的情况下, ChannelHandler 会把对它的方法的调用转发给链中的下一个 ChannelHandler。因此,如果 exceptionCaught()方法没有被该链中的某处实现,那么所接收的异常将会被传递到 ChannelPipeline 的尾端并被记录。为此,你的应用程序应该提供至少有一个实现了 exceptionCaught()方法的 ChannelHandler。

    配置服务器的启动代码

    服务器的启动代码相当于是应用程序的启动类,在启动前需要配置服务器的地址和端口号等信息。具体涉及以下内容:

    绑定到服务器将在其上监听并接受传入连接请求的端口;配置 Channel,以将有关的入站消息通知给 EchoServerHandler 实例。

    启动类代码如下:

    public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println( "Usage: " + EchoServer.class.getSimpleName() + " <port>"); } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); //1、创建EventLoopGroup try { ServerBootstrap b = new ServerBootstrap(); //2、创建ServerBootstrap b.group(group) .channel(NioServerSocketChannel.class) //3、指定使用NIO传输Channel .localAddress(new InetSocketAddress(port)) //4、使用指定的端口设置套接字地址 .childHandler(new ChannelInitializer<SocketChannel>(){ @Override //5、添加一个EchoServerHandler 到子Channel的ChannelPipeline public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); //EchoServerHandler 被标注为@Shareable,所以可以总是使用同一个的实例。 } }); ChannelFuture f = b.bind().sync(); //6、异步地绑定服务器;调用 sync()方法阻塞 等待直到绑定完成 f.channel().closeFuture().sync(); //7、获取 Channel 的CloseFuture,并 且阻塞当前线 } finally { group.shutdownGracefully().sync(); //8、阻塞式关闭 } } }

    因为使用的是 NIO 传输, 所以指定了 NioEventLoopGroup 来接受和处理新的连接,并且将 Channel 的类型指定为NioServerSocketChannel 。将本地地址设置为一个具有选定端口的 InetSocketAddress 。服务器将绑定到这个地址以监听新的连接请求。

    ChannelInitializer类作用:当一个新的连接被接受时,一个新的子 Channel 将会被创建,而 ChannelInitializer 将会把一个你的 EchoServerHandler 的实例(如果设置了共享,则可以重复利用一个实例)添加到该 Channel 的 ChannelPipeline 中。这个 ChannelHandler 将会收到有关入站消息的通知。

    虽然 NIO 是可伸缩的, 但是其适当的尤其是关于多线程处理的配置并不简单。 Netty 的设计封装了大部分的复杂性。该示例使用了NIO,因为得益于它的可扩展性和彻底的异步性,它是目前使用最广泛的传输。但是也可以使用一个不同的传输实现。如果你想要在自己的服务器中使用 OIO 传输,将需要指定 OioServerSocketChannel 和 OioEventLoopGroup。

    总结引导过程中所需要的步骤如下:

    创建一个 ServerBootstrap 的实例以引导和绑定服务器;创建并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/写数据;指定服务器绑定的本地的 InetSocketAddress;使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;调用 ServerBootstrap.bind()方法以绑定服务器。

    至此,服务器已经初始化,并且已经就绪能被使用了。

    编写 Echo 客户端

    Echo 客户端主要有以下几个流程:

    连接到服务器;发送一个或者多个消息;对于每个消息,等待并接收从服务器发回的相同的消息;关闭连接。

    编写客户端所涉及的两个主要代码部分也是业务逻辑和引导,和在服务器中的一样。

    通过 ChannelHandler 实现客户端逻辑

    如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler。在这个场景下,你将扩展 SimpleChannelInboundHandler 类以处理所有必须的任务。客户端的客户端的 ChannelHandler代码如下:

    @Sharable //设置可共享 public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override //当被通知 Channel是活跃的时候,发送一条消息 public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } @Override //记录已接收消息的转储 public void channelRead(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); } @Override //在发生异常时,记录错误并关闭Channel public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

    首先,重写了 channelActive()方法,其将在一个连接建立时被调用。这确保了数据将会被尽可能快地写入服务器,其在这个场景下是一个编码了字符串"Netty rocks!"的字节缓冲区。

    接下来,重写了 channelRead0()方法。 每当接收数据时,都会调用这个方法。需要注意的是,由服务器发送的消息可能会被分块接收。 也就是说,如果服务器发送了 5 字节, 那么不能保证这 5 字节会被一次性接收。 即使是对于这么少量的数据, channelRead0()方法也可能会被调用两次,第一次使用一个持有 3 字节的 ByteBuf( Netty 的字节容器),第二次使用一个持有 2 字节的 ByteBuf。作为一个面向流的协议, TCP 保证了字节数组将会按照服务器发送它们的顺序被接收。

    重写的第三个方法是 exceptionCaught()。如同在 EchoServerHandler中所示,记录 Throwable, 关闭 Channel,在这个场景下, 终止到服务器的连接。

    总结ChannelHandle中使用到的方法:

    channelActive()——在到服务器的连接已经建立之后将被调用;channelRead0()——当从服务器接收到一条消息时被调用;exceptionCaught()——在处理过程中引发异常时被调用。

    SimpleChannelInboundHandler 与 ChannelInboundHandler:

    为什么我们在客户端使用的是 SimpleChannelInboundHandler,而不是在 EchoServerHandler 中所使用的ChannelInboundHandlerAdapter 呢?这和两个因素的相互作用有关:业务逻辑如何处理消息以及 Netty 如何管理资源。

    在客户端,当 channelRead0()方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler 负责释放指向保存该消息的 ByteBuf 的内存引用。

    在 EchoServerHandler 中,你仍然需要将传入消息回送给发送者,而 write()操作是异步的,直到 channelRead()方法返回后可能仍然没有完成。为此, EchoServerHandler扩展了 ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,当 writeAndFlush()方法被调用时被释放。

    引导客户端

    引导客户端类似于引导服务器,不同的是, 客户端是使用主机和端口参数来连接远程地址,也就是这里的 Echo 服务器的地址,而不是绑定到一个一直被监听的端口。引导客户端代码如下所示:

    public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); //创建 Bootstrap b.group(group) .channel(NioSocketChannel.class) //指定 EventLoopGroup 以处理客户端事件;需要适用于 NIO 的实现。适用于 NIO 传输的Channel 类型 .remoteAddress(new InetSocketAddress(host, port)) //设置服务器的InetSocketAddress .handler(new ChannelInitializer<SocketChannel>() { //在创建Channel时,向 ChannelPipeline中添加一个 EchoClientHandler 实例 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); //连接到远程节点, 阻塞等待直到连接完成 f.channel().closeFuture().sync(); //阻塞, 直到Channel 关闭 } finally { group.shutdownGracefully().sync(); //关闭线程池并且释放所有的资源 } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }

    在这里,使用了 NIO 传输。注意,也可以在客户端和服务器上分别使用不同的传输。例如,在服务器端使用 NIO 传输,而在客户端使用 OIO 传输。

    总结该部分的步骤:

    为初始化客户端, 创建了一个 Bootstrap 实例;为进行事件处理分配了一个 NioEventLoopGroup 实例, 其中事件处理包括创建新的连接以及处理入站和出站数据;为服务器连接创建了一个 InetSocketAddress 实例;当连接被建立时,一个 EchoClientHandler 实例会被安装到(该 Channel 的)ChannelPipeline 中;在一切都设置完成后,调用 Bootstrap.connect()方法连接到远程节点。

    运行结果

    同时启动服务端和客户端,在客户端的控制台中将显示:

    客户端运行结果

    在服务端将显示:

    服务端显示结果

     

     

    Processed: 0.013, SQL: 9