SpringBoot:接入RabbitMQ(基础使用)

    技术2022-07-11  83

    pom <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> Sender import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Sender { public static void main(String[] args) throws Exception { // 1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); // RabbitMQ的服务器地址 connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); // 2 创建Connection Connection connection = connectionFactory.newConnection(); // 3 创建Channel Channel channel = connection.createChannel(); // 4 声明 String queueName = "test001"; // 参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数 channel.queueDeclare(queueName, false, false, false, null); Map<String, Object> headers = new HashMap<String, Object>(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers).build(); for(int i = 0; i < 5;i++) { String msg = "Hello World RabbitMQ " + i; channel.basicPublish("", queueName , props , msg.getBytes()); } channel.close(); connection.close(); } } Receiver import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Receiver { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queueName = "test001"; // durable 是否持久化消息 channel.queueDeclare(queueName, false, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); // 参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); // 循环获取消息 while(true){ // 获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }

    我们可以试着run一下Sender,再run一下Receiver,可以在控制台看到发送的5条消息

    Processed: 0.010, SQL: 9