当前位置: 移动技术网 > IT编程>开发语言>Java > SpringBoot集成RabbitMQ的方法(死信队列)

SpringBoot集成RabbitMQ的方法(死信队列)

2019年07月19日  | 移动技术网IT编程  | 我要评论

介绍

死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息ttl过期

场景

1.小时进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理

使用

安装mq

使用docker方式安装,选择带mangement的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

访问 localhost: 15672,默认账号密码guest/guest

项目配置

(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)队列配置

package com.df.ps.mq;

import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.queue;
import org.springframework.amqp.core.topicexchange;
import org.springframework.amqp.rabbit.connection.connectionfactory;
import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer;
import org.springframework.amqp.rabbit.listener.adapter.messagelisteneradapter;
import org.springframework.beans.factory.annotation.autowire;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

import java.util.hashmap;
import java.util.map;

@configuration
public class mqconfig {

  //time
  @value("${spring.df.buffered.min:120}")
  private int springdfbufferedtime;

  @value("${spring.df.high-buffered.min:5}")
  private int springdfhighbufferedtime;

  @value("${spring.df.low-buffered.min:120}")
  private int springdflowbufferedtime;

  // 30min buffered queue
  @value("${spring.df.queue:spring-df-buffered-queue}")
  private string springdfbufferedqueue;

  @value("${spring.df.topic:spring-df-buffered-topic}")
  private string springdfbufferedtopic;

  @value("${spring.df.route:spring-df-buffered-route}")
  private string springdfbufferedroutekey;

  // 5m buffered queue
  @value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
  private string springdfhighbufferedqueue;

  @value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private string springdfhighbufferedtopic;

  @value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private string springdfhighbufferedroutekey;

  // high queue
  @value("${spring.df.high.queue:spring-df-high-queue}")
  private string springdfhighqueue;

  @value("${spring.df.high.topic:spring-df-high-topic}")
  private string springdfhightopic;

  @value("${spring.df.high.route:spring-df-high-route}")
  private string springdfhighroutekey;

  // 2h low buffered queue
  @value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
  private string springdflowbufferedqueue;

  @value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private string springdflowbufferedtopic;

  @value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private string springdflowbufferedroutekey;

  // low queue
  @value("${spring.df.low.queue:spring-df-low-queue}")
  private string springdflowqueue;

  @value("${spring.df.low.topic:spring-df-low-topic}")
  private string springdflowtopic;

  @value("${spring.df.low.route:spring-df-low-route}")
  private string springdflowroutekey;


  @bean(autowire = autowire.by_name, value = "springdfbufferedqueue")
  queue springdfbufferedqueue() {
    int bufferedtime = 1000 * 60 * springdfbufferedtime;
    return createbufferedqueue(springdfbufferedqueue, springdfhighbufferedtopic, springdfhighbufferedroutekey, bufferedtime);
  }

  @bean(autowire = autowire.by_name, value = "springdfhighbufferedqueue")
  queue springdfhighbufferedqueue() {
    int highbufferedtime = 1000 * 60 * springdfhighbufferedtime;
    return createbufferedqueue(springdfhighbufferedqueue, springdfhightopic, springdfhighroutekey, highbufferedtime);
  }

  @bean(autowire = autowire.by_name, value = "springdfhighqueue")
  queue springdfhighqueue() {
    return new queue(springdfhighqueue, true);
  }

  @bean(autowire = autowire.by_name, value = "springdflowbufferedqueue")
  queue springdflowbufferedqueue() {
    int lowbufferedtime = 1000 * 60 * springdflowbufferedtime;
    return createbufferedqueue(springdflowbufferedqueue, springdflowtopic, springdflowroutekey, lowbufferedtime);
  }

  @bean(autowire = autowire.by_name, value = "springdflowqueue")
  queue springdflowqueue() {
    return new queue(springdflowqueue, true);
  }


  @bean(autowire = autowire.by_name, value = "springdfbufferedtopic")
  topicexchange springdfbufferedtopic() {
    return new topicexchange(springdfbufferedtopic);
  }

  @bean
  binding springbuffereddf(queue springdfbufferedqueue, topicexchange springdfbufferedtopic) {
    return bindingbuilder.bind(springdfbufferedqueue).to(springdfbufferedtopic).with(springdfbufferedroutekey);
  }


  @bean(autowire = autowire.by_name, value = "springdfhighbufferedtopic")
  topicexchange springdfhighbufferedtopic() {
    return new topicexchange(springdfhighbufferedtopic);
  }

  @bean
  binding springhighbuffereddf(queue springdfhighbufferedqueue, topicexchange springdfhighbufferedtopic) {
    return bindingbuilder.bind(springdfhighbufferedqueue).to(springdfhighbufferedtopic).with(springdfhighbufferedroutekey);
  }

