RocketMQ原理、源码分析及实践

    技术2025-05-28  17

    原理描述

    模块架构

     

     

    RocketMQ架构上主要分为四部分,如上图所示:

    Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

    Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

    NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

    BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

    Remoting Module:整个Broker的实体,负责处理来自clients端的请求。Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

    存储概念

     

     

    CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

    调用逻辑

    nameserv

    nameserv比较简单,其实就是一个注册中心的逻辑,broker启动之后会把自己的地址信息等注册到namserv上,这里其实就是基于netty的通信,然后producer实例启动的时候会先去和nameserv建立链接然后获取broker信息,name不承载负载均衡的功能。

    producer

    因为负载均衡都是在客户端实现的,所以producer会首先启动一个客户端,这个客户端首先会去启动netty实例,连接nameserv将producer自己注册到nameserv上。之后在发送消息时,producer会从nameserv上获取topic对应broker的存储列表,更新本地的缓存,尝试找到topic对应的broker。

     

    如果没有找到topic,相当于自动创建topic的逻辑,他就会自己创建一个topic的config,然后放到本地缓存的topic的table里面,之后会根据这个topic的table选择一个消费队列,这个消费队列后面在broker的地方说,默认消费队列的数量是4,如果是一个新的topic会获取一个随机的队列进行添加,同时还可能会有一个指数退避容错的策略运行,具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

     

     

     

    所以最终producer的发送的东西就是,选择broker,然后选择brocker的消费队列,把这个队列的id,消息的topic,消息的内容,tag等东西封装到消息报文里,通过netty发送到broker端。

    broker

    broker这个东西是所有模块里面最复杂的,首先它启动的时候创建了一对线程池队列等,粗略数了一下有十多个,但是这些线程池也是,实现他的消息队列的核心逻辑。

     

     

    上图中,两个逻辑是比较最重要的,第一个是messageStore,是message的存储的服务,处理了commitlog和consumeIndex的存储,但是这两个是放在不同的线程里面做的,producer发送消息过来会直接先写进commitlog日志里,在broker 启动的时候会创建一条单一线程,专门从commitlog里面获取没有被消费的消息,然后构造ConsumeQueue实例,创建consumeQueue索引文件,用来给消费者消费。

    然后后面的remotingservice和fastRemotingServer其实都是启动netty的客户端和服务端,用于和namesrv、producer和consumer消息的收发。

    接收producer消息

    接收消息发送消息这些,其实就是走的标准的netty的Reactor的模型,这个官方文档上有,下面也会简单说下,可以根据nettyserver的创建的,找到处理的handler。

     

     

     

    一路找下来,在直接看请求处理的的case。请求处理的过程中,首先其实就是调用对应的处理器来处理,这里用的是AsyncnettyRequestProcessor来处理的,其实实际处理的实例类是 SendMessageProcessor中的 processRequest方法。

     

     

     

     

     

    这里后续的处理流程就是校验请求头,请求类型等,组装存储的数据结构,最终会调用我们之前启动的时候创建的messageStore这个类中去存储。

     

     

    存储的过程中,其实只存储了commitlog这个日志,没有去构建消费队列,也没有构建消费者队列的的索引文件,写入commitlog后就会直接返回给producer,表示成功。

     

     

    consumeQueue构建

    看开发者指南的时候,有的地方一带而过,导致我看源码的时候一直没想明白这个consumeQueue到底是什么时候创建的,导致思维有个断层,后来又仔细看了下文档,才知道他其实是在broker启动的时候start了一条线程,专门根据commitlog的内容把消息分发到不同的队列中,因为在我的想法里,可能我直接扔到线程池里异步构建consumeQueue也是可以的,虽然还是需要一条这样的线程进行掉电之后的补偿,可能我太菜了吧。

    在broker的start()方法里面,启动的messageStore,在messageStore里面,又启动了一个单线程处理器。

     

     

    这个start方法里面其实是个死循环,只要服务不停,就不听的从commitlog里面读取消息分发,具体的实现在doReput()。

     

    通过DefaultMessageStor的doDispatch方法来构建队列和分发消息。DefaultMessageStore在初始化时会维护一个dispatcherList,默认加载两个类;CommitLogDispatcherBuildConsumeQueue用于构建ConsumerQueue、CommitLogDispatcherBuildIndex用于构建Index索引。

     

    dispatch()会调用putMessagePositionInfo()方法,putMessagePositionInfo方法首先在consumeQueueTable中找到对应TOPIC和queueID的ConsumerQueue然后调用ConsumerQueue的putMessagePositionInfoWrapper方法构建ConsumerQueue。

     

     

     

     

    ConsumeQueue的putMessagePositionInfoWrapper完成ConsumerQueue的构建。

     

     

    putMessagePositionInfo构建consumerQueue

     

     

    到这里就说明了consumeQueue是怎么构建起来的了。

    consumer

    在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。

    consumer的负载均衡,或者说他的并发就是根据consumeQueue来实现的,先来说下为什么需要consumeQueue的这个东西,在rocketmq的消息模型中,其实也使用了“请求-确认”的机制,确定消息不再传递的过程中由于网络或者服务器故障,或者消费失败导致丢失,没有推送,所以在消费端,消费者在收到消息运行消费逻辑(比如,将数据保存到数据库)后,会返回给broker一个确认的ACK,broker才会认定这条消息消费成功,但是这个给消费端带来了问题,因为要保证消息的有序性,在一条消息被消费成功之前是不会继续下一条消息的消费的(其实如果真的消费不掉,broker会把这条消息放到一个单独的死信队列中,进行后续处理),否则会出现消息空洞,不能保证有序性,比如一个订单产生了三条消息,订单创建,订单付款,订单完成,这个顺序是不能乱的,顺序消费才有意义,所以在这个例子下面需要保证消息消费的的顺序,rocketmq的做法就是把这三条消息都放到一个队列里面,队头的消息没有消费成功,就不消费下一个,但是不相关的两笔订单的消息又想同时消费怎么做,那就是在添加一个队列,增加消费者的数量这样就可以实现并发扩容。

     

     

    所以Rocketmq的实现方式是这样的,对于需要有序的消息,会放到同一个消费队列里,在rocketmq的设计里,同一个topic下面会有多个消费组,消费组的概念就是topic下面的消息会投放到所有的消费组中,每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。

    一上面的订单为例,producer发送消息的时候,会经过一个负载均衡的算法,这个是允许客户自定义的。

    // RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上 // RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash // 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);

    这样我们可以决定,同一个订单号可以通过自定义的取模分配到同一个队列上,每个消费者同时管理整个消费组中的多个队列,订单的消息在队列中是有序的,在整个消费组或者topic下是无序的,这样既保证了顺序执行,又保证了提高并发度不造成消息空洞。

    其实我个人认为对于乱序消费可以容忍消息空洞,因为我开发过类似的东西,无非就是你消费失败的时候可以进行补偿重新推送,写个定时器去定时看那些消息没有发送,再次发送一下就行了,同时还可以使用线程池进行并发发送提高性能。可能rocketmq作为一个产品化的系统来说,需要考虑多种情况,才使用了这种兼容严格顺序消费和普通顺序消费(其实如果是默认的随机投递队列的情况就是乱序消费)的模式。

    上面扯了一大堆其实是解决了为什么mq的消费模型是以消费组,消费队列,消费者构成的,下面说下代码里是具体怎么把队列分配给消费者的。

    作者:Elijah同学 链接:https://juejin.im/post/5eeb12226fb9a058897dbf03

     

     

    Processed: 0.011, SQL: 9