rocketmq笔记:rebalance的逻辑

本文记录rocketmq rebalance相关的源码阅读笔记。只是记录,没有总结。

启动RebalanceService线程

rebalance单线程运行,它的实现在RebalanceService类。假定consumer的启动代码如下:

1
2
3
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
...
consumer.start();

执行consumer.start()时内部会先创建一个MQClientInstance的实例,然后调用其start()方法,RebalanceService的实例也就是在这个过程中创建并启动的

1
2
3
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
...
mQClientFactory.start();

RebalanceService本身继承了ServiceThread,用于管理线程的生命周期。启动时可以认为是直接调用的start()方法,也就是说一个mq client只有一个rebalance线程

1
2
3
4
5
// MQClientInstance内部
this.rebalanceService.start();

// rebalanceService内部
this.thread.start();

重平衡执行逻辑

RebalanceService启动后开始执行重平衡,它本身只是用于调度,实际的逻辑入口还是在MQClientInstance

1
2
3
4
5
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
// 调用MQClientInstance的doRebalance方法
this.mqClientFactory.doRebalance();
}

consumerTable的来历

MQClientInstance里一个关键的数据结构是 consumerTable

1
ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();

它的key是什么?是消费组的名字,比如example里的「please_rename_unique_group_name_4」

它的value是什么?是一个实现了MQConsumerInner接口的实例。它从哪里来?就来自DefaultMQPushConsumer的内部

1
2
3
4
5
6
7
8
// DefaultMQPushConsumer的内部
this.defaultMQPushConsumerImpl.start();

// DefaultMQPushConsumerImpl的内部,它本身是实现了MQConsumerInner接口的。这个this就是defaultMQPushConsumerImpl自己
mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

// 嗨,就是这一行添加的
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);

遍历consumerTable执行rebalance

为啥要遍历?因为可能订阅了不止一个topic,这样就有了多个consumeGroup,每一个都需要rebalance

1
2
3
4
5
6
7
8
9
10
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}

MQConsumerInner只是做个分发,rebalance的活是交给RebalanceImpl干的

1
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());

服务端还是客户端?

rebalance可以发生在服务端,也可以发生在客户端,关键在于这个topic是怎么配的。默认是客户端重平衡,什么情况下需要服务端重平衡?待研究

服务端重平衡

很显然要从服务端查询重平衡结果

1
2
Set<MessageQueueAssignment> messageQueueAssignments = this.mQClientFactory.queryAssignment(topic, consumerGroup,
strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT)

既然服务端都处理好了,那就直接更新分配到的队列结果就完了

1
this.updateMessageQueueAssignment(topic, messageQueueAssignments, isOrder);

客户端重平衡

这个才是要重点关注的。根据消息模式的不同,又分为广播模式(BROADCASTING)和集群模式(CLUSTERING),它们的区别关键点在于:广播模式下每个消费组实例要监听所有的队列,而集群模式下只监听分配到的实例

广播模式(BROADCASTING)重平衡

1
2
3
4
5
// 从服务端查询topic下的队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

// 更新到本地缓存
this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);

集群模式(CLUSTERING)重平衡

1
2
3
4
5
6
7
8
9
10
11
// 从服务端查询topic下的队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

// 从服务端查询topic下所有的消费者
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

// 调用具体的分配策略分配队列
List<MessageQueue> allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);

// 更新到本地缓存
this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);

更新重平衡结果

不管前面执行了何种逻辑,最终都要把结果更新到本地缓存,所以updateProcessQueueTableInRebalance是最终的归宿

感觉这里的代码写的不太好,看起来有些乱,需要理一下。

processQueueTable是什么

要搞懂这些代码,首先要搞清楚processQueueTable的数据结构

1
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64);

可以看到它的key的类型是MessageQueue,value的类型是ProcessQueue,所以就从这两个类型入手

用于key的MessageQueue

MessageQueue有3个属性

1
2
3
private String topic;
private String brokerName;
private int queueId;

它记录的是一个具体的队列的坐标:这个队列属于哪个topic + 它又是存储在哪个broker上的

可见MessageQueue就是从服务端获取的队列基本信息

用于value的ProcessQueue

ProcessQueue的结构相对更加复杂,它的注释是Queue consumption snapshot,表明是队列在消费时的快照

从内部变量上看,它记录了消息体MessageExt、消费的进度offset等,知道是做什么的就行,具体细节就不展开了

更新processQueueTable

重平衡后,有些队列已经不属于当前消费者了,所以要「移除不属于当前消费者的ProcessQueue

1
2
3
4
5
6
7
8
9
// 从processQueueTable这个map里找到当前要更新的topic
if (mq.getTopic().equals(topic))

// 队列已经被移除
if (!mqSet.contains(mq)) {
// 标记移除并且放到待删除记录里
pq.setDropped(true);
removeQueueMap.put(mq, pq);
}

移除MessageQueue并将其消费进度同步到服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
// removeQueueMap里记录着需要被移除的队列信息
for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
MessageQueue mq = entry.getKey();
ProcessQueue pq = entry.getValue();

// 移除之前需要先把offset同步到broker
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
// 移除消费队列
this.processQueueTable.remove(mq);
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
}

添加新的消费队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
for (MessageQueue mq : mqSet) {
// 本地没有的队列需要新增
if (!this.processQueueTable.containsKey(mq)) {
// 如果是有序队列,需要先在远程和本地锁住它
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
allMQLocked = false;
continue;
}

// 先在本地清除下消费进度,防止有脏数据
this.removeDirtyOffset(mq);

// 创建队列的快照
ProcessQueue pq = createProcessQueue(topic);

// 锁住
pq.setLocked(true);

// 计算消费的偏移量
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {

// 放入处理中队列
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
// 更新时什么也不做
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 如果是本地新监听的队列,需要去服务端拉取消息
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}

}

发起拉取消息请求

1
this.dispatchPullRequest(pullRequestList, 500);

这里也只是把请求丢到 PullMessageService 的队列里

1
2
3
4
5
6
7
8
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
// 把请求放到队列里,等待拉取线程处理
this.messageRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
logger.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}

最终的拉取动作是在 PullMessageServicerun 方法里执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void run() {
logger.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
MessageRequest messageRequest = this.messageRequestQueue.take();
// 两种不同的拉取模式。pop模式下MessageQueue可以共享
if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
this.popMessage((PopRequest)messageRequest);
} else {
this.pullMessage((PullRequest)messageRequest);
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
logger.error("Pull Message Service Run Method exception", e);
}
}

logger.info(this.getServiceName() + " service end");
}

启动PullMessageService线程

PullMessageService线程和 RebalanceService 线程的启动时机类似,也是在 consumer 启动时通过调用 MQClientInstancestart 方法启动的

1
this.pullMessageService.start();