什么RabbitMQ?
RabbitMQ是基于amqp协议,实现的一种MQ理念的服务。类似的服务 RocketMQ、ActiveMQ、Kafka等
为什么在分布式项目中需要一款消息中间件?
消息中间件能够实现一些Feign(同步调用)无法实现的效果:
1、服务的异步调用 2、消息的广播(事件总线) 3、消息的延迟处理 4、分布式事务 5、请求削峰(处理高并发)
1)拉取镜像
docker pull rabbitmq:3.8.5-management2)准备docker-compose模板
.... rabbitmq: image: rabbitmq:3.8.5-management container_name: rabbitmq ports: - 5672:5672 - 15672:15672 restart: always3)启动rabbitmq容器
docker-compose up -d rabbitmq4)访问rabbitmq的管理页面
5)登录rabbitmq的管理页面 (账号:guest 密码:guest)
看到这个页面说明安装成功
1)模型一
P -> Provider(提供者) 红色方块 -> 队列(存储消息) C -> Consumer(消费者)
2)模型二
一个提供者对应多个消费者,消息会轮训发送给两个消费者
起到一个消费端负载均衡的目的,减轻消费端的消费压力
3)模型三
发布/订阅模式 - 消息广播
多个消费者会同时收到提供者发布的消息
X -> Exchange(交换机,消息的复制转发,不能存储消息)
4)模型四
路由键 -> 交换机和队列绑定若干路由键,发布的消息可以指定路由键发送
5)模型五
通配符的路由键
6)模型六
Rabbitmq的同步调用模型
添加依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
1)模型一的实现
服务的提供者:
public static void main(String[] args) throws IOException, TimeoutException { //1、连接RabbitMQ Connection connection = ConnectionUtil.getConnection(); //2、通过连接获得管道对象(后面所有的操作都是通过管道对象操作) Channel channel = connection.createChannel(); //3、创建队列 channel.queueDeclare("test_queue1", false, false, false, null); //4、给队列中发布消息 String msg = "Hello RabbitMQ!!!!"; channel.basicPublish("", "test_queue1", null, msg.getBytes("utf-8")); //关闭连接 connection.close(); }服务的消费者:
public static void main(String[] args) throws IOException { //1、连接RabbitMQ Connection connection = ConnectionUtil.getConnection(); //2、获得连接的channel对象 Channel channel = connection.createChannel(); //3、监听队列 channel.basicConsume("test_queue1", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body, "utf-8")); } }); //关闭连接??? // connection.close(); }思考:
1、队列应该在消费者端创建还是提供者端创建?
- 消费者通常创建队列,提供者创建交换机
2、消费端是同步消费消息还是异步消费消息?- 同步消费,必须消费完一条消息,才能继续消费下一条消息,在实际开发过程中,为了提高消费者的消费速率,往往会引入线程池的方式,进行多线程消费。
//创建一个线程池 - 线程数量为5 private static ExecutorService executorService = Executors.newFixedThreadPool(5); .... channel.basicConsume("test_queue1", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { executorService.submit(new Runnable() { public void run() { try { System.out.println("接收到消息:" + new String(body, "utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } });