当前位置: 移动技术网 > IT编程>开发语言>Java > rocketmq消费负载均衡--push消费详解

rocketmq消费负载均衡--push消费详解

2019年07月19日  | 移动技术网IT编程  | 我要评论

前言

本文介绍了defaultmqpushconsumerimpl消费者,客户端负载均衡相关知识点。本文从defaultmqpushconsumerimpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalancebytopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述。

介绍之前首先抛出几个问题:

1. 要做负载均衡,首先要解决的一个问题是什么?

2. 负载均衡是client端处理还是broker端处理?

个人理解:

1. 要做负载均衡,首先要做的就是信号收集。

所谓信号收集,就是得知道每一个consumergroup有哪些consumer,对应的topic是谁。信号收集分为client端信号收集与broker端信号收集两个部分。

2. 负载均衡放在client端处理。

具体做法是:消费者客户端在启动时完善rebalanceimpl实例,同时拷贝订阅信息存放rebalanceimpl实例对象中,另外也是很重要的一个步骤 -- 通过心跳消息,不停的上报自己到所有broker,注册registerconsumer,等待上述过程准备好之后在client端不断执行的负载均衡服务线程从broker端获取一份全局信息(该consumergroup下所有的消费client),然后分配这些全局信息,获取当前客户端分配到的消费队列。

本文具体的内容:

i. copysubscription

client端信号收集,拷贝订阅信息。

在defaultmqpushconsumerimpl.start()时,会将消费者的topic订阅关系设置到rebalanceimpl的subscriptioninner的map中用于负载:

private void copysubscription() throws mqclientexception {
try {
//注:一个consumer对象可以订阅多个topic
map<string, string> sub = this.defaultmqpushconsumer.getsubscription();
if (sub != null) {
for (final map.entry<string, string> entry : sub.entryset()) {
final string topic = entry.getkey();
final string substring = entry.getvalue();
subscriptiondata subscriptiondata =
filterapi.buildsubscriptiondata(this.defaultmqpushconsumer.getconsumergroup(),//
topic, substring);
this.rebalanceimpl.getsubscriptioninner().put(topic, subscriptiondata);
}
}
if (null == this.messagelistenerinner) {
this.messagelistenerinner = this.defaultmqpushconsumer.getmessagelistener();
}
switch (this.defaultmqpushconsumer.getmessagemodel()) {
case broadcasting:
break;
case clustering:
final string retrytopic = mixall.getretrytopic(this.defaultmqpushconsumer.getconsumergroup());
subscriptiondata subscriptiondata =
filterapi.buildsubscriptiondata(this.defaultmqpushconsumer.getconsumergroup(),//
retrytopic, subscriptiondata.sub_all);
this.rebalanceimpl.getsubscriptioninner().put(retrytopic, subscriptiondata);
break;
default:
break;
}
}
catch (exception e) {
throw new mqclientexception("subscription exception", e);
}
}

filterapi.buildsubscriptiondata接口将订阅关系转换为subscriptiondata 数据,其中substring包含订阅tag等信息。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到。

ii. 完善rebalanceimpl实例

client继续收集信息:

this.rebalanceimpl.setconsumergroup(this.defaultmqpushconsumer.getconsumergroup());
this.rebalanceimpl.setmessagemodel(this.defaultmqpushconsumer.getmessagemodel());
this.rebalanceimpl.setallocatemessagequeuestrategy(this.defaultmqpushconsumer
.getallocatemessagequeuestrategy());
this.rebalanceimpl.setmqclientfactory(this.mqclientfactory);

本文以defaultmqpushconsumerimpl为例,因此this对象类型为defaultmqpushconsumerimp。

iii. this.rebalanceservice.start()

开启负载均衡服务。this.rebalanceservice是一个rebalanceservice实例对象,它继承与servicethread,是一个线程类。 this.rebalanceservice.start()执行时,也即执行rebalanceservice线程体:

@override
public void run() {
log.info(this.getservicename() + " service started");
while (!this.isstoped()) {
this.waitforrunning(waitinterval);
this.mqclientfactory.dorebalance();
}
log.info(this.getservicename() + " service end");
}

iv. this.mqclientfactory.dorebalance

