rocketmq笔记:rebalance的逻辑
本文记录rocketmq rebalance相关的源码阅读笔记。只是记录,没有总结。
启动RebalanceService
线程
rebalance单线程运行,它的实现在RebalanceService
类。假定consumer的启动代码如下:
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); |
执行consumer.start()
时内部会先创建一个MQClientInstance
的实例,然后调用其start()
方法,RebalanceService
的实例也就是在这个过程中创建并启动的
1 | this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); |
RebalanceService
本身继承了ServiceThread
,用于管理线程的生命周期。启动时可以认为是直接调用的start()
方法,也就是说一个mq client只有一个rebalance线程
1 | // MQClientInstance内部 |
重平衡执行逻辑
RebalanceService
启动后开始执行重平衡,它本身只是用于调度,实际的逻辑入口还是在MQClientInstance
里
1 | while (!this.isStopped()) { |
consumerTable的来历
MQClientInstance
里一个关键的数据结构是 consumerTable
1 | ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<>(); |
它的key是什么?是消费组的名字,比如example里的「please_rename_unique_group_name_4」
它的value是什么?是一个实现了MQConsumerInner
接口的实例。它从哪里来?就来自DefaultMQPushConsumer
的内部
1 | // DefaultMQPushConsumer的内部 |
遍历consumerTable执行rebalance
为啥要遍历?因为可能订阅了不止一个topic,这样就有了多个consumeGroup,每一个都需要rebalance
1 | for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { |
MQConsumerInner
只是做个分发,rebalance的活是交给RebalanceImpl
干的
1 | this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); |
服务端还是客户端?
rebalance可以发生在服务端,也可以发生在客户端,关键在于这个topic是怎么配的。默认是客户端重平衡,什么情况下需要服务端重平衡?待研究
服务端重平衡
很显然要从服务端查询重平衡结果
1 | Set<MessageQueueAssignment> messageQueueAssignments = this.mQClientFactory.queryAssignment(topic, consumerGroup, |
既然服务端都处理好了,那就直接更新分配到的队列结果就完了
1 | this.updateMessageQueueAssignment(topic, messageQueueAssignments, isOrder); |
客户端重平衡
这个才是要重点关注的。根据消息模式的不同,又分为广播模式(BROADCASTING)和集群模式(CLUSTERING),它们的区别关键点在于:广播模式下每个消费组实例要监听所有的队列,而集群模式下只监听分配到的实例
广播模式(BROADCASTING)重平衡
1 | // 从服务端查询topic下的队列 |
集群模式(CLUSTERING)重平衡
1 | // 从服务端查询topic下的队列 |
更新重平衡结果
不管前面执行了何种逻辑,最终都要把结果更新到本地缓存,所以updateProcessQueueTableInRebalance
是最终的归宿
感觉这里的代码写的不太好,看起来有些乱,需要理一下。
processQueueTable
是什么
要搞懂这些代码,首先要搞清楚processQueueTable
的数据结构
1 | ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64); |
可以看到它的key的类型是MessageQueue
,value的类型是ProcessQueue
,所以就从这两个类型入手
用于key的MessageQueue
MessageQueue
有3个属性
1 | private String topic; |
它记录的是一个具体的队列的坐标:这个队列属于哪个topic + 它又是存储在哪个broker上的
可见MessageQueue
就是从服务端获取的队列基本信息
用于value的ProcessQueue
ProcessQueue
的结构相对更加复杂,它的注释是Queue consumption snapshot
,表明是队列在消费时的快照
从内部变量上看,它记录了消息体MessageExt、消费的进度offset等,知道是做什么的就行,具体细节就不展开了
更新processQueueTable
重平衡后,有些队列已经不属于当前消费者了,所以要「移除不属于当前消费者的ProcessQueue
」
1 | // 从processQueueTable这个map里找到当前要更新的topic |
移除MessageQueue
并将其消费进度同步到服务端
1 | // removeQueueMap里记录着需要被移除的队列信息 |
添加新的消费队列
1 | for (MessageQueue mq : mqSet) { |
发起拉取消息请求
1 | this.dispatchPullRequest(pullRequestList, 500); |
这里也只是把请求丢到 PullMessageService
的队列里
1 | public void executePullRequestImmediately(final PullRequest pullRequest) { |
最终的拉取动作是在 PullMessageService
的 run
方法里执行的
1 | public void run() { |
启动PullMessageService
线程
PullMessageService
线程和 RebalanceService
线程的启动时机类似,也是在 consumer 启动时通过调用 MQClientInstance
的 start
方法启动的
1 | this.pullMessageService.start(); |
- 2020-12-04
Rpc Agent is a framework, with which you can develop an agent server for a RPC framework.