当前位置: 移动技术网 > IT编程>开发语言>Java > spring boot集成rabbitmq的实例教程

spring boot集成rabbitmq的实例教程

2019年07月19日  | 移动技术网IT编程  | 我要评论
一、rabbitmq的介绍   rabbitmq是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括activemq(apac

一、rabbitmq的介绍  

rabbitmq是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括activemq(apache公司的),rocketmq(阿里巴巴公司的,现已经转让给apache).

消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下:

从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理.消息队列常用于分布式系统之间互相信息的传递.

对于rabbitmq来说,除了这三个基本模块以外,还添加了一个模块,即交换机(exchange).它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列.那么rabitmq的工作流程如下所示:

紧接着说一下交换机.交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为direct,topic,headers,fanout.

direct是rabbitmq默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个bindingkey.当发送者发送消息的时候,指定对应的key.当key和消息队列的bindingkey一致的时候,消息将会被发送到该消息队列中.

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的key和该模式相匹配的时候,消息才会被发送到该消息队列中.

headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略. 

概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列
  • 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
  • 队列 消息的寄存器,负责存放生产者发送的消息
  • 交换机 负责根据一定规则分发生产者产生的消息
  • 绑定 完成交换机和队列之间的绑定

模式:

1、direct

直连模式,用于实例间的任务分发

2、topic

话题模式,通过可配置的规则分发给绑定在该exchange上的队列

3、headers

适用规则复杂的分发,用headers里的参数表达规则

4、fanout

分发给所有绑定到该exchange上的队列,忽略routing key

安装

单机版安装很简单,大概步骤如下:

# 安装erlang包
 yum install erlang
# 安装socat
 yum install socat
# 安装rabbit 
 rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm 
# 启动服务
 rabbitmq-server start
# 增加管理控制功能
 rabbitmq-plugins enable rabbitmq_management
# 增加用户:
 sudo rabbitmqctl add_user root password
 rabbitmqctl set_user_tags root administrator 
 rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安装,可参考这篇文章:

    

以上就是rabbitmq的介绍,下面开始本文的正文:spring boot 集成rabbitmq ,本人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。

二、springboot配置

废话少说直接上代码:

配置参数

application.yml:

spring:
 rabbitmq:
 addresses: 192.168.1.1:5672
 username: username
 password: password
 publisher-confirms: true
 virtual-host: /

java config读取参数

/**
 * rabbitmq配置文件读取类
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@configuration
@configurationproperties(prefix = "spring.rabbitmq")
public class rabbitmqconfig {

 @value("${spring.rabbitmq.addresses}")
 private string addresses;
 @value("${spring.rabbitmq.username}")
 private string username;
 @value("${spring.rabbitmq.password}")
 private string password;
 @value("${spring.rabbitmq.publisher-confirms}")
 private boolean publisherconfirms;
 @value("${spring.rabbitmq.virtual-host}")
 private string virtualhost;

 // 构建mq实例工厂
 @bean
 public connectionfactory connectionfactory(){
 cachingconnectionfactory connectionfactory = new cachingconnectionfactory();
 connectionfactory.setaddresses(addresses);
 connectionfactory.setusername(username);
 connectionfactory.setpassword(password);
 connectionfactory.setpublisherconfirms(publisherconfirms);
 connectionfactory.setvirtualhost(virtualhost);
 return connectionfactory;
 }

 @bean
 public rabbitadmin rabbitadmin(connectionfactory connectionfactory){
 return new rabbitadmin(connectionfactory);
 }

 @bean
 @scope(configurablebeanfactory.scope_prototype)
 public rabbittemplate rabbittemplate(){
 rabbittemplate template = new rabbittemplate(connectionfactory());
 return template;
 }
}

三、rabbitmq生产者配置

主要配置了直连和话题模式,其中话题模式设置两个队列(queuetopictest1、queuetopictest2),此两个队列在和交换机绑定时分别设置不同的routingkey(.test.以及lazy.#)来验证匹配模式。

/**
 * 用于配置交换机和队列对应关系
 * 新增消息队列应该按照如下步骤
 * 1、增加queue bean,参见queuexxxx方法
 * 2、增加queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@configuration
@autoconfigureafter(rabbitmqconfig.class)
public class rabbitmqexchangeconfig {
 /** logger */
 private static final logger logger = loggerfactory.getlogger(rabbitmqexchangeconfig.class);

 /**
 * @author:chenhf
 * @description: 主题型交换机
 * @date:下午5:49 2017/10/23
 * @param
 * @return
 */
 @bean
 topicexchange contracttopicexchangedurable(rabbitadmin rabbitadmin){
 topicexchange contracttopicexchange = new topicexchange(rabbitmqenum.exchange.contract_topic.getcode());
 rabbitadmin.declareexchange(contracttopicexchange);
 logger.debug("完成主题型交换机bean实例化");
 return contracttopicexchange;
 }
 /**
 * 直连型交换机
 */
 @bean
 directexchange contractdirectexchange(rabbitadmin rabbitadmin) {
 directexchange contractdirectexchange = new directexchange(rabbitmqenum.exchange.contract_direct.getcode());
 rabbitadmin.declareexchange(contractdirectexchange);
 logger.debug("完成直连型交换机bean实例化");
 return contractdirectexchange;
 }

 //在此可以定义队列

 @bean
 queue queuetest(rabbitadmin rabbitadmin){
 queue queue = new queue(rabbitmqenum.queuename.testqueue.getcode());
 rabbitadmin.declarequeue(queue);
 logger.debug("测试队列实例化完成");
 return queue;
 }

 //topic 1
 @bean
 queue queuetopictest1(rabbitadmin rabbitadmin){
 queue queue = new queue(rabbitmqenum.queuename.topictest1.getcode());
 rabbitadmin.declarequeue(queue);
 logger.debug("话题测试队列1实例化完成");
 return queue;
 }
 //topic 2
 @bean
 queue queuetopictest2(rabbitadmin rabbitadmin){
 queue queue = new queue(rabbitmqenum.queuename.topictest2.getcode());
 rabbitadmin.declarequeue(queue);
 logger.debug("话题测试队列2实例化完成");
 return queue;
 }


 //在此处完成队列和交换机绑定
 @bean
 binding bindingqueuetest(queue queuetest,directexchange exchange,rabbitadmin rabbitadmin){
 binding binding = bindingbuilder.bind(queuetest).to(exchange).with(rabbitmqenum.queueenum.testqueue.getcode());
 rabbitadmin.declarebinding(binding);
 logger.debug("测试队列与直连型交换机绑定完成");
 return binding;
 }
 //topic binding1
 @bean
 binding bindingqueuetopictest1(queue queuetopictest1,topicexchange exchange,rabbitadmin rabbitadmin){
 binding binding = bindingbuilder.bind(queuetopictest1).to(exchange).with(rabbitmqenum.queueenum.testtopicqueue1.getcode());
 rabbitadmin.declarebinding(binding);
 logger.debug("测试队列与话题交换机1绑定完成");
 return binding;
 }

 //topic binding2
 @bean
 binding bindingqueuetopictest2(queue queuetopictest2,topicexchange exchange,rabbitadmin rabbitadmin){
 binding binding = bindingbuilder.bind(queuetopictest2).to(exchange).with(rabbitmqenum.queueenum.testtopicqueue2.getcode());
 rabbitadmin.declarebinding(binding);
 logger.debug("测试队列与话题交换机2绑定完成");
 return binding;
 }

}

