当前位置: 移动技术网 > IT编程>开发语言>c# > C#队列学习笔记:RabbitMQ延迟队列

C#队列学习笔记:RabbitMQ延迟队列

2020年04月12日  | 移动技术网IT编程  | 我要评论

    一、引言

    日常生活中,很多的app都有延迟队列的影子。比如在手机淘宝上,经常遇到app派发的限时消费红包,一般有几个小时或24小时不等。假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效。假如上述行为使用rabbitmq延时队列来理解的话,就是在你收到限时消费红包的时候,手机淘宝会自动发一条延时消息到队列中以供消费。在规定时间内,则可正常消费,否则依ttl自动失效。

    在rabbitmq中,有两种方式来实现延时队列:一种是基于队列方式,另外一种是基于消息方式。

    二、示例

    2.1、发送端(生产端)

    新建一个控制台项目send,并添加一个类rabbitmqconfig。

    class rabbitmqconfig
    {
        public static string host { get; set; }

        public static string virtualhost { get; set; }

        public static string username { get; set; }

        public static string password { get; set; }

        public static int port { get; set; }

        static rabbitmqconfig()
        {
            host = "192.168.2.242";
            virtualhost = "/";
            username = "hello";
            password = "world";
            port = 5672;
        }
    }
    class program
    {
        static void main(string[] args)
        {
            console.writeline("c# rabbitmq实现延迟队列有以下两种方式:");
            console.writeline("1、基于队列方式实现延迟队列,请按1开始生产。");
            console.writeline("2、基于消息方式实现延迟队列,请按2开始生产。");

            string choosechar = console.readline();
            if (choosechar == "1")
            {
                delaymessagepublishbyqueueexpires();
            }
            else if (choosechar == "2")
            {
                delaymessagepublishbymessagettl();
            }
            console.readline();
        }

        /// <summary>
        /// 基于队列方式实现延迟队列
        /// 将队列中所有消息的ttl(time to live,即过期时间)设置为一样
        /// </summary>
        private static void delaymessagepublishbyqueueexpires()
        {
            const string messageprefix = "message_";
            const int publishmessagecount = 6;
            const int quequeexpiryseconds = 1000 * 30;
            const int messageexpiryseconds = 1000 * 10;

            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    //当同时指定了queue和message的ttl值,则两者中较小的那个才会起作用。
                    dictionary<string, object> dict = new dictionary<string, object>
                    {
                        { "x-expires", quequeexpiryseconds },//队列过期时间
                        { "x-message-ttl", messageexpiryseconds },//消息过期时间
                        { "x-dead-letter-exchange", "dead exchange 1" },//过期消息转向路由
                        { "x-dead-letter-routing-key", "dead routing key 1" }//过期消息转向路由的routing key
                    };

                    //声明队列
                    channel.queuedeclare(queue: "delay1", durable: true, exclusive: false, autodelete: false, arguments: dict);


                    //向该消息队列发送消息message
                    for (int i = 0; i < publishmessagecount; i++)
                    {
                        var message = messageprefix + i.tostring();
                        var body = encoding.utf8.getbytes(message);
                        channel.basicpublish(exchange: "", routingkey: "delay1", basicproperties: null, body: body);
                        thread.sleep(1000 * 2);
                        console.writeline($"{datetime.now.tostring()} send {message} messageexpiryseconds {messageexpiryseconds / 1000}");
                    }
                }
            }
        }

        /// <summary>
        /// 基于消息方式实现延迟队列
        /// 对队列中消息进行单独设置,每条消息的ttl可以不同。
        /// </summary>
        private static void delaymessagepublishbymessagettl()
        {
            const string messageprefix = "message_";
            const int publishmessagecount = 6;
            int messageexpiryseconds = 0;

            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    dictionary<string, object> dict = new dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "dead exchange 2" },//过期消息转向路由
                        { "x-dead-letter-routing-key", "dead routing key 2" }//过期消息转向路由的routing key
                    };

                    //声明队列
                    channel.queuedeclare(queue: "delay2", durable: true, exclusive: false, autodelete: false, arguments: dict);

                    //向该消息队列发送消息message
                    random random = new random();
                    for (int i = 0; i < publishmessagecount; i++)
                    {
                        messageexpiryseconds = i * 1000;
                        var properties = channel.createbasicproperties();
                        properties.expiration = messageexpiryseconds.tostring();
                        var message = messageprefix + i.tostring();
                        var body = encoding.utf8.getbytes(message);
                        channel.basicpublish(exchange: "", routingkey: "delay2", basicproperties: properties, body: body);
                        console.writeline($"{datetime.now.tostring()} send {message} messageexpiryseconds {messageexpiryseconds / 1000}");
                    }
                }
            }
        }
    }

    2.2、接收端(消费端)

    新建一个控制台项目receive,按住alt键,将发送端rabbitmqconfig类拖一个快捷方式到receive项目中。

    class program
    {
        static void main(string[] args)
        {
            console.writeline("c# rabbitmq实现延迟队列有以下两种方式:");
            console.writeline("1、基于队列方式实现延迟队列,请按1开始消费。");
            console.writeline("2、基于消息方式实现延迟队列,请按2开始消费。");

            string choosechar = console.readline();
            if (choosechar == "1")
            {
                delaymessageconsumebyqueueexpires();
            }
            else if (choosechar == "2")
            {
                delaymessageconsumebymessagettl();
            }
            console.readline();
        }

        public static void delaymessageconsumebyqueueexpires()
        {
            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange: "dead exchange 1", type: "direct");
                    string name = channel.queuedeclare().queuename;
                    channel.queuebind(queue: name, exchange: "dead exchange 1", routingkey: "dead routing key 1");

                    var consumer = new eventingbasicconsumer(channel);
                    consumer.received += (model, ea) =>
                    {
                        var message = encoding.utf8.getstring(ea.body);
                        console.writeline($"{datetime.now.tostring()} received {message}");
                    };
                    channel.basicconsume(queue: name, noack: true, consumer: consumer);
                    console.readkey();
                }
            }
        }

        public static void delaymessageconsumebymessagettl()
        {
            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange: "dead exchange 2", type: "direct");
                    string name = channel.queuedeclare().queuename;
                    channel.queuebind(queue: name, exchange: "dead exchange 2", routingkey: "dead routing key 2");

                    var consumer = new eventingbasicconsumer(channel);
                    consumer.received += (model, ea) =>
                    {
                        var message = encoding.utf8.getstring(ea.body);
                        console.writeline($"{datetime.now.tostring()} received {message}");
                    };
                    channel.basicconsume(queue: name, noack: true, consumer: consumer);
                    console.readkey();
                }
            }
        }
    }

    2.3、运行结果

-----------------------------------------------------------------------------------------------------------

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网