消息中间件:01消息中间件初识

    技术2022-07-13  66

    系统间通信

    目前,业界通常有两种方式来实现系统间通信,其中一种是基于远程过程调用(Remote Procedure Call) 的方式,另 一种是基于消息队列(MQ) 的方式。

    基于消息队列的消息数据可以非常简单,比如只包含文本字符串,也可以很复杂,比如包含字节流、 字节数组,还可能包含嵌入对象,甚至是Java 对象(经过序列化的对象)。 消息在被发送后可以立即返回,由消息队列来负责消息的传递,消息生产者只管将消息发布到消息队列而不用管谁来取,消息消费者只管从消息队列中取消息而不管是谁发布的,这样生产者和消费者都不用知道对方的存在。如下图:

    为何要用消息队列

    解耦

    在大型系统的开发过程中会经常碰到此类情况,随着需求的叠加, 各模块之间逐渐变成了 相互调用的关系,这种模块间紧密关联的关系就是紧相合。 紧相合带来的问题是对一个模块的 功能变更将导致其关联模块发生变化,因此各个模块难以独立演化。 要解决这个问题,可以在模块之间调用时增加一个中间层来实现解耦,这也方便了以后的 扩展。所谓解耦,简单地讲,就是一个模块只关心自己的核心流程,而依赖该模块执行结果的 其他模块如果做的不是很重要的事情,有通知即可,无须等待结果。 换句话说,基于消息队列 的模型,关心的是通知,而非处理。

    削峰

    一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。当流量过大,访问量剧增,极有可能发生服务雪崩。 这种场景下可使用消息队列,先将短时间高并发的请求持久化,然后逐步处理,从而削平高峰期的并发流量,改善系统的性能。

    异步

    使用消息队列将调用异步化,可改善网站的扩展性,使用消息队列将调用异步化,可改善网站的扩展性,还可改善网站系统的性能。 在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力, 同时也使得响应延迟加剧。在使用消息队列后,用户请求的数据发送给消息队列后立即返回,再由消息队列的消费者进程(通常情况下, 该进程 通常独立部署在专门的服务器集群上)从消息队列中获取数据, 异步写入数据库。由于消息队列服务器处理速度远快于数据库(消息队列服务器也比数据库具有更好的伸缩性),因此用户的响应延迟可得到有效改善。

    日志收集

    在项目开发和运维中日志是一个非常重要的部分,通过日志可以跟踪调试信息、定位问题 等。在初期很多系统可能各自独立记录日志,定位问题也需要到每个系统对应的目录中查看相 应的日志。但是随着业务的发展,要建设的系统越来越多,系统的功能也越做越多,每天产生的日志数据量变得越来越大。通过分析海量的日志来实时获取每个系统当前的状态,变得越来越迫切,离线分析当前系统的功能为未来的设计和扩展提供参考,也变得越来越重要。 在这种情况下,利用消息队列产品在接收和持久化消息方面的高性能,引入消息队列快速 接收日志消息,避免因为写入日志时的某些故障导致业务系统访问阻塞、请求延迟等。所以很多公司会选择构建一个日志收集系统,由它来统一收集业务日志数据,供离线和在线的分析系统使用。

    解决分布式事务问题

    保证事务最终一致性

    消息队列的功能特点

    一个典型意义上 的消息队列,至少需要包含消息的发送、接收和暂存功能,如下图:

    Broker:消息处理中心, 负责消息的接收、存储、转发等。Producer:消息生产者,负责产生和发送消息到消息处理中心。Consumer:消息消费者,负责从消息处理中心获取消息,并进行相应的处理。

    但在生产环境应用中,对消息队列的要求远不止基本的消息发送、接收和暂存。在不同的业务场景中,需要消息队列产品能解决诸如消息堆积、消息持久化、可靠投递、消息重复、严格有序、集群等各种问题。

    消息堆积

    根据消息队列的生产者、消费者处理模型来分析, 因为生产者和消费者是两个分开处理消息的系统,所以无法预知两者对消息处理速度的快慢, 一旦在某个时间段消费者处理消息的速度没有跟上生产者发送消息的速度,必将导致消息在处理中心逐渐积压而得不到释放。 因此,有时需要消息队列产品能够处理这种情况,比如给消息队列设置一个阈值,将超过阈值的消息不再放入处理中心,以防止系统资源被耗尽,导致机器挂掉甚至整个消息队列不可用。

    消息持久化

    持久化方案有很多种,比如将消息存到本地文件、分布式文件系统、数据库系统中等。

    可靠投递

    可靠投递是不允许存在消息丢失的情况的。从消息的整个生命周期来分析,消息丢失的情况一般发生在如下过程中:

    从生产者到消息处理中心。从消息处理中心到消息消费者。消息处理中心持久化消息。

    消息重复

    因为消息发送是基于网络发送的,假设网络延迟或者网络卡顿,消息发送机制多次重试,消息重复发送的问题不可避免的发生。要直接避免不重复发送基本太难,因为网络环境无法预知,还会使程序复杂度加大,因此默认允许消息重复发送。因此无论是点对点,还是发布/订阅模型,都可能出现生产者发送多条一样的数据到MQ,此时就会出现重复数据。

    严格有序

    在实际的业务场景中,经常会碰到需要按生产消息时的顺序来消费的情形。比如网购时产生的订单, 每一笔订单一般都经过创建订单、 支付完成、 己发货、己收货、订单完成等环节,每个环节都可能产生消息,但会要求严格按照顺序消费消息,否则在业务处理上就是不正确的。 这就需要消息队列能够提供有序消息的保证。

    集群

    在大型应用中,系统架构一般都需要实现高可用性,以排除单点故障引起的服务中断,所以可能需要消息队列产品提供对集群模式的支持。集群不仅可以 让消费者和生产者在某个节点崩溃的情况下继续运行,集群之间的多个节点还能够共享负载,当某台机器或网络出现故障时能自动进行负载均衡,而且可以通过增加更多的节点来提高消息通信的吞吐量。

    消息中间件

    非底层操作系统软件、非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。消息中间件关注于数据的发送和接收,利用高效、可靠的异步消息传递机制集成分布式系统。中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不同的技术之间共享资源, 管理计算资源和网络通信。中间件在计算机系统中是一个关键软件,它能实现应用的互联和互操作,能保证系统安全、可靠、高效运行。

    设计一个简单的消息队列

    消息队列服务的核心是消息处理中心,它至少要具备消息发送、消息接收和消息暂存功能。 作为一个消息处理中心,至少要有一个数据容器用来保存接收到的消息。

    public class Broker { //队列存储消息的最大数量 private final static int MAX_SIZE = 3; //保存消息数据的容器 private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE); //生产消息 public static void produce(String msg) { if (messageQueue.offer(msg)) { System.out.println("成功向消息处理中心投递消息 :" + msg + ", 当 前暂存的消息数量是:" + messageQueue.size()); } else { System.out.println("消息处理中心内暂存的消息达到最大负 荷,不能继续 放入消息!"); } System.out.println("======================="); } //消费消息 public static String consume() { String msg = messageQueue.poll(); if (msg != null) { //消费条件满足情况,从消息容器中取出一条消息 System.out.println("已经消费消息: " + msg + " ,当前暂存的消息数 量是: " + messageQueue.size()); } else { System.out.println("消息处理中心内 没有消息可供消费!"); } System.out.println("======================="); return msg; } }

    有了消息处理中心类之后,需要将该类的功能暴露出去,这样别人才能用它来发送和接收消息。所以,我们定义了BrokerServer类用来对外提供 Broker类的服务。

    public class BrokerServer implements Runnable{ public static int SERVICE_PORT = 9999; private final Socket socket; public BrokerServer(Socket socket) { this.socket = socket; } @Override public void run() { try { BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream()); while (true) { String str = in.readLine(); if (str == null) { continue; } System.out.println("接收到原始数据: " + str); if (str.equals("CONSUME")) { //CONSUME表示要消费一条消息 //从消息队列中消费一条消息 String message = Broker.consume(); out.println(message); out.flush(); } else { //其他情况都表示生产消息放到消息队列中 Broker.produce(str); } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(SERVICE_PORT); while (true) { BrokerServer brokerServer = new BrokerServer(server.accept()); new Thread(brokerServer).start(); } } }

    有了消息处理中心后,自然需要有相应客户端与之通信来发送和接收消息。

    public class MqClient { //生产消息 public static void produce(String message) throws IOException { Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT); try ( PrintWriter out = new PrintWriter(socket.getOutputStream()); ) { out.println(message); out.flush(); } } //消费消息 public static String consume() throws IOException { Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT); try ( BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream()); ) { out.println("CONSUME"); out.flush(); String message = in.readLine(); return message; } } }

    消费消息

    public class ConsumeClient { public static void main(String[] args) throws Exception { MqClient client = new MqClient(); String message = client.consume(); System.out.println("获取的消息为: " + message); } }

    生产消息

    public class ProdeceClient { public static void main(String[] args) throws Exception { MqClient client = new MqClient(); client.produce("Hello World"); } }

    参考: [1] 分布式 消息中间件实践 详细消息中间件的高可用、高性能配置和原理(倪伟)

    Processed: 0.015, SQL: 9