简介
RabbitMQ 是实现 AMQP(高级消息队列协议)的一种消息中间件,用于在分布式系统中存储和转发消息,在易用性、扩展性、高可用性等方面表现不俗。
概念:
模式:
Spring Boot 集成 RabbitMQ
一、引入 Maven 依赖
<!-- Spring Boot starter that pulls in Spring AMQP and the RabbitMQ client. -->
<!-- Maven element names and version qualifiers are case-sensitive. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.5.2.RELEASE</version>
</dependency>
二、配置application.properties
# RabbitMQ connection settings.
# Each property must be on its own line; text following '#' on a line is a comment.
spring.rabbitmq.host = dev-mq.a.pa.com
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtual-host = /message-test/
三、编写 AmqpConfiguration 配置类
package message.test.configuration; import org.springframework.amqp.core.acknowledgemode; import org.springframework.amqp.core.amqptemplate; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.directexchange; import org.springframework.amqp.core.queue; import org.springframework.amqp.rabbit.config.simplerabbitlistenercontainerfactory; import org.springframework.amqp.rabbit.connection.cachingconnectionfactory; import org.springframework.amqp.rabbit.connection.connectionfactory; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.boot.autoconfigure.amqp.rabbitproperties; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class amqpconfiguration { /** * 消息编码 */ public static final string message_encoding = "utf-8"; public static final string exchange_issue = "exchange_message_issue"; public static final string queue_issue_user = "queue_message_issue_user"; public static final string queue_issue_all_user = "queue_message_issue_all_user"; public static final string queue_issue_all_device = "queue_message_issue_all_device"; public static final string queue_issue_city = "queue_message_issue_city"; public static final string routing_key_issue_user = "routing_key_message_issue_user"; public static final string routing_key_issue_all_user = "routing_key_message_issue_all_user"; public static final string routing_key_issue_all_device = "routing_key_message_issue_all_device"; public static final string routing_key_issue_city = "routing_key_message_issue_city"; public static final string exchange_push = "exchange_message_push"; public static final string queue_push_result = "queue_message_push_result"; @autowired private rabbitproperties rabbitproperties; 
@bean public queue issueuserqueue() { return new queue(queue_issue_user); } @bean public queue issuealluserqueue() { return new queue(queue_issue_all_user); } @bean public queue issuealldevicequeue() { return new queue(queue_issue_all_device); } @bean public queue issuecityqueue() { return new queue(queue_issue_city); } @bean public queue pushresultqueue() { return new queue(queue_push_result); } @bean public directexchange issueexchange() { return new directexchange(exchange_issue); } @bean public directexchange pushexchange() { // 参数1:队列 // 参数2:是否持久化 // 参数3:是否自动删除 return new directexchange(exchange_push, true, true); } @bean public binding issueuserqueuebinding(@qualifier("issueuserqueue") queue queue, @qualifier("issueexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_user); } @bean public binding issuealluserqueuebinding(@qualifier("issuealluserqueue") queue queue, @qualifier("issueexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_user); } @bean public binding issuealldevicequeuebinding(@qualifier("issuealldevicequeue") queue queue, @qualifier("issueexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_device); } @bean public binding issuecityqueuebinding(@qualifier("issuecityqueue") queue queue, @qualifier("issueexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_city); } @bean public binding pushresultqueuebinding(@qualifier("pushresultqueue") queue queue, @qualifier("pushexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).withqueuename(); } @bean public connectionfactory defaultconnectionfactory() { cachingconnectionfactory connectionfactory = new cachingconnectionfactory(); connectionfactory.sethost(rabbitproperties.gethost()); connectionfactory.setport(rabbitproperties.getport()); 
connectionfactory.setusername(rabbitproperties.getusername()); connectionfactory.setpassword(rabbitproperties.getpassword()); connectionfactory.setvirtualhost(rabbitproperties.getvirtualhost()); return connectionfactory; } @bean public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory( @qualifier("defaultconnectionfactory") connectionfactory connectionfactory) { simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory(); factory.setconnectionfactory(connectionfactory); factory.setacknowledgemode(acknowledgemode.manual); return factory; } @bean public amqptemplate rabbittemplate(@qualifier("defaultconnectionfactory") connectionfactory connectionfactory) { return new rabbittemplate(connectionfactory); } }
三、编写生产者
// Serialize the message to JSON and encode it with the configured charset.
// NOTE(review): `body`, `issueMessage` and `rabbitTemplate` are declared outside this fragment.
body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);
// Publish to the issue exchange using the per-user routing key.
rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE, AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);
四、编写消费者
/**
 * Receives messages from the push-result queue.
 *
 * @param data        raw message payload
 * @param channel     channel the message was delivered on
 * @param deliveryTag delivery tag of this message
 */
@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)
public void handlePushResult(@Payload byte[] data, Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    // The listener container factory is configured with AcknowledgeMode.MANUAL,
    // so the handling logic placed here must acknowledge (or reject) the
    // delivery explicitly via `channel` and `deliveryTag`.
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。
如对本文有疑问, 点击进行留言回复!!
集合框架——Map、泛型以及Collection算法常用方法
Elasticsearch 升级 7.x 版本后,我感觉掉坑里了
Mybatis的插件运行原理以及如何编写一个Mybatis的插件
网友评论