当前位置: 移动技术网 > IT编程>开发语言>.net > RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange

RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange

2019年01月09日  | 移动技术网IT编程  | 我要评论

有你的快乐 歌词,什坊事件,twice组合

1.topic类型的exchange

我们之前说过topic类型的exchange是direct类型的模糊查询模式,可以通过routkey来实现模糊消费message,topic的模糊匹配有两种模式:

1. 使用*来匹配一个单词

2.使用#来匹配0个或多个单词

我们来看代码

消费端

using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.collections.generic;
using system.text;
using system.threading;

namespace rabbitmqclient
{
    class program
    {
        private static readonly connectionfactory rabbitmqfactory = new connectionfactory()
        {
            hostname = "39.**.**.**",
            port = 5672,
            username = "root",
            password = "root",
            virtualhost = "/"
        };
        static void main(string[] args)
        {
            var exchangeall = "changeall";
            var queueman = "queueman";
            var quemankey = "man.#";

            using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchangeall, type: "topic", durable: true, autodelete: false);
                channel.queuedeclare(queueman, durable: true, exclusive: false, autodelete: false);
                channel.queuebind(queueman, exchangeall, quemankey);

                channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false);
                eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    byte[] body = ea.body;
                    string message = encoding.utf8.getstring(body);
                    console.writeline( message);
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };

                channel.basicconsume(queue: queueman, autoack: false, consumer: consumer);
                console.readline();
            }
        }
    }
}

生产者代码

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchangeall = "changeall";
            //性别.姓氏.头发长度
            var keymana = "man.chen.long";
            var keymanb = "man.liu.long";
            var keymanc = "woman.liu.long";
            var keymand = "woman.chen.short";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchangeall, type: "topic", durable: true, autodelete: false);

                    var properties = channel.createbasicproperties();
                    properties.persistent = true;
                    //发布消息
                    channel.basicpublish(exchange: exchangeall,
                    routingkey: keymana,
                    basicproperties: properties,
                    body: encoding.utf8.getbytes(keymana));
                    channel.basicpublish(exchange: exchangeall,
                     routingkey: keymanb,
                     basicproperties: properties,
                     body: encoding.utf8.getbytes(keymanb));
                    channel.basicpublish(exchange: exchangeall,
                     routingkey: keymanc,
                     basicproperties: properties,
                     body: encoding.utf8.getbytes(keymanc));
                    channel.basicpublish(exchange: exchangeall,
                     routingkey: keymand,
                     basicproperties: properties,
                     body: encoding.utf8.getbytes(keymand));
                }
            }
        }
    }
}

我们先运行消费端再运行生产段,结果如下

消费端:

2.headers类型的exchange

生成者代码

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchangeall = "changeheader";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchangeall, type: "headers", durable: true, autodelete: false);

                    var properties = channel.createbasicproperties();
                    properties.persistent = true;
                    properties.headers = new dictionary<string, object> {
                        { "sex","man"}
                    };
                    //发布消息
                    channel.basicpublish(exchange: exchangeall,
                    routingkey: "",
                    basicproperties: properties,
                    body: encoding.utf8.getbytes("hihihi"));
                }
            }
        }
    }
}

消费端代码

using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.collections.generic;
using system.text;
using system.threading;

namespace rabbitmqclient
{
    class program
    {
        private static readonly connectionfactory rabbitmqfactory = new connectionfactory()
        {
            hostname = "39.**.**.**",
            port = 5672,
            username = "root",
            password = "root",
            virtualhost = "/"
        };
        static void main(string[] args)
        {
            var exchangeall = "changeheader";
            var queueman = "queueheader";

            using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchangeall, type: "headers", durable: true, autodelete: false);
                channel.queuedeclare(queueman, durable: true, exclusive: false, autodelete: false);
                channel.queuebind(queueman, exchangeall, "",new dictionary<string, object> { { "sex","man" } });

                channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false);
                eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    byte[] body = ea.body;
                    string message = encoding.utf8.getstring(body);
                    console.writeline( message);
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };

                channel.basicconsume(queue: queueman, autoack: false, consumer: consumer);
                console.readline();
            }
        }
    }
}

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网