Sending Messages from the Producer in RocketMQ

August 2, 2019

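The previous post covered how the Producer starts up, so the related details are not repeated here (see "Source Code Analysis of Producer Startup in RocketMQ").

To send a message, the Producer first needs to create a Message instance: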

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

    public Message() {}

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }

    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }

    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
    }

    // Package-private: used internally for system properties.
    void putProperty(final String name, final String value) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        this.properties.put(name, value);
    }

    // Public entry point for user-defined properties; rejects keys reserved by the system.
    public void putUserProperty(final String name, final String value) {
        if (MessageConst.STRING_HASH_SET.contains(name)) {
            throw new RuntimeException(String.format(
                "The Property<%s> is used by system, input another please", name));
        }

        if (value == null || value.trim().isEmpty()
            || name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The name or value of property can not be null or blank string!"
            );
        }

        this.putProperty(name, value);
    }

    // other getters and setters omitted in this excerpt
}


The properties map holds the message's configurable attributes; its keys are defined by MessageConst:

public class MessageConst {
    public static final String PROPERTY_KEYS = "KEYS";
    public static final String PROPERTY_TAGS = "TAGS";
    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
    public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
    public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
    public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
}


Once the Message has been created, it is sent through one of DefaultMQProducer's send methods.

The Producer supports three sending modes, defined by the CommunicationMode enum:

public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

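These stand for synchronous, asynchronous, and one-way sending respectively.
Whether a send is synchronous or asynchronous is determined by which overload of send is called.

Any send overload that takes a SendCallback parameter is asynchronous; all the others are synchronous. SendCallback provides the callbacks that are invoked when an asynchronous send completes: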

public interface SendCallback {
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}


One-way sending uses the sendOneway method instead.
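To make the mapping between API overloads and sending modes concrete, here is a minimal, self-contained sketch. SendModeSketch, its nested Callback interface, and the string results are hypothetical stand-ins that only mirror the shape of the API described above; nothing here talks to a broker, and the callback is invoked inline purely to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in showing how the choice of method decides the mode; not RocketMQ code.
class SendModeSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Records which mode each call ended up using.
    final List<Mode> modesUsed = new ArrayList<>();

    // No callback: synchronous, the result is returned to the caller.
    String send(String msg) {
        return dispatch(msg, Mode.SYNC, null);
    }

    // Callback present: asynchronous, the result is delivered through the callback.
    void send(String msg, Callback callback) {
        dispatch(msg, Mode.ASYNC, callback);
    }

    // One-way: fire and forget, the caller gets no result at all.
    void sendOneway(String msg) {
        dispatch(msg, Mode.ONEWAY, null);
    }

    // All three entry points funnel into a single implementation method.
    private String dispatch(String msg, Mode mode, Callback callback) {
        modesUsed.add(mode);
        String result = "SEND_OK:" + msg;
        switch (mode) {
            case SYNC:
                return result;
            case ASYNC:
                callback.onSuccess(result); // inline here; a real client calls back later
                return null;
            default:
                return null;
        }
    }
}
```

The point of the sketch is only that the callback parameter, not a separate flag, is what distinguishes an asynchronous call from a synchronous one.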

 

Whichever mode is used, the call ultimately reaches the sendDefaultImpl method of DefaultMQProducerImpl, which DefaultMQProducer wraps.


The sendDefaultImpl method of DefaultMQProducerImpl:

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Only synchronous sends are retried in this loop.
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }

    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

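The communicationMode parameter is set according to which API was called, as described above.
The sendCallback parameter is non-null only when one of the asynchronous APIs is used.


The method first calls makeSureStateOK to check that the Producer has been started: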

private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

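ServiceState was covered in the previous post.


After the Producer's state has been checked, Validators.checkMessage validates the Message. Part of that validation is the checkTopic method, which checks the topic: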

public static void checkTopic(String topic) throws MQClientException {
    if (UtilAll.isBlank(topic)) {
        throw new MQClientException("The specified topic is blank", null);
    }

    if (!regularExpressionMatcher(topic, PATTERN)) {
        throw new MQClientException(String.format(
            "The specified topic[%s] contains illegal characters, allowing only %s", topic,
            VALID_PATTERN_STR), null);
    }

    if (topic.length() > CHARACTER_MAX_LENGTH) {
        throw new MQClientException("The specified topic is longer than topic max length 255.", null);
    }

    //whether the same with system reserved keyword
    if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
        throw new MQClientException(
            String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null);
    }
}

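Once validation passes, the start timestamp is recorded, which marks the real beginning of the send.


Next, tryToFindTopicPublishInfo is called to look up the routing information for the topic.
The tryToFindTopicPublishInfo method: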

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

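As described in the post on Producer startup, topicPublishInfoTable is a map that holds the routing information for each topic. The method first checks whether a TopicPublishInfo already exists for the topic.
If none exists, or its message queues are unavailable (ok() returns false):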

public boolean ok() {
    return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}

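ok checks whether the route has any usable message queues.

In that case a new TopicPublishInfo is put into the map and updateTopicRouteInfoFromNameServer is called to refresh the routing information. As mentioned in the previous post, updateTopicRouteInfoFromNameServer is also run by a scheduled task; here it is called directly so that the route is refreshed immediately.

If the entry now has routing information or usable message queues, the TopicPublishInfo is returned directly.
Otherwise updateTopicRouteInfoFromNameServer is called once more, this time through its three-argument overload, and whatever is in the table afterwards is returned.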
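The lookup above follows a common "cache, refresh on miss, then fall back" pattern, which can be sketched in isolation. Everything in this sketch is hypothetical: RouteCacheSketch, the nameServerLookup function, and the fixed defaultRoute list are stand-ins, and a route is reduced to a plain list of queue names. In particular, treating the second refresh as a fallback to a default route is an assumption about what the three-argument overload does, not something the code shown here confirms.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a route cache that refreshes on a miss; not RocketMQ code.
class RouteCacheSketch {
    private final Map<String, List<String>> table = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServerLookup; // assumed remote lookup
    private final List<String> defaultRoute;                       // assumed fallback route
    int lookups = 0;                                               // counts remote lookups

    RouteCacheSketch(Function<String, List<String>> nameServerLookup, List<String> defaultRoute) {
        this.nameServerLookup = nameServerLookup;
        this.defaultRoute = defaultRoute;
    }

    // Same test as ok(): a route is usable only if it has at least one queue.
    private static boolean ok(List<String> queues) {
        return queues != null && !queues.isEmpty();
    }

    List<String> find(String topic) {
        List<String> queues = table.get(topic);
        if (!ok(queues)) {
            // Cache miss or unusable entry: ask the name server right away.
            lookups++;
            queues = nameServerLookup.apply(topic);
            if (!ok(queues)) {
                // Still nothing usable: fall back to the default route.
                queues = defaultRoute;
            }
            table.put(topic, queues);
        }
        return queues;
    }
}
```

A second call to find for the same topic is served from the table without another remote lookup, which is the reason for caching the route in the first place.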


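Back in sendDefaultImpl: once the routing information has been obtained, the callTimeout flag is initialised to false; it is later used to report that the send timed out.
Next, the number of attempts allowed is computed from communicationMode. Retries apply only to synchronous sends, which by default get 1 + 2 = 3 attempts in total; the other two modes get a single attempt.

An array sized to the number of attempts is created to record the brokerName used on each attempt, and the for loop then runs up to that many times.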
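The attempt count and the per-attempt timeout check are small enough to pull out and try on their own. The sketch below is a simplified stand-in: RetryBudgetSketch and its method names are hypothetical, and the -1 return value is a convention of this sketch (the real loop sets callTimeout and breaks instead).

```java
// Hypothetical helper that isolates two calculations from the send loop; not RocketMQ code.
class RetryBudgetSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Only synchronous sends are retried in the loop; the other modes get one attempt.
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Time left for the next attempt, or -1 if the overall timeout is already used up.
    // The elapsed time is measured from the start of the first attempt, so every
    // retry shares a single timeout budget.
    static long remaining(long timeoutMillis, long firstStartMillis, long nowMillis) {
        long cost = nowMillis - firstStartMillis;
        return timeoutMillis < cost ? -1 : timeoutMillis - cost;
    }
}
```

With a retry setting of 2, timesTotal gives 3 for a synchronous send and 1 for the other modes, and the remaining time shrinks with each attempt rather than resetting.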

Each iteration first calls selectOneMessageQueue with topicPublishInfo and lastBrokerName to pick a message queue. The selection shown here is implemented by TopicPublishInfo's selectOneMessageQueue method:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

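When lastBrokerName is null, the no-argument selectOneMessageQueue is used. sendWhichQueue was covered in the previous post: it is a per-thread index that starts from a random value, so the index returned by getAndIncrement differs from thread to thread.
The index is taken modulo the size of messageQueueList to get a position in the list, and the MessageQueue at that position is selected.
Because different MessageQueues can belong to different brokers, this rotation is really a form of load balancing: successive sends are spread across the queues, and therefore across the brokers, rather than always hitting the same one.

When lastBrokerName is not null the procedure is similar, except that each candidate MessageQueue is compared with lastBrokerName and is returned only if its broker differs. lastBrokerName is non-null only on a retry, so this keeps a retry from going back to the broker that was just tried; if every queue belongs to that broker, the method falls back to the no-argument selection.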
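The selection logic can be tried out with a small self-contained sketch. QueueSelectSketch and its nested Queue class are hypothetical stand-ins, a plain AtomicInteger replaces the per-thread index, and Math.floorMod is used instead of the Math.abs plus "pos < 0" guard in the original; that swap is a simplification of this sketch, chosen because floorMod never returns a negative position.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin queue selection that skips the last broker; not RocketMQ code.
class QueueSelectSketch {
    // A queue is identified by its broker name and a queue id.
    static final class Queue {
        final String broker;
        final int id;

        Queue(String broker, int id) {
            this.broker = broker;
            this.id = id;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter; // stand-in for the per-thread index

    QueueSelectSketch(List<Queue> queues, int start) {
        this.queues = queues;
        this.counter = new AtomicInteger(start);
    }

    // Plain round robin over all queues.
    Queue selectOne() {
        return queues.get(Math.floorMod(counter.getAndIncrement(), queues.size()));
    }

    // Round robin, but skip queues on the broker that was tried last.
    Queue selectOne(String lastBroker) {
        if (lastBroker == null) {
            return selectOne();
        }
        int index = counter.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (!q.broker.equals(lastBroker)) {
                return q;
            }
        }
        // Every queue lives on lastBroker: accept it rather than fail.
        return selectOne();
    }
}
```

With queues on brokers "a" and "b", a retry after a failure on "a" lands on the next queue that belongs to "b"; with a single broker, the fallback simply returns one of its queues.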

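Back in sendDefaultImpl: after a MessageQueue has been selected, its brokerName is recorded and the elapsed time is checked against the timeout. If there is still time left, sendKernelImpl is called to perform the actual send.
The sendKernelImpl method: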

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        msg.setBody(prevBody);
                    }
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            // Always restore the original (uncompressed) body on the caller's Message.
            msg.setBody(prevBody);
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

It first records the start time beginStartTime, in preparation for a possible timeout.
It then looks up the broker address for the brokerName.
The findBrokerAddressInPublish method:

public String findBrokerAddressInPublish(final String brokerName) {
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}

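This looks up brokerName in the brokerAddrTable map and returns the address registered under the master's id.

If no address is found, tryToFindTopicPublishInfo is called to refresh the routing information, and findBrokerAddressInPublish is tried again.

After that, if the VIP (high-priority) channel is enabled, the VIP channel address is derived from brokerAddr.
The brokerVIPChannel method of MixAll: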

public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
    if (isChange) {
        String[] ipAndPort = brokerAddr.split(":");
        String brokerAddrNew = ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2);
        return brokerAddrNew;
    } else {
        return brokerAddr;
    }
}

The VIP channel address is simple to compute: it is the same host with the port number reduced by 2.
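As a quick worked example of that calculation, the sketch below re-implements the same "port minus 2" rule in a standalone class. VipChannelSketch and the sample address are made up for illustration; the only behaviour taken from the source is the subtraction itself.

```java
// Hypothetical standalone version of the "port minus 2" rule; not RocketMQ code.
class VipChannelSketch {
    static String vipAddress(boolean useVip, String brokerAddr) {
        if (!useVip) {
            return brokerAddr;
        }
        // Split at the last colon so the host part is kept intact.
        int colon = brokerAddr.lastIndexOf(':');
        String host = brokerAddr.substring(0, colon);
        int port = Integer.parseInt(brokerAddr.substring(colon + 1));
        return host + ":" + (port - 2);
    }
}
```

For the sample address 192.168.0.1:10911, the VIP address comes out as 192.168.0.1:10909, and with the VIP channel disabled the address is returned unchanged.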

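With the address settled, what follows is a long run of configuration.

An integer sysFlag is defined here. It is a set of bit flags that describe the message (for example, whether it is compressed and what its transaction state is), built from the following values: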

public class MessageSysFlag {
    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1 << 1;
    public final static int TRANSACTION_NOT_TYPE = 0;
    public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
}
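Because these values occupy separate bits, several of them can be combined in one int and read back independently. The sketch below copies the constant values shown above into a hypothetical SysFlagSketch class; the build, isCompressed, and transactionType helpers are illustrative names introduced here, with transactionType masking out the two transaction bits (bits 2 and 3).

```java
// Hypothetical helper showing how the bit flags combine; constant values copied from the listing above.
class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;
    static final int MULTI_TAGS_FLAG = 0x1 << 1;
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Builds a flag the same way the send path does: OR in one bit per property.
    static int build(boolean compressed, boolean transactionPrepared) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (transactionPrepared) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }

    // ROLLBACK_TYPE has both transaction bits set, so it doubles as the mask.
    static int transactionType(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }
}
```

A compressed, prepared transactional message therefore carries the value 0x1 | 0x4 = 5, and each property can still be recovered from it with a mask.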


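A msgBodyCompressed flag is also defined to record whether the message body was compressed. tryToCompressMessage decides whether to compress and, if so, compresses the body.
The tryToCompressMessage method: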

private boolean tryToCompressMessage(final Message msg) {
    if (msg instanceof MessageBatch) {
        //batch dose not support compressing right now
        return false;
    }
    byte[] body = msg.getBody();
    if (body != null) {
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
            try {
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
                    msg.setBody(data);
                    return true;
                }
            } catch (IOException e) {
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }

    return false;
}

When the body size is greater than or equal to compressMsgBodyOverHowmuch (4 KB by default), the body is compressed with UtilAll's compress method:

public static byte[] compress(final byte[] src, final int level) throws IOException {
    byte[] result = src;
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
    java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
    DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater);
    try {
        deflaterOutputStream.write(src);
        deflaterOutputStream.finish();
        deflaterOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
    } catch (IOException e) {
        defeater.end();
        throw e;
    } finally {
        try {
            byteArrayOutputStream.close();
        } catch (IOException ignored) {
        }

        defeater.end();
    }

    return result;
}

Compression here uses the Deflater from java.util.zip, that is, the DEFLATE algorithm also used by zip.
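To see the effect of this step, the sketch below compresses a body with the same java.util.zip classes and inflates it again. CompressSketch, its THRESHOLD constant (assumed here to be 4 KB), and the uncompress helper are illustrative names for this sketch; IOExceptions are wrapped in UncheckedIOException only to keep the example easy to call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical round-trip sketch using java.util.zip; not RocketMQ code.
class CompressSketch {
    static final int THRESHOLD = 4 * 1024; // assumed threshold of 4 KB

    // Bodies below the threshold are sent as they are.
    static boolean shouldCompress(byte[] body) {
        return body != null && body.length >= THRESHOLD;
    }

    static byte[] compress(byte[] src, int level) {
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(src);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end(); // release native resources held by the Deflater
        }
        return out.toByteArray();
    }

    static byte[] uncompress(byte[] src) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

A highly repetitive 8 KB body shrinks considerably and inflates back to the original bytes, which is why the receiving side needs the compressed bit in sysFlag to know that it has to inflate the body.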

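Next, sysFlag is updated depending on whether the message is a transactional message. Transactional messages are left for a later post.

Then the method checks whether any CheckForbiddenHook has been registered. If so, it iterates over all of them and calls each one's checkForbidden method, which gives a hook the chance to forbid the send.

Likewise, it checks whether any SendMessageHook has been registered and, if so, iterates over all of them and calls sendMessageBefore. Once the send has finished, each hook's sendMessageAfter method is called.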
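The before/after hook pattern can be illustrated with a few lines of plain Java. HookSketch, its SendHook interface, and the doSend placeholder are hypothetical; the sketch only shows the ordering, namely that every hook sees the message before the send and is told afterwards whether it succeeded or failed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of before/after send hooks; not RocketMQ code.
class HookSketch {
    interface SendHook {
        void before(String msg);
        void after(String msg, Throwable error); // error is null on success
    }

    private final List<SendHook> hooks = new ArrayList<>();

    void register(SendHook hook) {
        hooks.add(hook);
    }

    String send(String msg) {
        for (SendHook hook : hooks) {
            hook.before(msg);
        }
        String result;
        try {
            result = doSend(msg);
        } catch (RuntimeException e) {
            // The after-hooks also run on failure, carrying the exception.
            for (SendHook hook : hooks) {
                hook.after(msg, e);
            }
            throw e;
        }
        for (SendHook hook : hooks) {
            hook.after(msg, null);
        }
        return result;
    }

    // Placeholder for the real network send; rejects empty messages to simulate a failure.
    private static String doSend(String msg) {
        if (msg.isEmpty()) {
            throw new IllegalArgumentException("empty message");
        }
        return "SEND_OK";
    }
}
```

This mirrors the structure of the method above, where the after-hooks are run both on the normal path and in each catch block before the exception is rethrown.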


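Next, the request header requestHeader is populated with a long list of settings. After that comes the switch block, which performs the checks appropriate to each sending mode.
In the end, whichever mode is used, MQClientAPIImpl's sendMessage method is called: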

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    if (sendSmartMsg || msg instanceof MessageBatch) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }

    request.setBody(msg.getBody());

    switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }

    return null;
}

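First, a different type of RemotingCommand request is created depending on the type of the message.

Once the request has been built, what happens next again depends on the send mode.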
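
In the ASYNC and SYNC branches of the listing above, the time already spent is subtracted from timeoutMillis before the call goes any further, and the call is rejected if the budget is already gone. A minimal sketch of that bookkeeping, with a made-up class and method name and an unchecked exception standing in for RocketMQ's own exception types:

```java
// Hypothetical helper illustrating "remaining timeout" bookkeeping.
final class TimeoutBudgetSketch {
    /**
     * Returns how many milliseconds of the budget are left, given when the
     * call began and the current time. Throws if more time has been spent
     * than the budget allows.
     */
    static long remaining(long timeoutMillis, long beginMillis, long nowMillis) {
        long cost = nowMillis - beginMillis;
        if (timeoutMillis < cost) {
            throw new IllegalStateException("call timeout: spent " + cost + "ms of " + timeoutMillis + "ms");
        }
        return timeoutMillis - cost;
    }

    public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        // ... work that eats into the budget would happen here ...
        long left = remaining(3000, begin, System.currentTimeMillis());
        System.out.println("remaining budget: " + left + "ms");
    }
}
```

Passing the reduced value down each layer means the caller's timeout bounds the whole call chain rather than each step separately.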


Oneway mode:
This directly calls invokeOneway on remotingClient, i.e. the Netty client:

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

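First, a Channel is picked from channelTables by the broker's address (as the previous post described, the Netty client caches a map of already-connected Channels, namely channelTables).

Then, much as before, the doBeforeRequest method of every configured RPCHook is executed.
After that, the invokeOnewayImpl method is executed: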

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    // The semaphore limits how many oneway requests are in flight at once
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // Release the permit when the write completes, successful or not
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

First, the flag bits of the request are set:

public void markOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    this.flag |= bits;
}
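
As a small worked example of this kind of bit flag, the sketch below sets and tests a single bit in an int. The class name and the helper methods are hypothetical, and the bit positions 0 and 1 are assumed values chosen for illustration.

```java
// Hypothetical illustration of packing boolean markers into one int.
final class FlagBitsSketch {
    // Assumed bit positions, for illustration only.
    static final int RPC_TYPE = 0;
    static final int RPC_ONEWAY = 1;

    /** Returns flag with the oneway bit switched on; other bits are untouched. */
    static int markOneway(int flag) {
        return flag | (1 << RPC_ONEWAY);
    }

    /** Tests whether the oneway bit is set. */
    static boolean isOneway(int flag) {
        return (flag & (1 << RPC_ONEWAY)) != 0;
    }

    public static void main(String[] args) {
        int flag = 1 << RPC_TYPE;            // some other bit already set
        int marked = markOneway(flag);
        System.out.println(Integer.toBinaryString(marked) + " oneway=" + isOneway(marked));
    }
}
```

Using OR to set the bit leaves any other markers in the same field intact, which is why one int can carry several independent flags.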


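Next, the acquired semaphore is wrapped in a SemaphoreReleaseOnlyOnce, which guarantees that the permit is released only once.
Finally, Netty's writeAndFlush method sends the request, with an asynchronous listener attached that releases the permit when the write completes (as the code shows, whether or not the write succeeded).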
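
The "release only once" guarantee can be obtained with a compare-and-set on a boolean. The following is a minimal sketch of that idea under the assumption that a plain AtomicBoolean guard is enough; ReleaseOnceSketch is a made-up name and this is not presented as the actual source of SemaphoreReleaseOnlyOnce.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: however many times release() is called,
// the underlying semaphore gets exactly one permit back.
final class ReleaseOnceSketch {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnceSketch(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    void release() {
        // Only the first caller wins the compare-and-set and releases the permit.
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(2);
        if (permits.tryAcquire()) {
            ReleaseOnceSketch once = new ReleaseOnceSketch(permits);
            once.release();
            once.release(); // ignored: already released
        }
        System.out.println("available permits: " + permits.availablePermits());
    }
}
```

This matters because the release can be triggered from more than one place (the write listener and the exception path); without the guard, a double release would quietly raise the concurrency limit.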

Because this is a one-way send, nothing much is done once the send has completed.

 

Async mode:
This calls the sendMessageAsync method:

private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            // No user callback: process the response only to run hooks and update fault stats
            if (null == sendCallback && response != null) {

                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }

                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            if (response != null) {
                // A response arrived: hand the result to the user callback
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                // No response: send failure, timeout, or unknown cause
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                        responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}

An InvokeCallback is set up here to handle the callback after the send.


First, look at the invokeAsync method:

public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTooMuchRequestException("invokeAsync call timeout");
            }
            this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

As with Oneway above, the actual work is done by invokeAsyncImpl.

The invokeAsyncImpl method:

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        // Register the future under the request id before writing, so the response can find it
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

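Here the request's getOpaque method returns an opaque value. This value is assigned when the request is created, from an auto-incrementing AtomicInteger, so it serves as the unique id of each request.

A ResponseFuture is then created to wrap the invokeCallback and the channel, and it is put into responseTable.
responseTable is a map: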

protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
    new ConcurrentHashMap<Integer, ResponseFuture>(256);

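It maps each request id to its ResponseFuture and is used to handle, asynchronously, the response that arrives after an asynchronous send.
In other words, once the send has finished and a response message is received, the request id is used to look up the matching ResponseFuture, and the methods of the InvokeCallback set up earlier are executed. Inside the InvokeCallback, processSendResponse handles the response sent back by the broker, and finally, depending on the outcome, the onSuccess or onException method of the user-supplied SendCallback is called. That completes the asynchronous send.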
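
The request-id correlation described above can be sketched in a few lines. This is a simplified model, not the library's code: CorrelationSketch, register, onResponse and pendingCount are hypothetical names, and a plain Consumer<String> stands in for ResponseFuture and InvokeCallback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of matching responses to requests by id.
final class CorrelationSketch {
    private final AtomicInteger nextId = new AtomicInteger();
    private final ConcurrentMap<Integer, Consumer<String>> pending = new ConcurrentHashMap<>();

    /** Stores the callback and returns the id that would travel with the request. */
    int register(Consumer<String> callback) {
        int opaque = nextId.getAndIncrement();
        pending.put(opaque, callback);
        return opaque;
    }

    /** Called when a response carrying this id arrives; false if nothing is waiting for it. */
    boolean onResponse(int opaque, String body) {
        Consumer<String> callback = pending.remove(opaque);
        if (callback == null) {
            return false;
        }
        callback.accept(body);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        CorrelationSketch table = new CorrelationSketch();
        List<String> log = new ArrayList<>();
        int first = table.register(body -> log.add("first got " + body));
        int second = table.register(body -> log.add("second got " + body));
        // Responses may arrive in any order; the id routes each to its own callback.
        table.onResponse(second, "B");
        table.onResponse(first, "A");
        System.out.println(log);
    }
}
```

Removing the entry as part of the lookup keeps the table from growing and ensures a callback runs at most once per request.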

The remaining steps are the same as for Oneway: Netty's writeAndFlush performs the send.

 

Sync mode:
This calls the sendMessageSync method:

private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

It first calls the Netty client's invokeSync method:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

The steps are again similar to the ones seen before.

Go straight to the invokeSyncImpl method:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                // Write failed: wake the waiting caller with a null response
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        // Block the calling thread until the response arrives or the timeout elapses
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

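This is largely the same as Async, except that after writeAndFlush it calls the ResponseFuture's waitResponse method and waits, within the timeout, for the response to come back.
If the send fails, the for loop in sendDefaultImpl of DefaultMQProducerImpl carries on, until the send succeeds or the number of send attempts is used up.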
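
A blocking wait with a timeout, of the kind waitResponse performs, can be built on a CountDownLatch. The sketch below assumes that approach; BlockingFutureSketch and its String payload are hypothetical stand-ins, not the real ResponseFuture.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical future: one thread waits, another delivers the response.
final class BlockingFutureSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    /** Called by the I/O side; passing null signals that the send failed. */
    void putResponse(String response) {
        this.response = response;
        latch.countDown(); // wake the waiting caller
    }

    /** Blocks for at most timeoutMillis; returns null on timeout or failure. */
    String waitResponse(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return response;
    }

    public static void main(String[] args) {
        BlockingFutureSketch future = new BlockingFutureSketch();
        new Thread(() -> future.putResponse("SEND_OK")).start();
        System.out.println("response: " + future.waitResponse(1000));
    }
}
```

A null result is ambiguous on its own, which is why the listing above also checks isSendRequestOK to tell a timeout apart from a failed write.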

If the broker's response is received within the timeout, invokeSync executes the doAfterResponse method of every configured RPCHook, and then processSendResponse in sendMessageSync handles the received response.

 

That concludes message sending in the producer.
