SpringCloud集成Rabbitmq注解版

    技术2022-07-10  149

    乐优项目改造

    接下来,我们就改造项目,实现搜索服务、商品静态页的数据同步。

    1.1.思路分析

    发送方:商品微服务

    什么时候发?

    当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。

    发送什么内容?

    对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。

    接收方:搜索微服务、静态页微服务

    接收消息后如何处理?

    搜索微服务: 增/改:添加新的数据到索引库删:删除索引库数据 静态页微服务: 增:创建新的静态页删:删除原来的静态页改:创建新的静态页并删除原来的

    1.2.商品服务发送消息

    我们先在商品微服务leyou-item-service中实现发送消息。

    1.2.1.引入依赖

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    1.2.2.配置文件

    我们在application.yml中添加一些有关RabbitMQ的配置:

    rabbitmq: # rabbitmq配置 host: 192.168.79.128 # 安装rabbitmq的ip username: leyou # rabbitmq的账号 password: 123456 # rabbitmq的密码 virtual-host: /leyou # 虚拟主机 template: # 这个配置是给AmqpTemplate用的 retry: # 重试的相关信息 enabled: true # 重试开启 initial-interval: 10000ms # 初始化的从实周期10000ms max-interval: 30000ms # 最大重试周期 multiplier: 2 # 10000ms重试后失败 乘以 2 如果还是失败 在乘以2 一直往下去 但是不会大于30000ms exchange: leyou.item.exchange # 默认交换机名称, AmqpTemplate发消息是会绑定一个交换机, 你没绑定交换机---默认用该名称交换机 publisher-confirms: true # 生产者确认 如果消息发送失败它会自动重试 template:有关AmqpTemplate的配置 retry:失败重试 enabled:开启失败重试initial-interval:第一次重试的间隔时长max-interval:最长重试间隔,超过这个间隔将不再重试multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍 exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个 publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

    1.2.3.改造GoodsService

    在GoodsService中封装一个发送消息到mq的方法:

    private void sendMessage(Long id, String type){ // 发送消息 try { this.amqpTemplate.convertAndSend("item." + type, id); } catch (Exception e) { logger.error("{}商品消息发送异常,商品id:{}", type, id, e); } }

    这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange

    注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑

    然后在新增的时候调用:

    修改的时候调用:

    1.3.搜索服务接收消息

    搜索服务接收到消息后要做的事情:

    增:添加新的数据到索引库删:删除索引库数据改:修改索引库数据

    因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。

    1.3.1.引入依赖

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    1.3.2.添加配置

    spring: rabbitmq: host: 192.168.79.128 # 安装rabbitmq的ip username: leyou # rabbitmq的账号 password: 123456 # rabbitmq的密码 virtual-host: /leyou # 虚拟主机

    这里只是接收消息而不发送,所以不用配置template相关内容。

    1.3.3.编写监听器

    代码:

    @Component public class GoodsListener { @Autowired private SearchService searchService; /** * 处理insert和update的消息 * * @param id * @throws Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "leyou.create.index.queue", durable = "true"), exchange = @Exchange( value = "leyou.item.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC), key = {"item.insert", "item.update"})) public void listenCreate(Long id) throws Exception { if (id == null) { return; } // 创建或更新索引 this.searchService.createIndex(id); } /** * 处理delete的消息 * * @param id */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "leyou.delete.index.queue", durable = "true"), exchange = @Exchange( value = "leyou.item.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC), key = "item.delete")) public void listenDelete(Long id) { if (id == null) { return; } // 删除索引 this.searchService.deleteIndex(id); } }

    1.3.4.编写创建和删除索引方法

    这里因为要创建和删除索引,我们需要在SearchService中拓展两个方法,创建和删除索引:

    public void createIndex(Long id) throws IOException { Spu spu = this.goodsClient.querySpuById(id); // 构建商品 Goods goods = this.buildGoods(spu); // 保存数据到索引库 this.goodsRepository.save(goods); } public void deleteIndex(Long id) { this.goodsRepository.deleteById(id); }

    创建索引的方法可以从之前导入数据的测试类中拷贝和改造。

    1.4.静态页服务接收消息

    商品静态页服务接收到消息后的处理:

    增:创建新的静态页删:删除原来的静态页改:创建新的静态页并删除原来的

    不过,我们编写的创建静态页的方法也具备覆盖以前页面的功能,因此:增和改的消息可以放在一个方法中处理,删除消息放在另一个方法处理。

    1.4.1.引入依赖

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    1.4.2.添加配置

    spring: rabbitmq: host: 192.168.79.128 # 安装rabbitmq的ip username: leyou # rabbitmq的账号 password: 123456 # rabbitmq的密码 virtual-host: /leyou # 虚拟主机

    这里只是接收消息而不发送,所以不用配置template相关内容。

    1.4.3.编写监听器

    代码:

    @Component public class GoodsListener { @Autowired private GoodsHtmlService goodsHtmlService; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "leyou.create.web.queue", durable = "true"), exchange = @Exchange( value = "leyou.item.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC), key = {"item.insert", "item.update"})) public void listenCreate(Long id) throws Exception { if (id == null) { return; } // 创建页面 goodsHtmlService.createHtml(id); } @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "leyou.delete.web.queue", durable = "true"), exchange = @Exchange( value = "leyou.item.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC), key = "item.delete")) public void listenDelete(Long id) { if (id == null) { return; } // 创建页面 goodsHtmlService.deleteHtml(id); } }

    1.4.4.添加删除页面方法

    public void deleteHtml(Long id) { File file = new File("C:\\project\\nginx-1.14.0\\html\\item\\", id + ".html"); file.deleteOnExit(); }

    1.5.测试

    1.5.1.查看RabbitMQ控制台

    重新启动项目,并且登录RabbitMQ管理界面:http://192.168.79.128:15672

    可以看到,交换机已经创建出来了: 队列也已经创建完毕:

    并且队列都已经绑定到交换机:

    1.5.2.修改数据试一试

    在后台修改商品数据的价格,分别在搜索及商品详情页查看是否统一。

    Processed: 0.012, SQL: 9