ActiveMQ传输协议简介

    技术2022-07-10  116

    一、ActiveMQ传输协议简介

    ActiveMQ支持的client-broker通信协议有:TCP、  NIO、 UDP 、SSL、 Http(s)、 VM,其中配置Transport Connector的文件在activeMQ安装目录的conf/activemq.xml中的<transportConnectors>标签内

    <transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>

    可以看到ActiveMQ支持的协议,默认是tcp协议,这里的name用的openwire是因为采用的消息协议是openwire。不同的配置,MQ的性能是不一样的

    在上文给出的配置信息中

    URI描述信息的头部都是采用协议名称,例如

    描述amqp协议的监听端口时,采用的URI描述格式为“amqp//......”;

    描述Stomp协议的监听端口时,采用的URI描述格式为“stomp://......”

    唯独在进行openwire 协议描述时,URI头却采用的“tcp://......” 这是因为ActiveMQ中默认的消息协议就是openwire.

    进入ActiveMQ的管理界面,点击Connections标签,可以看到几种连接方式,和activemq.xml里的一一对应。

    二、各种协议的介绍

    2.1 TCP(Transmission Control Protocol):

             这是Broker默认的配置,TCP的Client监听端口是61616。          在网络传输数据前,必须进行序列化数据,消息通过一个叫wire protocol的协议序列化成字节流。          TCP连接的URI形式如:tcp://hostname:port?key=value,后面的参数是可选的 。          TCP传输的优点:                   TCP协议传输可靠,稳定性强。                   高效性:字节流方式传递,效率很高。                   有效性、可用性:应用广泛,支持任何平台。           关于Transport协议的配置参数,参见官网:http://activemq.apache.org/tcp-transport-reference2.2NIO(New I/O Protocol):

             NIO协议和TCP协议类似,但是NIO更侧重于底层访问操作。它允许开发人员对同一资源可以有更多的Client调用和服务端有更多的负载。          适合使用NIO协议的场景:          可能有大量的Client连接到Broker上,一般情况下,大量Client连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程运行,所以建议使用NIO协议。           可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。           NIO连接的URI形式:nio://hostname:port?key=value。          Transport Connector配置示例,参考官网:http://activemq.apache.org/nio-transport-reference           上面的TCP和NIO需要掌握,下面的只需要了解即可,实际工作中用的不多,而且编码方式上会有不同。

    2.3AMQP(Advanced Message Queuing Protocol):

    一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。

            具体配置见官网:http://activemq.apache.org/amqp

     2.4STOMP(Streaming Text Orientated Message Protocol):

               流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。

               具体配置见官网:http://activemq.apache.org/stomp

    2.5 SSL(Secure Sockets Protocol):

               连接的URL形式:ssl://hostname:port?key=value。            具体配置见官网:http://activemq.apache.org/ssl-transport-reference2.6MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)

              是IBM开发的一个即时通讯协议,也可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

    具体配置见官网:http://activemq.apache.org/mqtt  

    三、ActiveMQ的传输协议之NIO

    NIO比TCP提供了更高的性能,我们根据官网的介绍,尝试将协议改成NIO的方式。

    凡是修改,先做备份,执行命令cp activemq.xml activemq.xml.bak将activemq.xml备份出来一份。

    在bin目录下执行命令./activemq stop停止ActiveMQ服务。

    在activemq.xml的transportConnectors结点最后加上一个transportConnector结点,注意端口号是61618。

    <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>

             在bin目录下执行命令./activemq start启动ActiveMQ服务,访问ActiveMQ的管理界面,点击Connections标签查看,发现多了一个nio。修改如图中的红色框中的内容,前台效果如下图

    验证的生产者代码

    public class JmsProduce { // public static final String ACTIVEMQ_URL = "tcp://10.5.96.48:61616"; public static final String ACTIVEMQ_URL = "nio://10.5.96.48:61618"; public static final String QUEUE_NAME = "queue01"; public static void main(String[]args) throws JMSException { //1.创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.通过连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3.创建会话 session //两个桉树 ,第一个叫事务 第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4 .创建目的地(具体是队列还是主题topic) Queue queue = session.createQueue(QUEUE_NAME); //5. 创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); //消息的持久化设置(现在是非持久) messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //6.通过使用messageProducer 生产3条消息发送到队列里面 for (int i = 0; i <3 ; i++) { // 7.创建消息 TextMessage textMessage = session.createTextMessage("Msg---" + i); //8.消息生产者发送给mq messageProducer.send(textMessage); } //9.关闭资源 顺着开启,倒着关闭 messageProducer.close(); session.close(); connection.close(); System.out.println("**** 消息发布到MQ完毕 ****"); } }

    验证的消费者代码

    public class JmsConsumer { // public static final String ACTIVEMQ_URL = "tcp://10.5.96.48:61616"; public static final String ACTIVEMQ_URL = "nio://10.5.96.48:61618"; public static final String QUEUE_NAME = "queue01"; public static void main(String[]args) throws JMSException, IOException { System.out.println("我是2号消费者"); //1.创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.通过连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3.创建会话 session //两个参数 ,第一个叫事务 第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创建目的地(具体是队列还是主题Topic) Queue queue = session.createQueue(QUEUE_NAME); //创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); while(true){ TextMessage textMessage = (TextMessage) messageConsumer.receive(4000); if( null !=textMessage ){ System.out.println("***消费者受到消息:" + textMessage.getText()); }else{ break; } } messageConsumer.close(); session.close(); connection.close(); } }

    先运行生产者,在运行消费者,可以在控制台看到打印的日志信息

    四、ActiveMQ的传输协议之NIO加强

             前面配置文件中的nio是以TCP协议为基础的NIO网络IO模型,但是这样的配置,只能使这个端口支持Openwire协议。怎么让这个端口支持NIO网络IO模型,又支持多个协议呢?

    使用命令./activemq stop停止ActiveMQ。进入config目录下,修改activemq.xml配置文件。去掉之前的nio配置,加上这一行配置。  

    <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnection=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>

            保存并启动ActiveMQ。访问ActiveMQ的管理界面,找到Connections标签,可以看到有一个auto+nio的项,说明此时配置成功了。将生产者和消费者中的ACTIVEMQ_URL改成nio://hostname:61608即可运行。代码前面的可以复用

             如果此时,将nio改成tcp,端口号不变,保持61608,再次运行,发现还是可以正常运行的,这就是我们配置了auto的效果,ActiveMQ为我们自动适配相应的协议。

             这里常用的就是TCP和NIO,其他的协议在实现的代码是不一样的,所以要用其他协议,不能直接简单的修改协议,要改动的代码还很多,这里不过多解释。

    参考

    https://blog.csdn.net/qq_36059561/article/details/103844534

    https://blog.csdn.net/qq_36059561/article/details/103848392

    https://blog.csdn.net/qq_36059561/article/details/103849242

    https://blog.csdn.net/qq_36059561/article/details/103865507

    Processed: 0.018, SQL: 9