当前位置: 移动技术网 > IT编程>开发语言>.net > RabbitMQ与.net core(四) 消息的优先级 与 死信队列

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

2019年01月09日  | 移动技术网IT编程  | 我要评论

翟鸿燊亮剑精神,线程数,高清mv下载网站

1.消息的优先级

假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性

producer代码

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue9 = "queue9";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false);
            //x-max-priority属性必须设置,否则消息优先级不生效
                    channel.queuedeclare(queue9, durable: true, exclusive: false, autodelete: false,arguments: new dictionary<string, object> { { "x-max-priority", 50 } });
                    channel.queuebind(queue9, exchange, queue9);
                    while(true)
                    {
                        var messagestr = console.readline();
                        var messagepri = console.readline();
                        var props = channel.createbasicproperties();
                        props.persistent = true;
                        props.priority = (byte)int.parse(messagepri);//设置消息优先级
                        channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes(messagestr));
                    }
                }
            }
        }
    }
}

consumer代码

using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.collections.generic;
using system.text;
using system.threading;

namespace rabbitmqclient
{
    class program
    {
        private static readonly connectionfactory rabbitmqfactory = new connectionfactory()
        {
            hostname = "39.**.**.**",
            port = 5672,
            username = "root",
            password = "root",
            virtualhost = "/"
        };
        static void main(string[] args)
        {
            var exchange = "change4";
            var route = "route2";
            var queue9 = "queue9";


            using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchange, "fanout", durable: true, autodelete: false);
                channel.queuedeclare(queue9, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-max-priority", 50 } });
                channel.queuebind(queue9, exchange, route);

                channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false);
                eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    byte[] body = ea.body;
                    string message = encoding.utf8.getstring(body);
                    console.writeline( message);
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };

                channel.basicconsume(queue: queue9, autoack: false, consumer: consumer);
                console.readline();
            }
        }
    }
}

运行producer

在运行consumer

可以看出消息是按优先级消费的

2.死信队列

死信队列可以用来做容错机制,当我们的消息处理异常时我们可以把消息放入到死信队列中,以便后期处理,死信的产生有三种

1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);

2.当前队列中的消息数量已经超过最大长度。

3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的ttl(time to live)时间;

看代码

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchangea = "changea";
            var routea = "routea";
            var queuea = "queuea";

            var exchanged = "changed";
            var routed = "routed";
            var queued = "queued";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchanged, type: "fanout", durable: true, autodelete: false);
                    channel.queuedeclare(queued, durable: true, exclusive: false, autodelete: false);
                    channel.queuebind(queued, exchanged, routed);

                    channel.exchangedeclare(exchangea, type: "fanout", durable: true, autodelete: false);
                    channel.queuedeclare(queuea, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> {
                                         { "x-dead-letter-exchange",exchanged}, //设置当前队列的dlx
                                         { "x-dead-letter-routing-key",routed}, //设置dlx的路由key,dlx会根据该值去找到死信消息存放的队列
                                         { "x-message-ttl",10000} //设置消息的存活时间,即过期时间
                                         });
                    channel.queuebind(queuea, exchangea, routea);


                    var properties = channel.createbasicproperties();
                    properties.persistent = true;
                    //发布消息
                    channel.basicpublish(exchange: exchangea,
                                         routingkey: routea,
                                         basicproperties: properties,
                                         body: encoding.utf8.getbytes("message"));
                }
            }
        }
    }
}

这样10秒后消息过期,我们可以看到queued中有了消息


如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网