关于 RocketMQ 多实例消费者的一个问题

前言

今天在和领导讨论关于 RocketMQ 的使用规范时,我提出我们团队在开发过程中使用到了多个实例的消费者分 tags 消费同一个 topic。 领导敏锐地察觉到这种用法在集群环境下使用很可能有问题,让我验证一下。

验证

rocketmq-multi-instance-demo

Consumer 的代码如下,在同一台服务器上起了两个进程(也就是两个消费者),原本以为会均衡的负载到这两个消费者上面。 但是结果并不是这样,我一共发送了十条消息,这两个并没有收到全部十条消息,并且收到的消息全都相同。 这个结果也证实了领导提出的疑问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RocketMQMessageListener(
        topic = "${demo.rocketmq.topic}",
        consumerGroup = "taga-consumer-group",
        selectorExpression = "${demo.rocketmq.tagA}"
)
public class TagAConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

    @Override
    public void onMessage(String message) {
        System.out.printf("------- TagAConsumer received: %s \n", message);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // 设置当前实例的名称
        // consumer.setInstanceName("tag-a-consumer-instance@" + UtilAll.getPid());
        consumer.setInstanceName("tag-a-consumer-instance");
    }

}

分析

带着上面这个问题,我对 RocketMQ 源码进行了简单分析。 查看下面两段源码,我们可以发现 RocketMQ 默认的平均负载策略是根据 clientId 去分配的, 那么,这个 clientId 又是哪里来的呢,我们接着往下看。

RebalanceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
        
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
    allocateResult = strategy.allocate(
        this.consumerGroup,
        this.mQClientFactory.getClientId(), // 先看这里
        mqAll,
        cidAll);
} catch (Throwable e) {
    log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
    return false;
}

...

AllocateMessageQueueAveragely:平均负载策略,RocketMQ 默认使用的策略。 AllocateMessageQueueAveragely.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package org.apache.rocketmq.client.consumer.rebalance;

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;

/**
 * Average Hashing queue algorithm
 */
public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {

    public AllocateMessageQueueAveragely() {
        log = ClientLogger.getLog();
    }

    public AllocateMessageQueueAveragely(InternalLogger log) {
        this.log = log;
    }

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }

        // 再看这里
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG";
    }
}

从下面这段源码中,我们可以发现,如果我们设置了一个固定值的 instanceName,那么生成的 clientId 的格式是这样的 IP@instanceName, 当同一个服务器上同时起了两个服务(非容器),那么这两个服务的 IP 是相同的,instanceName 也是相同的,也就是说 clientId 也是相同的。 再根据上面的负载均衡策略代码,我们就不难看出为什么会出现我前面验证的那个问题了。

ClientConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
...

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
    sb.append("@");
    sb.append(this.unitName);
    }

    return sb.toString();
}

...

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
    this.instanceName = String.valueOf(UtilAll.getPid());
    }
}

...

如果想要进一步分析 RocketMQ 的源码,可以参考这位大佬写的这篇文章: Rocketmq源码分析12:consumer 负载均衡

结论

上面的问题一般只会出现在不使用容器部署的集群环境下(而且是同一台服务器), 我们生产环境一般都是 k8s(cluster ip) 也就是说 Consumer 起来的时候拿到的 IP 是容器的 IP, 不太会出现上面的问题。 但开发过程中还是要注意,建议在设置 instanceName 的时候后面加一个随机数(好像把这个随机数设置在 unitName 更合适)。

写在最后

翻了翻 Git 上 RocketMQ 最新的源码,发现 changeInstanceNameToPID 这个方法在使用默认的 instanceName 的时候在进程号后面又加了一个时间, 难道光是进程号有可能还是会重复?🤔

ClientConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
...

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    if (enableStreamRequestType) {
        sb.append("@");
        sb.append(RequestType.STREAM);
    }

    return sb.toString();
}

...

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
    this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}

...