activemq点对点模式--java教程

    技术2025-08-05  13

    activemq有点对点模式,该模式是将一个队列作为中间传输的媒介.

    生产者:

    先写个生成者的demo

    package org.example.activemqtest; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.Scanner; public class Provider { public static String username = "admin"; public static String password = "admin"; public static String url ="tcp://localhost:61616";//采用openwire public static String queueName ="test_queue_1"; private ConnectionFactory connectionFactory; private Connection connection; private Session session; { try { /** * 1.创建连接工厂 * 创建工厂,构造方法有三个参数:分别是用户名、密码、连接地址 * 无参构造:有默认的连接地址,localhost * 一个参数:无验证模式,无用户的认证 * 三个参数:有认证和连接地址 */ connectionFactory = new ActiveMQConnectionFactory(username,password,url); connection = connectionFactory.createConnection(); /** * 3.启动连接 * 生产者可以不启动,因为在发送消息的时候回进行检查 * 如果未启动连接,会自动启动 * 如果有特殊配置,需要配置完成后再启动连接 */ connection.start(); /** * 4.用连接创建会话 * 有两个参数:是否需要事务、消息确认机制 * 如果支持事务,对于生产者来说第二个参数就无效了,建议传入Session.SESSION_TRANSACTED * 如果不支持事务,第二个参数必须传递且有效 * * AUTO_ACKNOWLEDGE:自动确认,消息处理后自动确认(商业开发不推荐) * CLIENT_ACKNOWLEDGE:客户端手动确认,消费者处理后必须手动确认 * DUPS_OK_ACKNOWLEDGE:有副本的客户端手动确认,消息可以多次处理(不建议) */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); System.out.println("clientID = " + connection.getClientID()); } catch (JMSException e) { e.printStackTrace(); } } private void send(String text) throws JMSException { Queue queue = session.createQueue(queueName); MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage(text)); } public static void main(String[] args) throws Exception { Provider provider = new Provider(); Scanner scan = new Scanner(System.in); // 判断是否还有输入 while (scan.hasNextLine()) { String str2 = scan.nextLine(); System.out.println("发送的数据为:" + str2); provider.send(str2); } scan.close(); } }

    可以看到,代码实际上就是new Provicder().send(text)

    普通代码块里创建了连接,send方法往test_queue_1里发送数据.然后发送者就结束了,就不再管了.

    至于Scanner是为了屏幕输入然后发送

    我们启动这个main方法,发送一条数据

    1111 发送的数据为:1111

    然后观察active后台控制台里:

     可以发现test_queue_1这个队列里已经有了一条数据。

     

    消费者:

    package org.example.activemqtest; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.time.LocalDateTime; public class Receiver { public static final String username ="admin"; private static final String password ="admin"; public static final String url ="tcp://localhost:61616"; public static String queueName ="test_queue_1"; Connection connection; Session session; { try { /** * 1.创建连接工厂 * 创建工厂,构造方法有三个参数:分别是用户名、密码、连接地址 * 无参构造:有默认的连接地址,localhost * 一个参数:无验证模式,无用户的认证 * 三个参数:有认证和连接地址 */ ConnectionFactory factory = new ActiveMQConnectionFactory(username,password,url); connection = factory.createConnection(); /** * 3.启动连接 * 生产者可以不启动,因为在发送消息的时候回进行检查 * 如果未启动连接,会自动启动 * 如果有特殊配置,需要配置完成后再启动连接 */ connection.start(); /** * 4.用连接创建会话 * 有两个参数:是否需要事务、消息确认机制 * 如果支持事务,对于生产者来说第二个参数就无效了,建议传入Session.SESSION_TRANSACTED * 如果不支持事务,第二个参数必须传递且有效 * * AUTO_ACKNOWLEDGE:自动确认,消息处理后自动确认(商业开发不推荐) * CLIENT_ACKNOWLEDGE:客户端手动确认,消费者处理后必须手动确认 * DUPS_OK_ACKNOWLEDGE:有副本的客户端手动确认,消息可以多次处理(不建议) */ session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); } } public void receive() throws JMSException { Queue queue = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try{ TextMessage m = (TextMessage)message; String text = m.getText(); System.out.println(LocalDateTime.now() +" : " +text); }catch (Exception e){ e.printStackTrace(); } } }); } public static void main(String[] args) throws JMSException { new Receiver().receive(); } }

    可以看到,在连接到消息中间件后,客户端就加了一个而监听器而已,监听器里打印了发来的数据

    然后我们运行,发现立即打印了接收到发送的数据。

    我们再看到控制台:

    发现多了一个接收者,同时消息消费数也增加了=1

     

    进一步测试

    我们持续不断的使用生产者发送数据,同时会发现消费者也在接收数据。那么如果我们再多一个消费者呢?会发生什么呢?

    具体过程就不上传了,说下结论,多个消费者是按次序轮着接到消息。因为消息是一次性的,消费掉就没有了。

    Processed: 0.013, SQL: 9