当前位置: 移动技术网 > IT编程>开发语言>Java > RabbitMQ(基于Spring)通过设置队列的过期时间延时时间,监听死信队列来实现延时取消订单

RabbitMQ(基于Spring)通过设置队列的过期时间延时时间,监听死信队列来实现延时取消订单

2020年07月17日  | 移动技术网IT编程  | 我要评论

直接上步骤
1、导入pom坐标

<properties>
    <slf4j.version>1.6.6</slf4j.version>
    <log4j.version>1.2.12</log4j.version>
</properties><dependencies>


    <!--log4j相关坐标-->
    <!-- log start -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <!-- log end -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.7.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.0.2</version>
    </dependency>

</dependencies>`

2、相关Spring的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">


    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672"
                               username="admin" password="admin" virtual-host="/v_admin"/>

    <!--2.定义Rabbit模板。指定连接工厂以及定义exchange-->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange">

    </rabbit:template>
    <!--MQ的管理,包括队列、交换器声明等-->

    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定义队列,自动声明-->

    <!--消费者start-->
    <!--监听的队列,监听到之后的方法-->
    <!--不知道为什么所有队列都能监听到-->
  <bean id="foo" class="rabbitMQTest.MyConsumer"/>

    <rabbit:listener-container connection-factory="connectionFactory">

        <rabbit:listener ref="foo" method="listen" queue-names="my_dlx_queue"/>
    </rabbit:listener-container>
    <!--消费者end-->

    <!--定义死信队列和死信交换机-->
    <rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>

    <!--定义广播类型交换机;并绑定上述两个队列-->
    <rabbit:direct-exchange name="my_dlx_exchange" auto-declare="true">
        <rabbit:bindings>
            <!--绑定路由键my_ttl_dlx my_max_dlx,可以将过期的消息转移到my_dlx_queue队列-->
            <rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>



<!--定义一个队列,设置6秒过期-->
    <rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <!--投递到该队列的消息如果没有被消费都将在6秒后投递到私信交换机-->
            <entry key="x-message-ttl" value-type="long" value="10000"/>
            <!--设置当消息过期后投递到对应的死信交换机-->
            <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>

        </rabbit:queue-arguments>
    </rabbit:queue>

    <!--定义定向交换机 根据不同的路由key投递消息  ,这个交换机是测试代码投递消息的时候投递给的对象my_ttl_dlx-->
    <rabbit:direct-exchange name="my_normal_exchange" id="my_normal_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>

3、编写生产者,即发送消息的,将消息发送到延时队列my_ttl_dlx_queue中,该队列中所有数据会在上面设置的long类型的时间之后失效,并被投递到死信队列my_dlx_queue。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @Author Fy
 * @Time 2020年7月16日 10:28:18
 * @QQ 1057072154
 */
public class SpringMain {
    public static void main(String[] args) throws Exception {
        //RabbitMQ模板
        AbstractApplicationContext ctx =new ClassPathXmlApplicationContext("classpath:applicationContext-rabbitmq.xml");
        //发送消息
        RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
        for (int i = 0; i < 10; i++) {
            template.convertAndSend("my_normal_exchange",
                    "my_ttl_dlx","测试过期消息6秒之后将被投递到死信队列");
        }

//        ctx.destroy();
    }
}

4、编写消费者

package rabbitMQTest;

public class MyConsumer {

//    @RabbitListener(queues = "my_dlx_queue")
    public  void listen(String message){
        System.out.println("消费者收到的消息为::"+message);
    }

}

5、程序启动后会自动监听
监听处于堵塞状态的队列
6、10秒之后延时队列中的消息过期,被交换机投递到死信队列,消费者监听到,开始消费,
在这里插入图片描述
忽略图中写的6秒,其实我设置的延时是10秒。
7、根据自己需求往自己的项目中去进行集成。有的在SSM集成这个RabbitMQ的时候会报相关异常,可能添加这个坐标就可以解决

		<!--添加了这个jar就不报错了-->
		<dependency>
			<groupId>org.springframework.retry</groupId>
			<artifactId>spring-retry</artifactId>
			<version>1.0.3.RELEASE</version>
		</dependency>

本文地址:https://blog.csdn.net/weixin_43620015/article/details/107377412

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网