在这里用到枚举类:rabbitmqenum

/**
 * 定义rabbitmq需要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class rabbitmqenum {

 /**
 * @param
 * @author:chenhf
 * @description:定义数据交换方式
 * @date:下午4:08 2017/10/23
 * @return
 */
 public enum exchange {
 contract_fanout("contract_fanout", "消息分发"),
 contract_topic("contract_topic", "消息订阅"),
 contract_direct("contract_direct", "点对点");

 private string code;
 private string name;

 exchange(string code, string name) {
 this.code = code;
 this.name = name;
 }

 public string getcode() {
 return code;
 }

 public string getname() {
 return name;
 }
 }

 /**
 * describe: 定义队列名称
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum queuename {
 testqueue("testqueue", "测试队列"),
 topictest1("topictest1", "topic测试队列"),
 topictest2("topictest2", "topic测试队列");

 private string code;
 private string name;

 queuename(string code, string name) {
 this.code = code;
 this.name = name;
 }

 public string getcode() {
 return code;
 }

 public string getname() {
 return name;
 }

 }

 /**
 * describe: 定义routing_key
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum queueenum {
 testqueue("testqueue1", "测试队列key"),
 testtopicqueue1("*.test.*", "topic测试队列key"),
 testtopicqueue2("lazy.#", "topic测试队列key");


 private string code;
 private string name;

 queueenum(string code, string name) {
 this.code = code;
 this.name = name;
 }

 public string getcode() {
 return code;
 }

 public string getname() {
 return name;
 }
 }

}

以上完成消息生产者的定义,下面封装调用接口

测试时直接调用此工具类,testuser类需自己实现

rabbitmqsender.sendrabbitmqdirect("testqueue1",testuser);
rabbitmqsender.sendrabbitmqtopic("lazy.1.2",testuser);
rabbitmqsender.sendrabbitmqtopic("lazy.test.2",testuser);
/**
 * rabbitmq发送消息工具类
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/

@component
public class rabbitmqsender implements rabbittemplate.confirmcallback{
 /** logger */
 private static final logger logger = loggerfactory.getlogger(rabbitmqsender.class);

 private rabbittemplate rabbittemplate;

 @autowired
 public rabbitmqsender(rabbittemplate rabbittemplate) {
 this.rabbittemplate = rabbittemplate;
 this.rabbittemplate.setconfirmcallback(this);
 }

 @override
 public void confirm(correlationdata correlationdata, boolean b, string s) {
 logger.info("confirm: " + correlationdata.getid());
 }

 /**
 * 发送到 指定routekey的指定queue
 * @param routekey
 * @param obj
 */
 public void sendrabbitmqdirect(string routekey,object obj) {
 correlationdata correlationdata = new correlationdata(uuid.randomuuid().tostring());
 logger.info("send: " + correlationdata.getid());
 this.rabbittemplate.convertandsend(rabbitmqenum.exchange.contract_direct.getcode(), routekey , obj, correlationdata);
 }

 /**
 * 所有发送到topic exchange的消息被转发到所有关心routekey中指定topic的queue上
 * @param routekey
 * @param obj
 */
 public void sendrabbitmqtopic(string routekey,object obj) {
 correlationdata correlationdata = new correlationdata(uuid.randomuuid().tostring());
 logger.info("send: " + correlationdata.getid());
 this.rabbittemplate.convertandsend(rabbitmqenum.exchange.contract_topic.getcode(), routekey , obj, correlationdata);
 }
}

