当前位置: 移动技术网 > IT编程>开发语言>Java > rabbitmq的ttl和死信交换机

rabbitmq的ttl和死信交换机

2020年07月31日  | 移动技术网IT编程  | 我要评论
TTL:过期时间 * 1. 队列统一过期(整个队列设置了时间的过期) * 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉) * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。 * 队列过期后,会将队列所有消息全部移除。 * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)成为死信的三种情况 * 1.进入队列的消息超过了队列本身的长度限制,这

TTL:过期时间

消息的过期设置都是在生产者那一端进行的.
首先队列queue就会有永久的队列或者是带有时间的设置的队列.
然后我的队列中的消息也会存在永久或者带有时间的消息.
当我们队列和消息都设置了时间,就会以时间短的那个算起.
* 1. 队列统一过期(整个队列设置了时间的过期)
* 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉)
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
* 队列过期后,会将队列所有消息全部移除。
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)

成为死信的三种情况

 * 1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
 * 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)
 * 3.原队列存在消息过期设置,消息到达超时时间未被消费

在生产者端

rabbitmq.properties

rabbitmq.host=192.168.88.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/

spring-rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    <!--ttl-->
    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
        <!--设置queue的参数-->
        <rabbit:queue-arguments>
            <!--x-message-ttl指队列的过期时间-->
            <entry key="x-message-ttl" value="60000" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_ttl" >
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--
      死信队列:
          1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
          2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
          3. 正常队列绑定死信交换机
              设置两个参数:
                  * x-dead-letter-exchange:死信交换机名称
                  * x-dead-letter-routing-key:发送给死信交换机的routingkey
  -->
    <!--
        1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
    -->
    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--3. 正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--3.1 x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx" />
            <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
            <!--4.1 设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
            <!--4.2 设置队列的长度限制 max-length -->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--
       2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
   -->
    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

测试方法用于检测时间的过期,和检测死信队列

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class TestProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * TTL:过期时间
     * 1. 队列统一过期(整个队列设置了时间的过期)
     * <p>
     * 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉)
     * <p>
     * <p>
     * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
     * 队列过期后,会将队列所有消息全部移除。
     * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
     */
    @Test
    public void testTtl() {
      /*  for (int i = 0; i < 10; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
        }*/
        // 消息后处理对象,设置一些消息的参数信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //1.设置message的信息
                message.getMessageProperties().setExpiration("5000");//消息的过期时间
                //2.返回该消息
                return message;
            }
        };
        //消息单独过期
        //rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                //消息单独过期
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor);
            } else {
                //不过期的消息
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
            }
        }
    }
    /**
     * 测试死信队列
     * 成为死信的三种情况
     * 1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
     * 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)
     * 3.原队列存在消息过期设置,消息到达超时时间未被消费
     */
    @Test
    public void testDlx() {
       /* //3.原队列存在消息过期设置,消息到达超时时间未被消费
       rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");*/
       /* //1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
        for (int i = 0; i < 20; i++) {        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");
        }*/
       rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");
    }

}

成为死信的第二种情况
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)

@Component
public class SpringDlxListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//basicAck(long deliveryTag, boolean multiple)
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑");
            //人为发送异常,消费者会拒绝接受消息
            int i = 1 / 0;
            //一次性设置1数据的传输
            if (deliveryTag % 1 == 0) {
                channel.basicAck(deliveryTag, true);
            }
        } catch (Exception e) {
        //消费者拒绝接受消息,并且不让消息重新进入队列,此时消息就会成为死信消息
            channel.basicNack(deliveryTag, true, false);
        }
    }
}

rabbitmq.properties

rabbitmq.host=192.168.88.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/

spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
    />
    <context:component-scan base-package="com.itheima.rabbitmq.listener"/>
    <!--设置手动的签收,acknowledge="manual"-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" auto-declare="true">
        <rabbit:listener ref="springDlxListener" queue-names="test_queue_dlx"/>
    </rabbit:listener-container>
</beans>

测试方法用于消费者一直监听消息

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class TestConsumer {
    @Test
    public void test1(){
        boolean flag = true;
        while (true){
        }
    }
}

本文地址:https://blog.csdn.net/QiYang1024/article/details/107690987

如您对本文有疑问或者有任何想说的,请 点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网