一、安装
1. Docker安装ActiveMQ
安装教程ActiveMQ官网
2. pom.xml
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
二、队列(Queue)案列
1. 消息提供者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ActiveMqQueueProvider {
public static void main(String[] args) throws JMSException {
String url = "tcp://60.205.229.31:61617";
String queueName = "dreamer_queue";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
for(int i = 1; i<11; i++){
TextMessage textMessage = session.createTextMessage("发送消息:" + i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("信息发送完成");
}
}
2. 消息消费者
2.1. receive方式
public class ActiveMqQueueConsumer {
public static void main(String[] args) throws JMSException {
String url = "tcp://60.205.229.31:61617";
String queueName = "dreamer_queue";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage receiveMessage = (TextMessage) consumer.receive(4000);
if(null!=receiveMessage){
System.out.println("接收的消息为:" + receiveMessage.getText());
}else{
break;
}
}
consumer.close();
session.close();
connection.close();
System.out.println("信息消费完成");
}
}
2.2. listener方式
package com.day.dreamer.consumer;
public class ActiveMqQueueConsumerListener {
public static void main(String[] args) throws JMSException, IOException {
String url = "tcp://60.205.229.31:61617";
String queueName = "dreamer_queue";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null!=message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try{
System.out.println("接收到了消息:" + textMessage.getText());
}catch (Exception e){
e.printStackTrace();
}
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
System.out.println("信息消费完成");
}
}
3. 队列总结
3.1. 两种消费方式
同步阻塞receive方式: 在未接收到消息或者未超时的情况下,将一直处于阻塞状态;异步非阻塞listen方式: 注册一个消息监听器,消息到达后去接收消息;适用性: 两种消费模式均可用于队列和主题消费;
3.2. 点对点消费模式
一个消息只能被消费一次,消费后该消息就会出队,保证消息不会被重复消费;多个消费者存在的情况下,多个消费者各消费一部分消息,默认负载均衡的方式轮询消费;生产者和消费者无时间关系,不管发送消息时候消费者是否启动,消息都可被消费;
三、主题(Topic)案列
1. 消息提供者
public class ActiveMqTopicProvider {
public static void main(String[] args) throws JMSException {
String url = "tcp://60.205.229.31:61617";
String topicName = "dreamer_topic";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
for(int i = 1; i<11; i++){
TextMessage textMessage = session.createTextMessage("topic消息:" + i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("信息发送完成");
}
}
2. 消息消费者
消费方式可以用receive或者listener方式;可以用lamda表达式写法进行listener消费;
public class ActiveMqTopicConsumerListener {
public static void main(String[] args) throws JMSException, IOException {
String url = "tcp://60.205.229.31:61617";
String topicName = "dreamer_topic";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(message -> {
if(null!=message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try{
System.out.println("接收到了消息:" + textMessage.getText());
}catch (Exception e){
e.printStackTrace();
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
System.out.println("信息消费完成");
}
}
3. 主题总结
一个生产者可以包含多个消费者,一对多;先启动消费者,再启动生产者,否则生产者产生的为废消息;一个生产者产生的消息,可以被多个消费者进行消费;
转载请注明原文地址:https://ipadbbs.8miu.com/read-52964.html