二、入门案列

    技术2024-11-15  23

    一、安装

    1. Docker安装ActiveMQ

    安装教程ActiveMQ官网

    2. pom.xml

    <!--activemq的依赖jar包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency>

    二、队列(Queue)案列

    1. 消息提供者

    import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMqQueueProvider { public static void main(String[] args) throws JMSException { //0. activemq的ip及端口: 默认采用tcp协议 // 队列的名字:保证提供者和消费者用的是同一个队列 String url = "tcp://60.205.229.31:61617"; String queueName = "dreamer_queue"; // 1. 创建连接工厂,采用tcp协议,采用默认用户名和密码admin ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); // 2. 获取连接并启动访问 Connection connection = factory.createConnection(); connection.start(); // 3. 创建会话session: 参数一:事务; 参数二:签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4. 创建目的地destination: 其中destination包含Queue和Topic Queue queue = session.createQueue(queueName); // 5. 创建消息生产者,并关联目的地 MessageProducer producer = session.createProducer(queue); // 6. 发送10条消息到activemq中 for(int i = 1; i<11; i++){ // 7. 创建消息: 当前选用TextMessage,即为普通字符串消息 TextMessage textMessage = session.createTextMessage("发送消息:" + i); // 8. 发送消息 producer.send(textMessage); } // 9. 关闭资源: 顺着申请,倒着关闭 producer.close(); session.close(); connection.close(); System.out.println("信息发送完成"); } }

    2. 消息消费者

    2.1. receive方式

    public class ActiveMqQueueConsumer { public static void main(String[] args) throws JMSException { String url = "tcp://60.205.229.31:61617"; String queueName = "dreamer_queue"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); // 1. 创建消息的消费者 MessageConsumer consumer = session.createConsumer(queue); while(true){ /** 2. 接受消息: * 2.1 带参数: 4秒没收到消息就会释放该consumer对象,控制台该消费者断开 * 2.2 不带参数: 即使没有收到消息,也不会释放该consumer */ TextMessage receiveMessage = (TextMessage) consumer.receive(4000); if(null!=receiveMessage){ System.out.println("接收的消息为:" + receiveMessage.getText()); }else{ break; } } consumer.close(); session.close(); connection.close(); System.out.println("信息消费完成"); } }

    2.2. listener方式

    package com.day.dreamer.consumer; public class ActiveMqQueueConsumerListener { public static void main(String[] args) throws JMSException, IOException { String url = "tcp://60.205.229.31:61617"; String queueName = "dreamer_queue"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); // 1 消息消费者 MessageConsumer consumer = session.createConsumer(queue); // 2 监听模式 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(null!=message && message instanceof TextMessage){ // 3 消息接收到了呗进行处理 TextMessage textMessage = (TextMessage) message; try{ System.out.println("接收到了消息:" + textMessage.getText()); }catch (Exception e){ e.printStackTrace(); } } } }); // 4. 先让控制台不关闭,先去连接activemq System.in.read(); // 5. 关闭资源 consumer.close(); session.close(); connection.close(); System.out.println("信息消费完成"); } }

    3. 队列总结

    3.1. 两种消费方式

    同步阻塞receive方式: 在未接收到消息或者未超时的情况下,将一直处于阻塞状态;异步非阻塞listen方式: 注册一个消息监听器,消息到达后去接收消息;适用性: 两种消费模式均可用于队列和主题消费;

    3.2. 点对点消费模式

    一个消息只能被消费一次,消费后该消息就会出队,保证消息不会被重复消费;多个消费者存在的情况下,多个消费者各消费一部分消息,默认负载均衡的方式轮询消费;生产者和消费者无时间关系,不管发送消息时候消费者是否启动,消息都可被消费;

    三、主题(Topic)案列

    1. 消息提供者

    public class ActiveMqTopicProvider { public static void main(String[] args) throws JMSException { String url = "tcp://60.205.229.31:61617"; String topicName = "dreamer_topic"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 1. 创建队列消息 Topic topic = session.createTopic(topicName); MessageProducer producer = session.createProducer(topic); for(int i = 1; i<11; i++){ TextMessage textMessage = session.createTextMessage("topic消息:" + i); producer.send(textMessage); } producer.close(); session.close(); connection.close(); System.out.println("信息发送完成"); } }

    2. 消息消费者

    消费方式可以用receive或者listener方式;可以用lamda表达式写法进行listener消费; public class ActiveMqTopicConsumerListener { public static void main(String[] args) throws JMSException, IOException { String url = "tcp://60.205.229.31:61617"; String topicName = "dreamer_topic"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); MessageConsumer consumer = session.createConsumer(topic); //lamda表达式的listener方式 consumer.setMessageListener(message -> { if(null!=message && message instanceof TextMessage){ // 接收消息并处理 TextMessage textMessage = (TextMessage) message; try{ System.out.println("接收到了消息:" + textMessage.getText()); }catch (Exception e){ e.printStackTrace(); } } }); System.in.read(); consumer.close(); session.close(); connection.close(); System.out.println("信息消费完成"); } }

    3. 主题总结

    一个生产者可以包含多个消费者,一对多;先启动消费者,再启动生产者,否则生产者产生的为废消息;一个生产者产生的消息,可以被多个消费者进行消费;
    Processed: 0.013, SQL: 9