四、rabbitmq消费者配置

springboot注解方式监听队列,无法手动指定回调,所以采用了实现channelawaremessagelistener接口,重写onmessage来进行手动回调,详见以下代码,详细介绍可以在spring的官网上找amqp相关章节阅读

直连消费者

通过设置testuser的name来测试回调,分别发两条消息,一条username为1,一条为2,查看控制台中队列中消息是否被消费

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@configuration
@autoconfigureafter(rabbitmqconfig.class)
public class exampleamqpconfiguration {
 @bean("testqueuecontainer")
 public messagelistenercontainer messagelistenercontainer(connectionfactory connectionfactory) {
 simplemessagelistenercontainer container = new simplemessagelistenercontainer();
 container.setconnectionfactory(connectionfactory);
 container.setqueuenames("testqueue");
 container.setmessagelistener(examplelistener());
 container.setacknowledgemode(acknowledgemode.manual);
 return container;
 }


 @bean("testqueuelistener")
 public channelawaremessagelistener examplelistener() {
 return new channelawaremessagelistener() {
 @override
 public void onmessage(message message, channel channel) throws exception {
 testuser testuser = (testuser) serializeutil.unserialize(message.getbody());
 //通过设置testuser的name来测试回调,分别发两条消息,一条username为1,一条为2,查看控制台中队列中消息是否被消费
 if ("2".equals(testuser.getusername())){
  system.out.println(testuser.tostring());
  channel.basicack(message.getmessageproperties().getdeliverytag(),false);
 }

 if ("1".equals(testuser.getusername())){
  system.out.println(testuser.tostring());
  channel.basicnack(message.getmessageproperties().getdeliverytag(),false,true);
 }

 }
 };
 }

}

topic消费者1

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@configuration
@autoconfigureafter(rabbitmqconfig.class)
public class topicamqpconfiguration {
 @bean("topictest1container")
 public messagelistenercontainer messagelistenercontainer(connectionfactory connectionfactory) {
 simplemessagelistenercontainer container = new simplemessagelistenercontainer();
 container.setconnectionfactory(connectionfactory);
 container.setqueuenames("topictest1");
 container.setmessagelistener(examplelistener1());
 container.setacknowledgemode(acknowledgemode.manual);
 return container;
 }


 @bean("topictest1listener")
 public channelawaremessagelistener examplelistener1(){
 return new channelawaremessagelistener() {
 @override
 public void onmessage(message message, channel channel) throws exception {
 testuser testuser = (testuser) serializeutil.unserialize(message.getbody());
 system.out.println("topictest1:"+testuser.tostring());
 channel.basicack(message.getmessageproperties().getdeliverytag(),false);

 }
 };
 }




}

topic消费者2

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@configuration
@autoconfigureafter(rabbitmqconfig.class)
public class topicamqpconfiguration2 {
 @bean("topictest2container")
 public messagelistenercontainer messagelistenercontainer(connectionfactory connectionfactory) {
 simplemessagelistenercontainer container = new simplemessagelistenercontainer();
 container.setconnectionfactory(connectionfactory);
 container.setqueuenames("topictest2");
 container.setmessagelistener(examplelistener());
 container.setacknowledgemode(acknowledgemode.manual);
 return container;
 }


 @bean("topictest2listener")
 public channelawaremessagelistener examplelistener() {
 return new channelawaremessagelistener() {
 @override
 public void

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对移动技术网的支持。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网