
Spring Boot 2.0 Source Code Analysis (2): ActiveMQ Integration

March 29, 2019 | 移动技术网 IT编程

For the integration steps themselves, see: Spring Boot 2.0 in Practice (2): Integrating Spring Boot 2.0 with ActiveMQ

ActiveMQ auto-configuration

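When both javax.jms.Message and org.springframework.jms.core.JmsTemplate are on the classpath, Spring Boot registers the objects needed to work with ActiveMQ as beans so that the application can inject them. Let's look at the JmsAutoConfiguration class.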

@Configuration
@ConditionalOnClass({ Message.class, JmsTemplate.class })
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {

    @Configuration
    protected static class JmsTemplateConfiguration {
        ......
        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            JmsTemplate template = new JmsTemplate(connectionFactory);
            template.setPubSubDomain(this.properties.isPubSubDomain());
            map.from(this.destinationResolver::getIfUnique).whenNonNull()
                    .to(template::setDestinationResolver);
            map.from(this.messageConverter::getIfUnique).whenNonNull()
                    .to(template::setMessageConverter);
            mapTemplateProperties(this.properties.getTemplate(), template);
            return template;
        }
            ......
    }

    @Configuration
    @ConditionalOnClass(JmsMessagingTemplate.class)
    @Import(JmsTemplateConfiguration.class)
    protected static class MessagingTemplateConfiguration {

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(JmsTemplate.class)
        public JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
            return new JmsMessagingTemplate(jmsTemplate);
        }

    }

}

JmsAutoConfiguration mainly registers two beans: JmsMessagingTemplate and JmsTemplate.
JmsAutoConfiguration also imports JmsAnnotationDrivenConfiguration, which registers a JmsListenerContainerFactory.
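
The effect of @ConditionalOnMissingBean in the listing above is that the auto-configured bean backs off whenever the application already defines one of the same type. The following is only a plain-Java sketch of that idea: TinyRegistry and its methods are hypothetical names invented for illustration and are not Spring's container API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for a bean container; not Spring's API.
class TinyRegistry {
    private final Map<Class<?>, Object> beans = new LinkedHashMap<>();

    // A user-defined bean is registered unconditionally.
    <T> void register(Class<T> type, T bean) {
        beans.put(type, bean);
    }

    // Mimics the idea behind @ConditionalOnMissingBean: the default is
    // created only when no bean of that type has been registered yet.
    <T> boolean registerIfMissing(Class<T> type, Supplier<T> defaultFactory) {
        if (beans.containsKey(type)) {
            return false; // back off and keep the user's bean
        }
        beans.put(type, defaultFactory.get());
        return true;
    }

    <T> T get(Class<T> type) {
        return type.cast(beans.get(type));
    }
}
```

In this sketch, calling register first and registerIfMissing afterwards leaves the user's object in place, which roughly mirrors why declaring your own JmsTemplate bean suppresses the auto-configured one.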

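Sending messages

Take the following send call as an example:

    jmsMessagingTemplate.convertAndSend(this.queue, msg);

This method first converts and pre-processes the message, and ultimately sends it by calling JmsTemplate's doSend.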

    protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
            throws JMSException {
        Assert.notNull(messageCreator, "MessageCreator must not be null");
        MessageProducer producer = createProducer(session, destination);
        try {
            Message message = messageCreator.createMessage(session);
            doSend(producer, message);
            if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                JmsUtils.commitIfNecessary(session);
            }
        }
        finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }

A MessageProducer instance is created first. The original org.springframework.messaging.Message is then converted into a javax.jms.Message, and the message is handed to the producer to be sent.
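
The shape of that flow (convert, send, always release the producer) can be sketched in plain Java. Everything below is an illustration under assumptions: Producer, RecordingProducer and the converter function are hypothetical stand-ins rather than the javax.jms interfaces, and unlike the real doSend the producer is passed in instead of being created from a session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for the JMS types; these are not the javax.jms interfaces.
class SendFlowSketch {

    interface Producer {
        void send(String wireMessage);
        void close();
    }

    // Records what happened so the order of steps is visible.
    static class RecordingProducer implements Producer {
        final List<String> events = new ArrayList<>();
        final boolean failOnSend;

        RecordingProducer(boolean failOnSend) {
            this.failOnSend = failOnSend;
        }

        @Override
        public void send(String wireMessage) {
            if (failOnSend) {
                throw new IllegalStateException("broker unavailable");
            }
            events.add("send:" + wireMessage);
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    // Same shape as the doSend listing: build the message, send it,
    // and always release the producer in finally.
    static <T> void doSend(Producer producer, T payload, Function<T, String> converter) {
        try {
            String wireMessage = converter.apply(payload); // conversion step
            producer.send(wireMessage);                    // delegate to the producer
        } finally {
            producer.close();                              // runs on success and on failure
        }
    }
}
```

The point of the try/finally is that a failed send still closes the producer, so a broker error does not leak the resource.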

Receiving messages

First, a consumer example:

@Component
public class Consumer {
    @JmsListener(destination = "sample.queue")
    public void receiveQueue(String text) {
        System.out.println(text);
    }
}

Spring parses the @JmsListener annotation; the implementation is in the postProcessAfterInitialization method of JmsListenerAnnotationBeanPostProcessor.

    public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
            Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
                        Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, JmsListener.class, JmsListeners.class);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
            }
            else {
                annotatedMethods.forEach((method, listeners) ->
                        listeners.forEach(listener ->
                                processJmsListener(listener, method, bean)));
            }
        }
        return bean;
    }
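
The scanning step in the listing above can be approximated with plain reflection. This is a minimal sketch under assumptions: the @Listener annotation, ListenerScanSketch and the example beans are hypothetical, and the real post-processor additionally resolves proxies, repeatable annotations and interface methods, which are omitted here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ListenerScanSketch {

    // Hypothetical annotation standing in for @JmsListener.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listener {
        String destination();
    }

    // Classes already known to have no annotated methods are skipped next time.
    private final Set<Class<?>> nonAnnotatedClasses = ConcurrentHashMap.newKeySet();

    // Returns destination -> method for every annotated method declared on the bean's class.
    Map<String, Method> scan(Object bean) {
        Map<String, Method> found = new LinkedHashMap<>();
        Class<?> type = bean.getClass();
        if (nonAnnotatedClasses.contains(type)) {
            return found;
        }
        for (Method method : type.getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                found.put(listener.destination(), method);
            }
        }
        if (found.isEmpty()) {
            nonAnnotatedClasses.add(type); // remember the miss
        }
        return found;
    }

    boolean isCachedAsNonAnnotated(Class<?> type) {
        return nonAnnotatedClasses.contains(type);
    }

    // Example beans used only for illustration.
    static class Consumer {
        @Listener(destination = "sample.queue")
        public void receiveQueue(String text) {
        }
    }

    static class PlainBean {
        public void doWork() {
        }
    }
}
```

Caching the classes with no annotated methods is what keeps the post-processor cheap, since it runs for every bean in the context.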

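Through the annotation, Spring finds the methods marked with @JmsListener; when the listener picks up a message that has arrived at ActiveMQ, the corresponding method is invoked. Next, let's see how a listener is bound to its method.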

    protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
        Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
        MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
        endpoint.setBean(bean);
        endpoint.setMethod(invocableMethod);
        endpoint.setMostSpecificMethod(mostSpecificMethod);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
        endpoint.setBeanFactory(this.beanFactory);
        endpoint.setId(getEndpointId(jmsListener));
        endpoint.setDestination(resolve(jmsListener.destination()));
        if (StringUtils.hasText(jmsListener.selector())) {
            endpoint.setSelector(resolve(jmsListener.selector()));
        }
        if (StringUtils.hasText(jmsListener.subscription())) {
            endpoint.setSubscription(resolve(jmsListener.subscription()));
        }
        if (StringUtils.hasText(jmsListener.concurrency())) {
            endpoint.setConcurrency(resolve(jmsListener.concurrency()));
        }

        JmsListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(jmsListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
                        mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
                        " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        this.registrar.registerEndpoint(endpoint, factory);
    }

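The method first sets the endpoint's properties, then looks up the JmsListenerContainerFactory (when the annotation names one), and finally registers the endpoint with the registrar together with that factory.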
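
To make the binding concrete, here is a heavily simplified sketch of registering an endpoint and later invoking its method when a message arrives. The names EndpointRegistrarSketch, Endpoint, endpointFor and dispatch are hypothetical; in Spring the registrar hands endpoints to a listener container built by the factory, a layer this sketch collapses into a single map lookup.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EndpointRegistrarSketch {

    // Hypothetical endpoint: the target bean plus the method to invoke.
    static class Endpoint {
        final Object bean;
        final Method method;

        Endpoint(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    private final Map<String, Endpoint> endpoints = new HashMap<>();

    // Looks up a public single-String-argument method by name on the bean.
    static Endpoint endpointFor(Object bean, String methodName) {
        try {
            return new Endpoint(bean, bean.getClass().getMethod(methodName, String.class));
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("no such listener method: " + methodName, ex);
        }
    }

    void registerEndpoint(String destination, Endpoint endpoint) {
        endpoints.put(destination, endpoint);
    }

    // Called when a message arrives; returns false if nothing listens on the destination.
    boolean dispatch(String destination, String text) {
        Endpoint endpoint = endpoints.get(destination);
        if (endpoint == null) {
            return false;
        }
        try {
            endpoint.method.invoke(endpoint.bean, text);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("listener invocation failed", ex);
        }
        return true;
    }

    // Example listener bean that records what it receives.
    static class Consumer {
        final List<String> received = new ArrayList<>();

        public void receiveQueue(String text) {
            received.add(text);
        }
    }
}
```

Keeping the bean and the Method together in the endpoint is the essential idea: the destination selects the endpoint, and the endpoint knows which object and method to call.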