  @bean(autowire = autowire.by_name, value = "springdfhightopic")
  topicexchange springdfhightopic() {
    return new topicexchange(springdfhightopic);
  }

  @bean
  binding springhighdf(queue springdfhighqueue, topicexchange springdfhightopic) {
    return bindingbuilder.bind(springdfhighqueue).to(springdfhightopic).with(springdfhighroutekey);
  }

  @bean(autowire = autowire.by_name, value = "springdflowbufferedtopic")
  topicexchange springdflowbufferedtopic() {
    return new topicexchange(springdflowbufferedtopic);
  }

  @bean
  binding springlowbuffereddf(queue springdflowbufferedqueue, topicexchange springdflowbufferedtopic) {
    return bindingbuilder.bind(springdflowbufferedqueue).to(springdflowbufferedtopic).with(springdflowbufferedroutekey);
  }

  @bean(autowire = autowire.by_name, value = "springdflowtopic")
  topicexchange springdflowtopic() {
    return new topicexchange(springdflowtopic);
  }

  @bean
  binding springlowdf(queue springdflowqueue, topicexchange springdflowtopic) {
    return bindingbuilder.bind(springdflowqueue).to(springdflowtopic).with(springdflowroutekey);
  }


  @bean
  simplemessagelistenercontainer container(connectionfactory connectionfactory,
                       messagelisteneradapter listeneradapter) {
    simplemessagelistenercontainer container = new simplemessagelistenercontainer();
    container.setconnectionfactory(connectionfactory);
    container.setqueuenames(springdfhighqueue, springdflowqueue);
    container.setmessagelistener(listeneradapter);
    return container;
  }

  @bean
  messagelisteneradapter listeneradapter(integrationreceiver receiver) {


    messagelisteneradapter adapter = new messagelisteneradapter(receiver);
    adapter.setdefaultlistenermethod("receive");
    map<string, string> queueortagtomethodname = new hashmap<>();
    queueortagtomethodname.put(springdfhighqueue, "springdfhighreceive");
    queueortagtomethodname.put(springdflowqueue, "springdflowreceive");
    adapter.setqueueortagtomethodname(queueortagtomethodname);
    return adapter;

  }


  private queue createbufferedqueue(string queuename, string topic, string routekey, int bufferedtime) {
    map<string, object> args = new hashmap<>();
    args.put("x-dead-letter-exchange", topic);
    args.put("x-dead-letter-routing-key", routekey);
    args.put("x-message-ttl", bufferedtime);
    // 是否持久化
    boolean durable = true;
    // 仅创建者可以使用的私有队列,断开后自动删除
    boolean exclusive = false;
    // 当所有消费客户端连接断开后,是否自动删除队列
    boolean autodelete = false;

    return new queue(queuename, durable, exclusive, autodelete, args);
  }
}

消费者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.objectmapper;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;

import java.util.map;

public class mqreceiver {

  private static logger logger = loggerfactory.getlogger(mqreceiver.class);

  @value("${high-retry:5}")
  private int highretry;

  @value("${low-retry:5}")
  private int lowretry;

  @value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private string springdfhighbufferedtopic;

  @value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private string springdfhighbufferedroutekey;

  @value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private string springdflowbufferedtopic;

  @value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private string springdflowbufferedroutekey;

  private final rabbittemplate rabbittemplate;
  @autowired
  public mqreceiver(rabbittemplate rabbittemplate) {
    this.rabbittemplate = rabbittemplate;
  }

  public void receive(object message) {
    if (logger.isinfoenabled()) {
      logger.info("default receiver: " + message);
    }
  }

  /**
   * 消息从初始队列进入5分钟的高速缓冲队列
   * @param message
   */
  public void highreceiver(object message){
    objectmapper mapper = new objectmapper();
    map msg = mapper.convertvalue(message, map.class);

    try{
      logger.info("这里做消息处理...");
    }catch (exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < highretry) {
        msg.put("times", times + 1);
        rabbittemplate.convertandsend(springdfhighbufferedtopic,springdfhighbufferedroutekey,message);
      } else {
        msg.put("times", 0);
        rabbittemplate.convertandsend(springdflowbufferedtopic,springdflowbufferedroutekey,message);
      }
    }
  }

  /**
   * 消息从5分钟缓冲队列进入2小时缓冲队列
   * @param message
   */
  public void lowreceiver(object message){
    objectmapper mapper = new objectmapper();
    map msg = mapper.convertvalue(message, map.class);
    
    try {
      logger.info("这里做消息处理...");
    }catch (exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < lowretry) {
        rabbittemplate.convertandsend(springdflowbufferedtopic,springdflowbufferedroutekey,message);
      }else{
        logger.info("消息无法被消费...");
      }
    } 
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网