客户端遍历消费组table,对该客户端上所有消费者独立进行负载均衡,分发消费队列:

public void dorebalance() {
for (string group : this.consumertable.keyset()) {
mqconsumerinner impl = this.consumertable.get(group);
if (impl != null) {
try {
impl.dorebalance();
} catch (exception e) {
log.error("dorebalance exception", e);
}
}
}
}

v. mqconsumerinner.dorebalance

由于本文以defaultmqpushconsumerimpl消费过程为例,即defaultmqpushconsumerimpl.dorebalance:

@override
public void dorebalance() {
if (this.rebalanceimpl != null) {
this.rebalanceimpl.dorebalance();
}
}

步骤ii 中完善了rebalanceimpl实例,为调用rebalanceimpl.dorebalance()提供了初始数据。

rebalanceimpl.dorebalance()过程如下:

public void dorebalance() {
     // 前文copysubscription中初始化了subscriptioninner
map<string, subscriptiondata> subtable = this.getsubscriptioninner();
if (subtable != null) {
for (final map.entry<string, subscriptiondata> entry : subtable.entryset()) {
final string topic = entry.getkey();
try {
this.rebalancebytopic(topic);
} catch (exception e) {
if (!topic.startswith(mixall.retry_group_topic_prefix)) {
log.warn("rebalancebytopic exception", e);
}
}
}
}
this.truncatemessagequeuenotmytopic();
}

vi. rebalancebytopic -- 核心步骤之一

rebalancebytopic方法中根据消费者的消费类型为broadcasting或clustering做不同的逻辑处理。clustering逻辑包括broadcasting逻辑,本部分只介绍集群消费负载均衡的逻辑。

集群消费负载均衡逻辑主要代码如下(省略了log等代码):

//1.从topicsubscribeinfotable列表中获取与该topic相关的所有消息队列
set<messagequeue> mqset = this.topicsubscribeinfotable.get(topic);
//2. 从broker端获取消费该消费组的所有客户端clientid
list<string> cidall = this.mqclientfactory.findconsumeridlist(topic, consumergroup);
f (null == mqset) { ... }
if (null == cidall) { ... }
if (mqset != null && cidall != null) {
list<messagequeue> mqall = new arraylist<messagequeue>();
mqall.addall(mqset);
collections.sort(mqall);
collections.sort(cidall);

     // 3.创建defaultmqpushconsumer对象时默认设置为allocatemessagequeueaveragely
allocatemessagequeuestrategy strategy = this.allocatemessagequeuestrategy;

list<messagequeue> allocateresult = null;
try {
         // 4.调用allocatemessagequeueaveragely.allocate方法,获取当前client分配消费队列
allocateresult = strategy.allocate(
this.consumergroup, 
this.mqclientfactory.getclientid(), 
mqall,
cidall);
} catch (throwable e) {
return;
}
    // 5. 将分配得到的allocateresult 中的队列放入allocateresultset 集合
set<messagequeue> allocateresultset = new hashset<messagequeue>();
if (allocateresult != null) {
allocateresultset.addall(allocateresult);
}
、
     //6. 更新updateprocessqueue
boolean changed = this.updateprocessqueuetableinrebalance(topic, allocateresultset);
if (changed) {
this.messagequeuechanged(topic, mqset, allocateresultset);
}
}

注:broadcasting逻辑只包含上述的1、6。

集群消费负载均衡逻辑中的1、2、4这三个点相关知识为其核心过程,各个点相关知识如下:

第1点:从topicsubscribeinfotable列表中获取与该topic相关的所有消息队列

第2点: 从broker端获取消费该消费组的所有客户端clientid

首先,消费者对象不断地向所有broker发送心跳包,上报自己,注册并更新订阅关系以及客户端channelinfotable;之后,客户端在做消费负载均衡时获取那些消费客户端,对这些客户端进行负载均衡,分发消费的队列。具体过程如下图所示:

第4点:调用allocatemessagequeueaveragely.allocate方法,获取当前client分配消费队列

注:上图中cid1、cid2、...、cidn通过 getconsumeridlistbygroup 获取,它们在这个consumergroup下所有在线客户端列表中。

当前消费对进行负载均衡策略后获取对应的消息消费队列。具体的算法很简单,可以看源码。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网