当前位置: 移动技术网 > IT编程>开发语言>.net > RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间

RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间

2019年01月09日  | 移动技术网IT编程  | 我要评论

天津建材市场有哪些,海西金融网,水中迅捷之鲨鱼牙

上一篇我们讲了关于direct类型的exchange,这一篇我们来了解一下fanout类型的exchange。

1.exchange的fanout类型

fanout类型的exchange的特点是会把消息发送给与之绑定的所有queue中,并且会忽略routing key(所以下面代码中绑定时和发布时使用的routing key不一致也没有关系)。我们来测试一下,代码如下

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    /// <summary>
    /// Publisher demo for a fanout exchange: declares the exchange "change3",
    /// binds three durable queues to it and publishes one persistent message.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the topology and publishes a single "hello rabbit" message.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change3";
            var route = "route2";
            var queue3 = "queue3";
            var queue4 = "queue4";
            var queue5 = "queue5";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);

                    // Each queue is bound using its own name as the binding key, while the
                    // message below is published with "route2". Per the RabbitMQ docs a
                    // fanout exchange ignores the routing key, so all three queues receive it.
                    channel.QueueDeclare(queue3, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue3, exchange, queue3);

                    channel.QueueDeclare(queue4, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue4, exchange, queue4);

                    channel.QueueDeclare(queue5, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue5, exchange, queue5);

                    // Mark the message as persistent so it is written to disk by the broker.
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }
            }
        }
    }
}

运行代码,去可视化工具中查看一下

消费其中的一个

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQClient
{
    /// <summary>
    /// Consumer demo: reads messages from "queue3", which is bound to the
    /// fanout exchange "change3", and acknowledges each one manually.
    /// </summary>
    class Program
    {
        // Shared connection settings for the broker used by this demo.
        private static readonly ConnectionFactory RabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };

        /// <summary>
        /// Declares the exchange/queue/binding, starts a consumer and blocks
        /// until a line is read from standard input.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var exchange = "change3";
            var route = "route2";
            var queue = "queue3";


            using (IConnection conn = RabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "fanout", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);

                // Allow at most 50 unacknowledged deliveries per consumer on this channel.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // NOTE(review): ea.Body is assigned to byte[] here, which matches the
                    // 5.x client; newer client versions may expose a different type — confirm
                    // against the RabbitMQ.Client version in use.
                    byte[] body = ea.Body;
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);
                    // Manual ack: autoAck is false in BasicConsume below.
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                // Keep the process (and therefore the connection) alive until Enter is pressed.
                Console.ReadLine();
            }
        }
    }
}

结果如下

大家可以依次消费其他两个queue,这里就不演示了

2.消息的过期时间

我们在发送一些消息的时候,有时希望给消息设置一下过期时间,我们可以通过两种方式来设置

2.1设置队列中消息的过期时间(队列参数 x-message-ttl)

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    /// <summary>
    /// Publisher demo for a queue-level message TTL: declares "queue7" with the
    /// "x-message-ttl" argument and publishes one persistent message to it.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange "change4" and the TTL queue, then publishes one message.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);
                    // "x-message-ttl" is the time-to-live, in milliseconds, of every message
                    // in this queue (it is not the lifetime of the queue itself).
                    channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-message-ttl", 8000 } });
                    channel.QueueBind(queue7, exchange, queue7);

                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }
            }
        }
    }
}

这样过8秒去queue就看不到该消息了

2.2设置message的过期时间

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    /// <summary>
    /// Publisher demo combining both TTL settings: "queue7" is declared with an
    /// 8000 ms "x-message-ttl" and the message itself carries a 30000 ms expiration.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange "change4" and the TTL queue, then publishes one
        /// message with a per-message expiration.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);
                    // Same arguments as the queue-level TTL listing, so re-declaring is accepted.
                    channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-message-ttl", 8000 } });
                    channel.QueueBind(queue7, exchange, queue7);

                    var props = channel.CreateBasicProperties();
                    // Per-message expiration, in milliseconds, passed as a string.
                    props.Expiration = "30000";
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }
            }
        }
    }
}

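我们发现还是8秒就过期了。这是因为如果同时设置了队列的x-message-ttl与消息自身的过期时间,RabbitMQ会取两者中较小的那个值(这里队列的8秒小于消息的30秒)。我们把队列的过期时间去掉重新试一下。(注意:已存在的队列不能用不同的参数重新声明,否则会报PRECONDITION_FAILED错误,需要先删除queue7。)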

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    /// <summary>
    /// Publisher demo for per-message expiration only: "queue7" is declared
    /// without "x-message-ttl" and the message carries a 30000 ms expiration.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Re-creates "queue7" without a queue-level TTL and publishes one message
        /// with a per-message expiration.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);

                    // The other listings that use "queue7" declare it with "x-message-ttl".
                    // The broker rejects a re-declaration with different arguments
                    // (PRECONDITION_FAILED), so drop any existing queue first.
                    // WARNING: this discards any messages still in the queue.
                    channel.QueueDelete(queue7);
                    channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue7, exchange, queue7);

                    var props = channel.CreateBasicProperties();
                    // Per-message expiration, in milliseconds, passed as a string.
                    props.Expiration = "30000";
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }
            }
        }
    }
}

3.队列生存时间

我们还可以设置一个队列的生存时间

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    /// <summary>
    /// Publisher demo for queue lifetime: declares "queue8" with the "x-expires"
    /// argument and publishes one persistent message to it.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Declares the exchange "change4" and the expiring queue, then publishes one message.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue8 = "queue8";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue8, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> {
                        // Per the RabbitMQ TTL docs, the broker deletes the queue (with its
                        // messages) once it has been unused for 10000 ms.
                        { "x-expires", 10000 }
                    });
                    channel.QueueBind(queue8, exchange, queue8);

                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }
            }
        }
    }
}

这样队列在10秒内没有被使用(没有消费者、没有被重新声明、也没有被basic.get访问)就会被自动删除,队列中的消息也会一起消失

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网