JMS : Java 消息中间件的服务接口规范,activemq 之上是 mq , 而 mq 之上是JMS 定义的消息规范 。 activemq 是mq 技术的一种理论实现(与之相类似的实现还有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一级的规范。
Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。
Destination分为两种:队列和主题。下图介绍:
3.1 导入依赖
<!-- activemq 所需要的jar 包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <!-- activemq 和 spring 整合的基础包 --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency>3.2 编写生产者类
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Produce_Queue { public static final String ACTIVEMQ_URL ="tcp://112.124.16.77:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1 创建连接工厂,默认为默认密码, ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL); //2 创建连接,并开启连接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3 开启会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4 创建目的地 Queue queue = session.createQueue(QUEUE_NAME); //5 创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); for (int i = 0; i <4; i++) { //发送消息 messageProducer.send(session.createTextMessage("Message:"+i)); } //6 关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("发送完毕!!!"); } }3.3 编写消费者类
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Consumer_Queue { public static final String ACTIVEMQ_URL = "tcp://112.124.16.77:8161"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(queue); while(true) { //receive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。是同步阻塞方式。 //和socket的accept方法类似的。参数是为等待时间。 TextMessage textMessage = (TextMessage) consumer.receive(200); if(null != textMessage) { System.out.println("*****消费者的消息:"+textMessage.getText()); }else { break; } } consumer.close(); session.close(); connection.close(); } }3.4 分别运行生成者和消费者
已经产生了四条消息;
再运行一下消费者:
*****消费者的消息:Message:0 *****消费者的消息:Message:1 *****消费者的消息:Message:2 *****消费者的消息:Message:3可以看出,已经被消费了四条了消息。
Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers:消费者数量,消费者端的消费者数量。
Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。
在此基础上,我们再进行增强一下,将消费者设置为监听模式,当有消息传过来时,进行消费。
同时开启多个消费者进行消费。
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Consumer_Queue { public static final String ACTIVEMQ_URL = "tcp://112.124.16.77:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws Exception { //同时启动三个。依次设为消费者1号消费者2号和消费者3号 System.out.println("我是消费者1号"); ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(queue); //在原来的代码上增加一个监听功能呢 consumer.setMessageListener(new MessageListener(){ public void onMessage(Message message) { if(null != message && message instanceof TextMessage) { try { System.out.println("*****异步消费者的消息:"+((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); System.out.println("****异步消费不成功!!!"); } } } }); // while(true) { //receive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。是同步阻塞方式。 //和socket的accept方法类似的。参数是为等待时间。 TextMessage textMessage = (TextMessage) consumer.receive(200); if(null != textMessage) { System.out.println("*****消费者的消息:"+textMessage.getText()); }else { break; } } //让主线程保持在线 System.in.read(); consumer.close(); session.close(); connection.close(); } }通过测试可以发现:
当有消息产生时,三个消费者会平分消息,第一次发送4条,一号有两条,二号有一条,三号一条。再发送一次,一号有三条,二号有三条,三号有两条。即尽量会保持三个消费者的消息数尽可能的平均。
4.1 导入依赖。跟上面一样
4.2 编写TOPIC生产者.其实和队列类似,只是部分不同
//生产者 import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Produce_Topic { public static final String ACTIVEMQ_URL = "tcp://112.124.16.77:61616"; public static final String TOPIC_NAME = "topic"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //只是这个地方略有不同,创建的是topic Topic topic = session.createTopic(TOPIC_NAME); MessageProducer producer = session.createProducer(topic); for (int i = 0; i <4; i++) { TextMessage textMessage = session.createTextMessage("topic_message:"+(i)); producer.send(textMessage); MapMessage mapMessage = session.createMapMessage(); } producer.close(); session.close(); connection.close(); System.out.println("topic message send success!"); } } //消费者 import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class Consumer_Topic { public static final String ACTIVEMQ_URL = "tcp://112.124.16.77:61616"; public static final String TOPIC_NAME = "topic"; public static void main(String[] args) throws JMSException, IOException { System.out.println("启动topic03"); ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建的是topic消费者。 Topic topic = session.createTopic(TOPIC_NAME); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener((message) -> { if(null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("topic get message:"+textMessage.getText()); }catch (Exception e) { e.printStackTrace(); } } }); System.in.read(); consumer.close(); session.close(); connection.close(); } }注意:
topic模式下,需要先运行消费者,即有人订阅了才行,没人订阅的消息就属于废消息了。topic模式下,如果有多个订阅者,消息不会被分摊,而是以相同数量发给不同订阅者。