Netty网络编程实战 - 用Netty手写实现一个UDP通知消息广播

    技术2025-05-18  46

    前言

    UDP 是面向无连接的通讯协议,UDP 数据包括目的端口号和源端口号信息, 由于通讯不需要连接,所以可以实现广播发送。Netty也为我们封装相关支持UDP诸多组件、数据报文和处理器。

     

    UDP 通讯时不需要接收方确认,属于不可靠的传输,可能会出现丢包现象, 实际应用中要求程序员编程验证。

    UDP 与 TCP 位于同一层,但它不管数据包的顺序、错误或重发。因此,UDP 不被应用于那些面向连接的服务,UDP 主要用于那些面向查询---应答的服务,例 如 NFS。相对于 FTP 或 Telnet,这些服务需要交换的信息量较小。使用 UDP 的服 务包括 NTP(网络时间协议)和 DNS(DNS 也使用 TCP),包总量较少的通信(DNS、 SNMP 等);2.视频、音频等多媒体通信(即时通信);3.限定于 LAN 等特定网 络中的应用通信;4.广播通信(广播、多播)。

     

    常用的 QQ,就是一个以 UDP 为主,TCP 为辅的通讯协议。 TCP 和 UDP 的优缺点无法简单地、绝对地去做比较:TCP 用于在传输层有 必要实现可靠传输的情况;而在一方面,UDP 主要用于那些对高速传输和实时 性有较高要求的通信或广播通信。TCP 和 UDP 应该根据应用的目的按需使用。

     

    Netty中UDP的核心组件

    Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程 节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地 址以及消息的有效负载本身。

     

    让我们来运用Netty的核心组件,构建一个基于UDP的多播应用。

     

    代码设计实现

     广播端部分

    /** * @author andychen https://blog.51cto.com/14815984 * @description:通知的广播端 */ public class NoticeBroadcast {    //广播线程组    private final EventLoopGroup group;    //广播启动器    private final Bootstrap boot;    /**     * 默认构造     * @param remotePort 接收端端口     */    public NoticeBroadcast(int remotePort) {        this.group = new NioEventLoopGroup();        this.boot = new Bootstrap();        //绑定NioDatagramChannel数据报通道        this.boot.group(group).channel(NioDatagramChannel.class)        //设置通道用于广播        .option(ChannelOption.SO_BROADCAST, true)        .handler(new NoticeEncoder(new InetSocketAddress(Constant.BROADCAST_IP, remotePort)));    }    /**     * 运行广播     */    public void run() throws Exception {        int count = 0;        //绑定广播通道        Channel channel = this.boot.bind(0).sync().channel();        System.out.println("开始运行广播,发送通知,目标所有主机端口("+Constant.ACCEPTER_PORT+")...");        //循环广播通知        for (;;){            /**             * 发送通知到接收端             */           channel.writeAndFlush(new Notice(++count, Constant.getNotice(),null));           //间隔3秒发送            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                Thread.interrupted();                e.printStackTrace();                break;            }        }    }    /**     * 停止运行     */    public void stop(){        try {            this.group.shutdownGracefully();        } catch (Exception e) {            e.printStackTrace();        }    } } /** * @author andychen https://blog.51cto.com/14815984 * @description:广播运行器 */ public class BroadcastRunner {    /**     * 运行消息广播     * @param args     */    public static void main(String[] args) {        NoticeBroadcast broadcast = null;        try {            broadcast = new NoticeBroadcast(Constant.ACCEPTER_PORT);            broadcast.run();        } catch (Exception e) {            e.printStackTrace();        } finally {            broadcast.stop();        }    } } /** * @author andychen https://blog.51cto.com/14815984 * @description:通知编码器 */ public class NoticeEncoder extends MessageToMessageEncoder<Notice> {    //目的地    private final InetSocketAddress target;    public NoticeEncoder(InetSocketAddress target) {        this.target = target;    }    /**     * 编码方法实现     * @param ctx 处理器上下文     * @param notice 通知对象     * @param list 集合     * @throws Exception     */    protected void encode(ChannelHandlerContext ctx, Notice notice, List<Object> list) throws Exception {        //内容数据        byte[] bytes = notice.getContent().getBytes(CharsetUtil.UTF_8);        //定义缓冲:一个int型+一个long型+内容长度+分隔符        int capacity = 4+8+bytes.length+1;        ByteBuf buf = ctx.alloc().buffer(capacity);        //写通知id        buf.writeInt(notice.getId());        //发送时间        buf.writeLong(notice.getTime());        //分隔符        buf.writeByte(Notice.SEPARATOR);        //内容        buf.writeBytes(bytes);        //加入消息列表        list.add(new DatagramPacket(buf, target));    } }

     

    接收端部分

     

    /** * @author andychen https://blog.51cto.com/14815984 * @description:通知接收器 */ public class NoticeAccepter {    //通知线程组    private final EventLoopGroup group;    //启动器    private final Bootstrap boot;    public NoticeAccepter() {        this.group = new NioEventLoopGroup();        this.boot = new Bootstrap();        this.boot.group(this.group)                .channel(NioDatagramChannel.class)                //开启通道底层广播                .option(ChannelOption.SO_BROADCAST, true)                //端口重用                .option(ChannelOption.SO_REUSEADDR, true)                .handler(new ChannelInitializer<Channel>() {                    @Override                    protected void initChannel(Channel channel) throws Exception {                        ChannelPipeline pipeline = channel.pipeline();                        pipeline.addLast(new NoticeDecoder());                        pipeline.addLast(new NoticeChannelHanler());                    }                })                .localAddress(Constant.ACCEPTER_PORT);    }    /**     * 运行接收器     */    public void run(){        try {            //设置不间断接收消息,并绑定通道            Channel channel = this.boot.bind().syncUninterruptibly().channel();            System.out.println("接收器启动,端口("+ Constant.ACCEPTER_PORT+"),等待接收通知...");            //通道阻塞,直到关闭            channel.closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        }finally {           this.stop();        }    }    /**     * 停止接收消息     */    public void stop(){        try {            this.group.shutdownGracefully();        } catch (Exception e) {            e.printStackTrace();        }    } } /** * @author andychen https://blog.51cto.com/14815984 * @description:通知通道处理器 */ public class NoticeChannelHanler extends SimpleChannelInboundHandler<Notice> {    /**     * 接收广播传递过来的报文     * @param channelHandlerContext     * @param notice     * @throws Exception     */    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Notice notice) throws Exception {        StringBuffer buffer = new StringBuffer();        buffer.append("时间[");        buffer.append(notice.getTime());        buffer.append("],广播源[");        buffer.append(notice.getSource().toString());        buffer.append("]=====[");        buffer.append(notice.getId());        buffer.append("]=====通知内容:");        buffer.append(notice.getContent());        //打印接收到的数据        System.out.println(buffer.toString());    }    /**     * 异常捕获     * @param ctx 上下文     * @param cause     * @throws Exception 异常信息     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    } }

     

    /** * @author andychen https://blog.51cto.com/14815984 * @description:通知解码器 */ public class NoticeDecoder extends MessageToMessageDecoder<DatagramPacket> {    /**     * 解码器核心实现     * @param channelHandlerContext 处理器上下文     * @param datagramPacket 数据报     * @param list 消息列表     * @throws Exception     */    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> list) throws Exception {        //数据报内容        ByteBuf data = datagramPacket.content();        //通知id        int id = data.readInt();        //发送时间        long time = data.readLong();        //分隔符        data.readByte();        //当前索引        int idx = data.readerIndex();        //通知内容        String content = data.slice(idx, data.readableBytes()).toString(CharsetUtil.UTF_8);        //加入消息列表        list.add(new Notice(id,content, datagramPacket.sender()));    } } /** * @author andychen https://blog.51cto.com/14815984 * @description:消息接收器启动器 */ public class AccepterRunner {    /**     * 运行通知接收任务     * @param args     */    public static void main(String[] args) {        NoticeAccepter accepter = null;        try {            accepter = new NoticeAccepter();            accepter.run();        } catch (Exception e) {            e.printStackTrace();        }finally {            accepter.stop();        }    } }

     

    其它部分

     

    /** * @author andychen https://blog.51cto.com/14815984 * @description:通知信息 */ public class Notice {    public int getId() {        return id;    }    public long getTime() {        return time;    }    public String getContent() {        return content;    }    public InetSocketAddress getSource() {        return source;    }    //通知id    private final int id;    //发送时间    private final long time;    //通知内容    private final String content;    //来源地址    private final InetSocketAddress source;    //分隔符    public static final byte SEPARATOR = (byte) ':';    public Notice(int id, String content, InetSocketAddress source) {        this.id = id;        this.content = content;        this.source = source;        this.time = System.currentTimeMillis();    } }

     

    /** * @author andychen https://blog.51cto.com/14815984 * @description:系统常量 */ public class Constant {    /**     * 广播地址     */    public static final String BROADCAST_IP = "255.255.255.255";    /**     * 接收者端口(固定的)     */    public static final int ACCEPTER_PORT = 8700;    /**     * 通知池     */    private static final String[] NOTICE_POOL = {            "多国疫情突然反弹,北京下一步怎么办?",            "端午假期,这位政法委书记去了边境",            "省委书记、省长们的端午假期",            "人社部回应图书馆留言大叔 送吴桂春们求职就业指南",            "北京通报病例情况,有句话出现十多次",            "北京26日新增病例活动轨迹公布!涉及这些地方!\n",            "蚊蝇增多是否会传播新冠病毒?官方回应来了",            "俄罗斯将扩大直接对华供应食品地区名单",            "一图看懂:新发地到底有多大多复杂?",            "倒闭、亏损、坏账,影视行业如何“活下去”?",            "警察献血反被辱!香港医管局:致歉并展开调查",            "华为获准在英国建研发中心 美官员“打招呼”过问",            "105所高校通过认证!教育部公布一份重磅名单",            "北京新增确诊17例:北京16天确诊297例",            "印度陆军司令:已在中印实控线做长期准备",            "印度增兵边境做出两冒险动作 中方须做冲突升级准备",            "中印对峙印度下一步会有何行动?偷袭奇袭捞一把就走",            "印度大批军机飞向拉达克想洗刷耻辱 直升机紧急着陆",            "蓬佩奥为何反对伊朗买歼10?将威胁美在波斯湾秩序",            "美国防授权法呼吁美军医疗船安慰号及仁慈号停靠台湾",            "印度造舰能力有多强?媒体:像当年的中日韩值得看好",            "中国6代战机究竟长啥样:机头尖锐无平尾",            "歼16电子战机有多强:干扰距离翻倍 优于美军EA-18G",            "我军PCL09车载炮为何上高原 高低搭配火力覆盖没死角",            "胡锡进:这时候谁愿意去美国?签证留给黄之锋吧",            "美方因涉港问题对中方官员实施签证限制 中方回应",            "我军为何选择6-25高炮放弃单35 根本原因并不在火炮",            "在中国问题上 短视的是莫迪买单的是印度",            "解放军驻吉布提基地官兵已换装星空迷彩服",            "印度陆军司令向防长汇报:在中印实控线做长期准备",            "印度多架军机在中印边境密集活动 直升机紧急着陆",            "我军PCL09车载炮为何上高原 高低搭配火力覆盖没死角",            "蓬佩奥为何反对伊朗买歼10?将威胁美在波斯湾秩序",            "美国防授权法呼吁美军医疗船安慰号及仁慈号停靠台湾"    };    /**     * 获取消息     */    public static String getNotice(){        Random r = new Random();        return NOTICE_POOL[r.nextInt(NOTICE_POOL.length)];    } }

     

    运行验证

    UDP广播端模式,广播端和接收端并无严格地启动顺序;一般来说为了避免开始消息接收不到的问题,可先启动接收端等待。接收端开多个模拟验证。

     

    总结

    因为UDP广播模式的发送针对局域网所有主机IP,所以更适合在公司内部使用项目,类似通知模块和需要全体一起接收的业务场景。但同时鉴于UDP是面向无连接的,消息的发送没有对端的应答等机制。所以是不可靠的传输协议,大家在项目中要评估好业务场景。当然也可以作为辅助手段和TCP协议结合使用为最佳。

     

     

    Processed: 0.015, SQL: 9