当前位置: 移动技术网 > IT编程>开发语言>Java > 浅谈spring-boot-rabbitmq动态管理的方法

浅谈spring-boot-rabbitmq动态管理的方法

2019年07月19日  | 移动技术网IT编程  | 我要评论

使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停机就启用停用监听或修改一些配置

一. 关于rabbitmq监听的配置

  1. 配置属性类:rabbitproperties,包含rabbitmq的认证、监听、发送者以及其他的一些配置
  2. 自动配置类:rabbitautoconfiguration,主要配置rabbitmq的连接工厂和发送者等,不包含监听的配置
  3. rabbitmq监听的配置是rabbitannotationdrivenconfiguration,是通过rabbitautoconfiguration引入的
@configuration
@conditionalonclass({ rabbittemplate.class, channel.class })
@enableconfigurationproperties(rabbitproperties.class)
@import(rabbitannotationdrivenconfiguration.class)
public class rabbitautoconfiguration {
  ...
}

rabbitannotationdrivenconfiguration中主要就是监听工厂的配置、监听工厂,但是这里也只是创建bean,并没有真正的初始化

通过配置里的bean类名,分析一下,rabbitmq的监听肯定是由监听工厂创建的,所以找到监听工厂simplerabbitlistenercontainerfactory

@bean
@conditionalonmissingbean(name = "rabbitlistenercontainerfactory")
public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory(
 simplerabbitlistenercontainerfactoryconfigurer configurer,
 connectionfactory connectionfactory) {
  simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory();
  configurer.configure(factory, connectionfactory);
  return factory;
}

既然自动配置里面没有初始化监听,那就应该是在其他地方调用的,进入监听工厂类中,发现有initializecontainer(simplemessagelistenercontainer instance)方法,猜测初始化肯定与这个方法有关,所以查看有哪些地方调用,于是找到rabbitlistenerendpointregistry.createlistenercontainer(rabbitlistenerendpoint endpoint,rabbitlistenercontainerfactory<?> factory)方法中有创建监听容器和初始化的代码

/**
 * create and start a new {@link messagelistenercontainer} using the specified factory.
 * @param endpoint the endpoint to create a {@link messagelistenercontainer}.
 * @param factory the {@link rabbitlistenercontainerfactory} to use.
 * @return the {@link messagelistenercontainer}.
 */
protected messagelistenercontainer createlistenercontainer(rabbitlistenerendpoint endpoint,
 rabbitlistenercontainerfactory<?> factory) {
  messagelistenercontainer listenercontainer = factory.createlistenercontainer(endpoint);
  if (listenercontainer instanceof initializingbean) {
   try {
      ((initializingbean) listenercontainer).afterpropertiesset();
   }
   catch (exception ex) {
      throw new beaninitializationexception("failed to initialize message listener container", ex);
   }
  }
  int containerphase = listenercontainer.getphase();
  if (containerphase < integer.max_value) { // a custom phase value
   if (this.phase < integer.max_value && this.phase != containerphase) {
      throw new illegalstateexception("encountered phase mismatch between container factory definitions: " +
       this.phase + " vs " + containerphase);
   }
   this.phase = listenercontainer.getphase();
  }  
  return listenercontainer;
}

继续找调用这个方法的地方,找到rabbitlistenerendpointregistrar.afterpropertiesset()方法之后,发现调用的地方很多了

看看afterpropertiesset方法,是initializingbean接口中的,猜测应该是spring容器创建bean之后都会调用的bean初始化的方法,所以查找找到rabbitlistenerendpointregistrar是在哪里创建的实例。原来是在rabbitlistenerannotationbeanpostprocessor中的私有属性,而rabbitlistenerannotationbeanpostprocessor是在rabbitbootstrapconfiguration这个自动配置里面初始化的,所以这就找到rabbitmq初始化监听的源头了

二. 动态管理rabbitmq监听

回到最初的问题,想要动态的启用停用mq的监听,所以先看看初始化配置的类,既然有初始化,那可能会有相关的管理,于是在rabbitlistenerendpointregistry中找到了start()和stop()方法,里面有对监听容器进行操作,主要源码如下

/**
 * @return the managed {@link messagelistenercontainer} instance(s).
 */
public collection<messagelistenercontainer> getlistenercontainers() {
  return collections.unmodifiablecollection(this.listenercontainers.values());
}
 
@override
public void start() {
  for (messagelistenercontainer listenercontainer : getlistenercontainers()) {
   startifnecessary(listenercontainer);
  }
}

/**
 * start the specified {@link messagelistenercontainer} if it should be started
 * on startup or when start is called explicitly after startup.
 * @see messagelistenercontainer#isautostartup()
 */
private void startifnecessary(messagelistenercontainer listenercontainer) {
  if (this.contextrefreshed || listenercontainer.isautostartup()) {
   listenercontainer.start();
  }
}

@override
public void stop() {
  for (messagelistenercontainer listenercontainer : getlistenercontainers()) {
   listenercontainer.stop();
  }
}

写个controller,注入rabbitlistenerendpointregistry,使用start()和stop()对监听进行启用停用的操作,并且rabbitlistenerendpointregistry实例还可以获取监听容器,对监听的一些参数也能进行修改,比如消费者数量。代码如下:

import java.util.set;
import javax.annotation.resource;
import org.springframework.amqp.rabbit.listener.rabbitlistenerendpointregistry;
import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
import com.itopener.framework.resultmap;
/**
 * created by fuwei.deng on 2017年7月24日.
 */
@restcontroller
@requestmapping("rabbitmq/listener")
public class rabbitmqcontroller {

  @resource
  private rabbitlistenerendpointregistry rabbitlistenerendpointregistry;
  
  @requestmapping("stop")
  public resultmap stop(){
   rabbitlistenerendpointregistry.stop();
   return resultmap.buildsuccess();
  }
  
  @requestmapping("start")
  public resultmap start(){
   rabbitlistenerendpointregistry.start();
   return resultmap.buildsuccess();
  }
  
  @requestmapping("setup")
  public resultmap setup(int consumer, int maxconsumer){
   set<string> containerids = rabbitlistenerendpointregistry.getlistenercontainerids();
   simplemessagelistenercontainer container = null;
   for(string id : containerids){
   container = (simplemessagelistenercontainer) rabbitlistenerendpointregistry.getlistenercontainer(id);
   if(container != null){
    container.setconcurrentconsumers(consumer);
    container.setmaxconcurrentconsumers(maxconsumer);
   }
   }
   return resultmap.buildsuccess();
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网