前言
今天在和领导讨论关于 RocketMQ 的使用规范时,我提出我们团队在开发过程中使用到了多个实例的消费者分 tags 消费同一个 topic。 领导敏锐地察觉到这种用法在集群环境下使用很可能有问题,让我验证一下。
验证
Consumer 的代码如下,在同一台服务器上起了两个进程(也就是两个消费者),原本以为会均衡的负载到这两个消费者上面。 但是结果并不是这样,我一共发送了十条消息,这两个并没有收到全部十条消息,并且收到的消息全都相同。 这个结果也证实了领导提出的疑问。
1 | |
分析
带着上面这个问题,我对 RocketMQ 源码进行了简单分析。 查看下面两段源码,我们可以发现 RocketMQ 默认的平均负载策略是根据 clientId 去分配的, 那么,这个 clientId 又是哪里来的呢,我们接着往下看。
1 | |
AllocateMessageQueueAveragely:平均负载策略,RocketMQ 默认使用的策略。 AllocateMessageQueueAveragely.java
1 | |
从下面这段源码中,我们可以发现,如果我们设置了一个固定值的 instanceName,那么生成的 clientId 的格式是这样的 IP@instanceName, 当同一个服务器上同时起了两个服务(非容器),那么这两个服务的 IP 是相同的,instanceName 也是相同的,也就是说 clientId 也是相同的。 再根据上面的负载均衡策略代码,我们就不难看出为什么会出现我前面验证的那个问题了。
ClientConfig.java
1 | |
如果想要进一步分析 RocketMQ 的源码,可以参考这位大佬写的这篇文章: Rocketmq源码分析12:consumer 负载均衡
结论
上面的问题一般只会出现在不使用容器部署的集群环境下(而且是同一台服务器), 我们生产环境一般都是 k8s(cluster ip) 也就是说 Consumer 起来的时候拿到的 IP 是容器的 IP, 不太会出现上面的问题。 但开发过程中还是要注意,建议在设置 instanceName 的时候后面加一个随机数(好像把这个随机数设置在 unitName 更合适)。
写在最后
翻了翻 Git 上 RocketMQ 最新的源码,发现 changeInstanceNameToPID 这个方法在使用默认的 instanceName 的时候在进程号后面又加了一个时间, 难道光是进程号有可能还是会重复?🤔
1 | |