当前位置: 移动技术网 > IT编程>开发语言>c# > C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率

C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率

2020年04月14日  | 移动技术网IT编程  | 我要评论

    一、引言

    使用工作队列的一个好处就是它能够并行地处理队列中的任务。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程在同一个连接上创建多个信道并绑定到同一个队列,达到多workers的目的。

    二、示例

    2.1、环境准备

    通过 NuGet 安装 RabbitMQ.Client 包。

    2.2、工厂类

    添加一个工厂类rabbitmqfactory:

    /// <summary>
    /// Hands out one lazily created RabbitMQ connection that all callers share
    /// (multiplexing): callers open their channels on this single TCP connection
    /// instead of opening one TCP connection each, which avoids wasting system resources.
    /// </summary>
    public static class rabbitmqfactory
    {
        // Backing field for the sharedconnection property. It needs a distinct name
        // because a field and a property of the same type cannot share one identifier.
        private static iconnection _sharedconnection;

        // Reset to 0 when the connection is recycled and incremented when a connection
        // is created.
        // NOTE(review): within this class the counter only advances when a connection is
        // created, so the 1000 threshold below is not reached through this property
        // alone - confirm whether it was meant to count opened channels.
        private static int channelcount { get; set; }

        // Guards every read and write of _sharedconnection and channelcount.
        private static readonly object _locker = new object();

        /// <summary>
        /// Gets the shared connection, creating it on first use.
        /// </summary>
        /// <remarks>
        /// The whole getter runs under one lock so that the recycle step (close, clear,
        /// reset counter) and the create step cannot interleave between threads.
        /// </remarks>
        public static iconnection sharedconnection
        {
            get
            {
                lock (_locker)
                {
                    // Recycle the connection once the counter hits the threshold.
                    if (channelcount >= 1000)
                    {
                        if (_sharedconnection != null && _sharedconnection.isopen)
                        {
                            _sharedconnection.close();
                        }
                        _sharedconnection = null;
                        channelcount = 0;
                    }

                    if (_sharedconnection == null)
                    {
                        _sharedconnection = getconnection();
                        channelcount++;
                    }

                    return _sharedconnection;
                }
            }
        }

        /// <summary>
        /// Opens a new connection to the broker with automatic recovery enabled.
        /// </summary>
        /// <returns>The connection returned by the connection factory.</returns>
        private static iconnection getconnection()
        {
            // NOTE(review): host and credentials are hard-coded for this demo; read them
            // from configuration before using this outside a sample.
            var factory = new connectionfactory
            {
                hostname = "192.168.2.242",
                username = "hello",
                password = "world",
                port = amqptcpendpoint.usedefaultport,// 5672
                virtualhost = connectionfactory.defaultvhost,// default value: "/"
                protocol = protocols.defaultprotocol,
                automaticrecoveryenabled = true
            };
            return factory.createconnection();
        }
    }

    2.3、主窗体

    代码如下:

    /// <summary>
    /// Demo form with a producer button that publishes messages to the "hello" queue
    /// and a consumer button that opens a batch of channels on the shared connection,
    /// each with its own consumer, so repeated clicks add more workers.
    /// </summary>
    public partial class rabbitmqmultithreading : form
    {
        public delegate void listviewdelegate<t>(t obj);

        // Timestamp layout for the message list: MM is the month and HH the 24-hour
        // clock. Lower-case "mm" would print minutes in the month position and "hh"
        // would print a 12-hour value without an AM/PM marker.
        private const string timestampformat = "yyyy/MM/dd HH:mm:ss ffffff";

        // Running total of channels opened successfully across all consumer clicks.
        private int _openedchannels;

        // Guards _openedchannels; each click updates it from a background task.
        private readonly object _countlock = new object();

        public rabbitmqmultithreading()
        {
            initializecomponent();
        }

        /// <summary>
        /// Inserts a timestamped message at the top of the message list. When called
        /// from a non-UI thread the call is re-posted to the UI thread.
        /// </summary>
        /// <param name="msg">Text to display.</param>
        private void showmessage(string msg)
        {
            if (invokerequired)
            {
                begininvoke(new listviewdelegate<string>(showmessage), msg);
                return;
            }

            listviewitem item = new listviewitem(new string[] { datetime.now.tostring(timestampformat), msg });
            lvwmsg.items.insert(0, item);
        }

        /// <summary>
        /// Formats a message and displays it through the single-string overload.
        /// </summary>
        /// <param name="format">Composite format string.</param>
        /// <param name="args">Values substituted into <paramref name="format"/>.</param>
        private void showmessage(string format, params object[] args)
        {
            // Format on the calling thread, then reuse the one place that touches the list.
            showmessage(string.format(format, args));
        }

        /// <summary>
        /// Producer: publishes a fixed number of "hello world" messages to the
        /// "hello" queue over a connection created for this click.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data.</param>
        private void btnsend_click(object sender, eventargs e)
        {
            int messagecount = 100;
            // NOTE(review): host and credentials are hard-coded for this demo.
            var factory = new connectionfactory
            {
                hostname = "192.168.2.242",
                username = "hello",
                password = "world",
                port = amqptcpendpoint.usedefaultport,// 5672
                virtualhost = connectionfactory.defaultvhost,// default value: "/"
                protocol = protocols.defaultprotocol,
                automaticrecoveryenabled = true
            };
            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.queuedeclare(queue: "hello", durable: true, exclusive: false, autodelete: false, arguments: null);
                    string message = "hello world";
                    // Every message carries the same payload, so it is encoded once.
                    var body = encoding.utf8.getbytes(message);
                    for (int i = 1; i <= messagecount; i++)
                    {
                        channel.basicpublish(exchange: "", routingkey: "hello", basicproperties: null, body: body);
                        showmessage($"send {message}");
                    }
                }
            }
        }

        /// <summary>
        /// Consumer: opens ten channels on the shared connection in parallel, then
        /// reports how many succeeded, how many failed, and the running total of
        /// successfully opened channels.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data.</param>
        private async void btnreceive_click(object sender, eventargs e)
        {
            random random = new random();
            // Random tag that identifies this batch of consumers in the message list.
            int rallynumber = random.next(1, 1000);

            await task.run(() =>
            {
                try
                {
                    int asynccount = 10;
                    list<task<bool>> tasks = new list<task<bool>>();
                    var connection = rabbitmqfactory.sharedconnection;
                    for (int i = 1; i <= asynccount; i++)
                    {
                        tasks.add(task.factory.startnew(() => messageworkitemcallback(connection, rallynumber)));
                    }
                    task.waitall(tasks.toarray());

                    // Count once instead of re-running the query for every figure.
                    int succeeded = tasks.count(s => s.result);
                    int failed = tasks.count - succeeded;

                    // Accumulate across clicks; clicks may overlap, so update under a lock.
                    int total;
                    lock (_countlock)
                    {
                        _openedchannels += succeeded;
                        total = _openedchannels;
                    }

                    string syncresultmsg = $"集结号 {rallynumber} 已吹起号角--" +
                        $"本次开启信道成功数:{succeeded}," +
                        $"本次开启信道失败数:{failed}," +
                        $"累计开启信道成功数:{total}";
                    showmessage(syncresultmsg);
                }
                catch (exception ex)
                {
                    showmessage($"集结号 {rallynumber} 消费异常:{ex.message}");
                }
            });
        }

        /// <summary>
        /// Opens one channel on the given connection and registers a consumer on the
        /// "hello" queue that displays and acknowledges each message.
        /// </summary>
        /// <param name="state">The connection to open the channel on; expected to be an iconnection.</param>
        /// <param name="rallynumber">Batch tag shown with every received message.</param>
        /// <returns>true when the consumer was registered; false when setup failed.</returns>
        private bool messageworkitemcallback(object state, int rallynumber)
        {
            imodel channel = null;
            try
            {
                iconnection connection = state as iconnection;
                if (connection == null)
                {
                    showmessage($"集结号 {rallynumber} 连接不可用");
                    return false;
                }

                // Do not wrap the channel in a using block: the consumer registered below
                // keeps using it after this method returns, so it must stay open.
                channel = connection.createmodel();
                channel.queuedeclare(queue: "hello", durable: true, exclusive: false, autodelete: false, arguments: null);
                // At most one unacknowledged message per consumer at a time.
                channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: false);
                var consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    var message = encoding.utf8.getstring(ea.body);
                    // Simulates one second of work per message.
                    thread.sleep(1000);
                    showmessage($"集结号 {rallynumber} received {message}");
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };
                channel.basicconsume(queue: "hello", autoack: false, consumer: consumer);
                return true;
            }
            catch (exception ex)
            {
                // Setup failed part-way: release the channel so it does not linger on
                // the shared connection.
                if (channel != null)
                {
                    channel.dispose();
                }
                showmessage(ex.message);
                return false;
            }
        }
    }

    2.4、运行结果

    多点击几次“消费者”按钮即可增加信道,从而提升消费能力。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网