当前位置: 移动技术网 > IT编程>开发语言>.net > RabbitMQ-消费者"未处理完的消息"丢失

RabbitMQ-消费者"未处理完的消息"丢失

2018年10月22日  | 移动技术网IT编程  | 我要评论

工程塑料,5iwwe,榆树庄拆迁

一个关于客户端(消费者)开启自动应答,重启后"未处理消息丢失"的小坑。(主要是对rabbitmq理解不够)

首先,申明一下: 本文所谓的 "丢失消息" 不是指服务器宕机、重启等原因导致内存中消息丢失,也就是说不是关于消息持久化的问题

 

  使用c# 编写测试。

  问题表象:  消费者开启自动应答,某时,消费者掉线(关闭/崩溃等),届时重启消费者,发现消费者未处理完的消息丢失

  条件: 服务器不宕机、不重启,只有一个消费者、一个生产者。

  消息流向:  消息--->生产者--->交换器--->队列--->消费者

  问题的处理: 消费者开启手动应答,若再出现之前情况,消息不丢失。

  先给个代码。

  生产者代码如下:

static void main(string[] args)
        {
            var factory = new connectionfactory() { hostname = "localhost" };
            using (var connection = factory.createconnection())
            using (var channel = connection.createmodel())
            {
          //申明广播类型交换器
                channel.exchangedeclare(exchange: "ex1", type: "fanout");
          //申明队列
                channel.queuedeclare(queue: "test1",
                                     durable: false,
                                     exclusive: false,
                                     autodelete: false,
                                     arguments: null);

                int count = 0;
                
                while (true)
                {
                    count++;
                    var body = encoding.utf8.getbytes(count.tostring());
            //向key为 p 的交换器 ex1 上推数据 channel.basicpublish(exchange: "ex1", routingkey: "p", basicproperties: null, body: body); console.writeline($"send msg {count}"); system.threading.thread.sleep(1000); } } }

 

 消费者代码如下(开启自动应答):

static void main(string[] args)
        {
            var factory = new connectionfactory() { hostname = "localhost" };
            using (var connection = factory.createconnection())
            using (var channel = connection.createmodel())
            {
          //队列与交换器绑定
                channel.queuebind(queue: "test1",
                              exchange: "ex1",
                              routingkey: "p");

                var consumer = new eventingbasicconsumer(channel);

                consumer.received += (model, ea) =>
                {
                    var body = ea.body;
                    var message = encoding.utf8.getstring(body);

                    console.writeline($"收到消息 -- {message}");

                    system.threading.thread.sleep(2000);
                };

                channel.basicconsume(queue: "test1",
                                     autoack: true,
                                     consumer: consumer);

                console.readline();
            }

        }

*交换器、队列必须先申明,两样都存在后才能进行绑定。 

 

 

生产者向队列推送消息,队列中展现状态为ready的数据就是未被消费的消息。

推送90个消息:

 

队列中存了90个未应答的消息

 

打开消费者:

发现现在消息已经全部被自动应答,队列已清空。

 

再启动消费者:

如预料,空空一片。

 

        小结 :开启消费者(开启自动应答),发现队列中状态为ready的消息全部被应答,队列中状态为ready的消息清空,不等消费者处理完这些消息,关闭消费者,然后再开启消费者,消费者不会再收到消息,出现消费者"未处理"完的消息丢失的问题。

 

同之前先屯90个消息。

然后关闭自动应答。

 

开启消费者:

 

消息状态一次性全部变成unacked。 因为没有写手动处理消息的逻辑,所以unacked状态的消息不会变少。

 

然后关闭消费者:

rabbitmq 未删除无应答的消息,消息重新转为ready状态,继续等待连接消费者处理。

 

再开启消费者:

没有出现丢失未处理完消息的情况。

 

     小结:开启消费者(关闭自动应答),发现队列中状态为ready的消息状态全部转变为unacked,队列中状态为ready的消息清空,随消费者应答,队列中状态为unacked的消息逐渐减少,关闭消费者,发现队列中状态为unacked的消息重新改变回ready状态,

       

  结论:

  关闭自动应答可避免这种消息"丢失的情况"。

  另外在开启自动应答 ack=true 的情况下,需要保证一定有消费者在线,才能保证消息都被接收处理。开启手动应答必然消耗更多资源,因为 rabbitmq 需要根据应答标号去删除队列中对应的消息。

 

以上仅个人理解,若有错误,欢迎指正~

  

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网