Mqtt协议-broker-moqutte在Springboot中的使用

    技术2022-07-10  151

    什么是MQTT协议

    本博客源码:https://github.com/xushuoAI/broker-moqutte-springboot

    https://github.com/mcxiaoke/mqtt 这是一位大佬的中文翻译,大家可以看看。

    在SpringBoot中启动moquette.broker.Server

    https://segmentfault.com/a/1190000016456748 这是一位大佬的整合

    客户端连接到broker并使用

    通过上面的操作,我们可以在启动一个SpringBoot项目的同时启动好moqutte.broker服务。现在我们只需要创建客户端连接到broker服务就可以实现客户端之间的通信了。下面这段代码,大家肯定很熟悉了,网上基本都是这个版本,通过指定服务器的IP地址和端口,我们创建了一个客户端连接到我们启动的Broker服务。然后这里使用的连接方式是回调式链接。

    1.采用阻塞式的连接的(BlockingConnection)

    2.采用回调式的连接 (CallbackConnection)

    3.采用Future样式的连接(FutureConnection)

    https://blog.csdn.net/woaijianning/article/details/17417409参考文章

    public class MQTTServer { private static final Logger LOG = LoggerFactory.getLogger(MQTTServer.class); private final static String CONNECTION_STRING = "tcp://0.0.0.0:1883"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s public static Topic[] topics = { new Topic("china/beijing", QoS.EXACTLY_ONCE), new Topic("china/tianjin", QoS.AT_LEAST_ONCE), new Topic("china/henan", QoS.AT_MOST_ONCE)}; public final static long RECONNECTION_ATTEMPT_MAX=6; public final static long RECONNECTION_DELAY=2000; public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M public static void main(String arg[]) { MQTT mqtt = new MQTT(); try { //设置服务端的ip mqtt.setHost(CONNECTION_STRING); //连接前清空会话信息 mqtt.setCleanSession(CLEAN_START); //设置重新连接的次数 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); //设置重连的间隔时间 mqtt.setReconnectDelay(RECONNECTION_DELAY); //设置心跳时间 mqtt.setKeepAlive(KEEP_ALIVE); //设置缓冲的大小 mqtt.setSendBufferSize(SEND_BUFFER_SIZE); /* //创建连接 BlockingConnection connection = mqtt.blockingConnection(); //开始连接 connection.connect();*/ // 使用回调式API final CallbackConnection callbackConnection = mqtt .callbackConnection(); // 连接监听 callbackConnection.listener(new Listener() { // 接收订阅话题发布的消息 @Override public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) { System.out.println("=============receive msg================" + new String(payload.toByteArray())); onComplete.run(); } // 连接失败 @Override public void onFailure(Throwable value) { System.out.println("===========connect failure==========="); callbackConnection.disconnect(null); } // 连接断开 @Override public void onDisconnected() { System.out.println("====mqtt disconnected====="); } // 连接成功 @Override public void onConnected() { System.out.println("====mqtt connected====="); } }); // 连接 callbackConnection.connect(new Callback<Void>() { // 连接失败 public void onFailure(Throwable value) { System.out.println("============连接失败:" + value.getLocalizedMessage() + "============"); } // 连接成功 public void onSuccess(Void v) { // 订阅主题 String topic="msg/normal"; String topic1="msg/disconnect"; Topic[] topics = { new Topic(topic, QoS.AT_LEAST_ONCE), new Topic(topic1, QoS.AT_LEAST_ONCE) }; callbackConnection.subscribe(topics, new Callback<byte[]>() { // 订阅主题成功 public void onSuccess(byte[] qoses) { System.out.println("========订阅成功======="); } // 订阅主题失败 public void onFailure(Throwable value) { System.out.println("========订阅失败======="); callbackConnection.disconnect(null); } }); // 发布消息 callbackConnection.publish(topic, ("Hello,我是服务端").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback<Void>() { public void onSuccess(Void v) { LOG.info("===========消息发布成功============"); } public void onFailure(Throwable value) { LOG.info("========消息发布失败======="); callbackConnection.disconnect(null); } }); } }); try { int count=0; //getClient(); while(true){ /* count++; //订阅的主题 String topic="china/tianjin"; //主题的内容 String message="hello "+count+"chinese people !"; //connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false); LOG.info("MQTTServer Message Topic="+topic+" Content :"+message); Thread.sleep(2000);*/ } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

    通过上面的代码,我们就可以简单的创建和实现客户端,通过向主题发送消息,其他订阅了该主题的客户端就可以收到对应的消息。

    客户端在线状态

    但是在使用过程中,我们肯定是想要知道客户端的在线情况,简单的发送消息已经实现了,那我们就来看看如何知道客户端在线情况吧!

    - 客户端如何知道自己是否连接上服务端?

    在上面的代码中我们使用了回调式连接,在该连接方式中

    connection.listener(new Listener() { public void onDisconnected() { // Send a message to a topic LOG.info("----------连接掉线-------------"); } public void onConnected() { LOG.info("----------连接上-------------"); } public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) { // You can now process a received message from a topic. // Once process execute the ack runnable. System.out.println("=============receive msg================" + new String(payload.toByteArray())); ack.run(); } public void onFailure(Throwable value) { LOG.info("----------a connection failure occured.-------------"); connection.disconnect(null); // a connection failure occured. } });

    在该Listener中可以监听到客户端是否连接上了服务端。

    - 服务端如何知道有客户端掉线?

    mqttServer.startServer(config, interceptHandlers, null, iAuthenticatorImp, authorizatorPolicy);

    在刚才,我们在Springboot中启动了服务端,在start方法中我们传递了几个参数,其中interceptHandlers就可以实现对客户端的监听。

    package com.hyls.sb.mqtt; import groovy.util.logging.Slf4j; import io.moquette.interception.AbstractInterceptHandler; import io.moquette.interception.messages.InterceptAcknowledgedMessage; import io.moquette.interception.messages.InterceptConnectMessage; import io.moquette.interception.messages.InterceptConnectionLostMessage; import io.moquette.interception.messages.InterceptPublishMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Slf4j @Component("safetyInterceptHandler") public class SafetyInterceptHandler extends AbstractInterceptHandler { private static final Logger LOG = LoggerFactory.getLogger(MoquetteServer.class); @Override public String getID() { return SafetyInterceptHandler.class.getName(); } @Override public void onConnect(InterceptConnectMessage msg) { super.onConnect(msg); LOG.info("==啥玩意连接上了===="); } @Override public void onConnectionLost(InterceptConnectionLostMessage msg) { super.onConnectionLost(msg); LOG.info("==哦豁,有人掉了ID:===="+msg.getClientID()); LOG.info("==哦豁,有人掉了Name:===="+msg.getUsername()); } @Override public void onPublish(InterceptPublishMessage msg) { super.onPublish(msg); } @Override public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) { super.onMessageAcknowledged(msg); } }

    我们自定义一个类,实现对应的监听方法。就可以知道有客户端掉线了。那是如何判断掉线呢。刚才我们新建客户端的时候设置了心跳这一值,该值就是服务端判断客户端有没有掉线的依据,超过心跳时间没有收到客户端发送的数据,服务端就认为客户端已掉线。(这里的代码,在上面的那个链接中。)

    知道有客户端掉了,是谁掉了?

    在实际应用中,我们知道有客户端掉了,但是只有一个clientId,这个ID不是我们设置的,知道这个ID我们也不知道是那一个客户端掉了。

    package io.moquette.interception.messages; public class InterceptConnectionLostMessage implements InterceptMessage { private final String clientID; private final String username; public InterceptConnectionLostMessage(String clientID, String username) { this.clientID = clientID; this.username = username; } public String getClientID() { return this.clientID; } public String getUsername() { return this.username; } }

    查看moquette.messages源码,如上,我们发现有一个userName属性,诶,我们就可以通过设置这个userName就知道是谁掉了涩。 在上面新建客户端的时候我们

    mqtt.setHost(CONNECTION_STRING); //连接前清空会话信息 mqtt.setCleanSession(CLEAN_START); //设置重新连接的次数 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX); //设置重连的间隔时间 mqtt.setReconnectDelay(RECONNECTION_DELAY); //设置心跳时间 mqtt.setKeepAlive(KEEP_ALIVE);

    set了一堆东西,那有没有setName方法呢。 哎呀,果然有该方法

    mqtt.setUserName("王德发");

    这里我们新增设置一个name,叫王德发(嘿嘿 滑稽)。 这下我们就知道是谁掉了。赶紧试试 就在我们翘着二郎腿,美滋滋的时候一运行

    LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);

    控制台就出现这个玩意,一看出现了权限问题。哦豁,咋搞 原来我们设置了运行匿名访问服务端,所有之前设置的客户端可以匿名访问,现在我们设置了一个username,Mqtt就走了鉴权,但是我们没有设置对应的鉴权规则。查看鉴权源码

    private boolean login(MqttConnectMessage msg, String clientId) { if (msg.variableHeader().hasUserName()) { byte[] pwd = null; if (msg.variableHeader().hasPassword()) { pwd = msg.payload().password().getBytes(StandardCharsets.UTF_8); } else if (!this.brokerConfig.isAllowAnonymous()) { LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId); return false; } String login = msg.payload().userName(); if (!this.authenticator.checkValid(clientId, login, pwd)) { LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login); return false; } NettyUtils.userName(this.channel, login); } else if (!this.brokerConfig.isAllowAnonymous()) { LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId); return false; } return true; } 这里进行check public interface IAuthenticator { boolean checkValid(String var1, String var2, byte[] var3); }

    哎呀,那我们如何去自定义鉴权规则呢 回到启动Broker的这个方法,发现参数中有一个 IAuthenticator authenticator

    mqttServer.startServer(config, interceptHandlers, null, iAuthenticatorImp, authorizatorPolicy);

    喔,原来这里就是我们可以自定义传入鉴权规则

    import io.moquette.broker.security.IAuthenticator; import org.springframework.stereotype.Component; @Component public class IAuthenticatorImp implements IAuthenticator { @Override public boolean checkValid(String s, String s1, byte[] bytes) { return true; } }

    我们实现鉴权接口,然后把自定义实现类作为参数传递到启动方法中。 果然现在由于我们直接返回了true,我们的客户端设置了用户名可以直接连接上,这样有客户端掉线。我们就可以通过自定义的userName知道是谁掉线了。 博主也是新手,有误希望大家指点~

    Processed: 0.009, SQL: 9