当前位置: 移动技术网 > IT编程>开发语言>c# > C#队列学习笔记:RabbitMQ优先级队列

C#队列学习笔记:RabbitMQ优先级队列

2020年04月11日  | 移动技术网IT编程  | 我要评论

    一、引言

    在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,vip客户的消息要提前处理。在rabbitmq中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。

    rabbitmq优先级队列注意事项:

    1)rabbitmq3.5以后才支持优先级队列。

    2)只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。

    3)优先级取值范围在0~9之间,数值越大则优先级越高。

    二、示例

    2.1、发送端(生产端)

    新建一个控制台项目send,并添加一个类rabbitmqconfig。

    class rabbitmqconfig
    {
        public static string host { get; set; }

        public static string virtualhost { get; set; }

        public static string username { get; set; }

        public static string password { get; set; }

        public static int port { get; set; }

        static rabbitmqconfig()
        {
            host = "192.168.2.242";
            virtualhost = "/";
            username = "hello";
            password = "world";
            port = 5672;
        }
    }
    class program
    {
        static void main(string[] args)
        {
            console.writeline("按任意键开始生产。");
            console.readline();
            prioritymessagepublish();
            console.readline();
        }

        private static void prioritymessagepublish()
        {
            const string messageprefix = "message_";
            const int publishmessagecount = 6;
            byte messagepriority = 0;

            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    //设置队列优先级,取值范围在0~255之间。
                    dictionary<string, object> dict = new dictionary<string, object>
                    {
                        { "x-max-priority", 255 }
                    };

                    //声明队列
                    channel.queuedeclare(queue: "priority", durable: true, exclusive: false, autodelete: false, arguments: dict);


                    //向该消息队列发送消息message
                    random random = new random();
                    for (int i = 0; i < publishmessagecount; i++)
                    {
                        var properties = channel.createbasicproperties();
                        messagepriority = (byte)random.next(0, 9);
                        properties.priority = messagepriority;//设置消息优先级,取值范围在0~9之间。
                        var message = messageprefix + i.tostring();
                        var body = encoding.utf8.getbytes(message);
                        channel.basicpublish(exchange: "", routingkey: "priority", basicproperties: properties, body: body);
                        console.writeline($"{datetime.now.tostring()} send {message} , priority {messagepriority}");
                    }
                }
            }
        }
    }

    2.2、接收端(消费端)

    新建一个控制台项目receive,按住alt键,将发送端rabbitmqconfig类拖一个快捷方式到receive项目中。

    class program
    {
        static void main(string[] args)
        {
            console.writeline("按任意键开始消费。");
            console.readline();
            prioritymessagesubscribe();
        }

        public static void prioritymessagesubscribe()
        {
            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: false);
                    var consumer = new eventingbasicconsumer(channel);
                    consumer.received += async (model, ea) =>
                    {
                        await task.run(() =>
                        {
                            var message = encoding.utf8.getstring(ea.body);
                            thread.sleep(1000 * 2);
                            channel.basicack(deliverytag: ea.deliverytag, multiple: false);//手动消息确认
                            console.writeline($"{datetime.now.tostring()} received {message}");
                        });
                    };
                    channel.basicconsume(queue: "priority", noack: false, consumer: consumer);//需要启用消息响应,否则priority无效。
                    console.readkey();
                }
            }
        }
    }

    2.3、运行结果

    从消费情况可以看出,message_2及message_3由于priority优先级最高都是7,所以它们会被最早消费,而message_5的priority是0,所以最后才被消费。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网