当前位置: 移动技术网 > IT编程>开发语言>c# > C#队列学习笔记:RabbitMQ实现客户端相互通讯

C#队列学习笔记:RabbitMQ实现客户端相互通讯

2020年04月10日  | 移动技术网IT编程  | 我要评论
一、引言 fanout类型的Exchange,路由规则非常简单:它会把所有发送到该Exchange的消息,路由到所有与它绑定的Queue中。假设有一个聊天室,各个客户端都订阅在同一fanout exchange type,那每个客户端发送出来的消息,所有的客户端都能收到,因为大家都订阅了。此时,只需 ...

    一、引言

    fanout类型的exchange,路由规则非常简单:它会把所有发送到该exchange的消息,路由到所有与它绑定的queue中。假设有一个聊天室,各个客户端都订阅在同一fanout exchange type,那每个客户端发送出来的消息,所有的客户端都能收到,因为大家都订阅了。此时,只需要简单地限制一下,只有是与我有关的消息,才在聊天界面上显示。这样,即可达到相互通讯的效果。

    二、示例

    2.1、环境准备

    本示例使用easynetq来实现,请先在nuget上安装。

    2.2、实体类

    新建一个实体类messagebody:

    public class messagebody
    {
        public string fromuserid { get; set; }
        public string message { get; set; }
        public string touserid { get; set; }
    }

    2.3、主窗体

    新建一个chatmain窗体:

    代码如下:

    public partial class chatmain : form
    {
        public chatmain()
        {
            initializecomponent();
        }

        /// <summary>
        /// 客户端 a
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void button1_click(object sender, eventargs e)
        {
            chatwith chatwith = new chatwith(currentuserid: "usera")
            {
                startposition = formstartposition.centerscreen
            };
            chatwith.show();
        }

        /// <summary>
        /// 客户端 b
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void button2_click(object sender, eventargs e)
        {
            chatwith chatwith = new chatwith(currentuserid: "userb")
            {
                startposition = formstartposition.centerscreen
            };
            chatwith.show();
        }

        /// <summary>
        /// 客户端 c
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void button3_click(object sender, eventargs e)
        {

            chatwith chatwith = new chatwith(currentuserid: "userc")
            {
                startposition = formstartposition.centerscreen
            };
            chatwith.show();

        }

        /// <summary>
        /// 客户端 d
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void button4_click(object sender, eventargs e)
        {
            chatwith chatwith = new chatwith(currentuserid: "userd")
            {
                startposition = formstartposition.centerscreen
            };
            chatwith.show();
        }
    }

    2.4、客户端窗体

    新建一个chatwith窗体:

    代码如下:

    public partial class chatwith : form
    {
        public delegate void chatwithdelegate();
        public delegate void chatwithdelegate<t1>(t1 obj1);
        public delegate void chatwithdelegate<t1, t2>(t1 obj1, t2 obj2);

        public string currentuserid { get; }

        private ibus bus;
        public const string connstringmq = "host=192.168.2.242:5672,192.168.2.165:5672;virtualhost=/;username=hello;password=world";
        public const string fanoutexchange = "fanoutec";

        /// <summary>
        /// 有参构造函数
        /// </summary>
        /// <param name="currentuserid"></param>
        public chatwith(string currentuserid)
        {
            initializecomponent();

            //在多线程程序中,新创建的线程不能访问ui线程创建的窗口控件。
            //此时若想访问窗体的控件,可将窗体构造函数中的checkforillegalcrossthreadcalls设置为false。
            //这时线程就能安全地访问窗体控件了。
            checkforillegalcrossthreadcalls = false;

            currentuserid = currentuserid;
        }

        /// <summary>
        /// showmessage重载
        /// </summary>
        /// <param name="msg"></param>
        private void showmessage(string msg)
        {
            if (invokerequired)//invokerequired:当前线程不是创建控件的线程时为true
            {
                begininvoke(new chatwithdelegate<string>(showmessage), msg);
            }
            else
            {
                listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msg });
                lvwreceivemsg.items.insert(0, item);
            }
        }

        /// <summary>
        /// showmessage重载
        /// </summary>
        /// <param name="touserid"></param>
        /// <param name="msg"></param>
        private void showmessage(string touserid, string msg)
        {
            if (invokerequired)
            {
                begininvoke(new chatwithdelegate<string, string>(showmessage), touserid, msg);
            }
            else
            {
                listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), touserid, msg });
                lvwreceivemsg.items.insert(0, item);
            }
        }

        /// <summary>
        /// 绑定队列并订阅
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void chatwith_load(object sender, eventargs e)
        {
            cmbonline.selectedindex = 0;
            text = text + $"[{currentuserid}]";

            //这里不能使用using,否则订阅者立即就释放了,订阅不到消息。
            bus = rabbithutch.createbus(connstringmq);
            {
                if (bus.isconnected)
                {
                    var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout);
                    var queue = bus.advanced.queuedeclare(name: $"{fanoutexchange}_queue_{currentuserid}");
                    bus.advanced.bind(exchange: exchange, queue: queue, routingkey: "");

                    bus.advanced.consume(queue, registration =>
                    {
                        registration.add<messagebody>((message, info) =>
                        {
                            if (message.body.touserid == currentuserid)
                            {
                                showmessage(message.body.fromuserid, message.body.message);
                            }
                        });
                    });
                }
                else
                {
                    showmessage("服务器连接失败。");
                }
            }
        }

        /// <summary>
        /// 发送
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnsend_click(object sender, eventargs e)
        {
            try
            {
                using (var bus = rabbithutch.createbus(connstringmq))
                {
                    if (bus.isconnected)
                    {
                        if (cmbonline.text == "*")//群发
                        {
                            foreach (var item in cmbonline.items.cast<string>().where(s => s != "*" && s != currentuserid))
                            {
                                var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout);
                                var messagebody = new messagebody
                                {
                                    fromuserid = currentuserid,
                                    message = txtsendmsg.text,
                                    touserid = item
                                };
                                bus.advanced.publish(exchange: exchange,
                                    routingkey: "",
                                    mandatory: false,
                                    message: new message<messagebody>(messagebody));
                            }
                        }
                        else//私聊
                        {
                            var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout);
                            var messagebody = new messagebody
                            {
                                fromuserid = currentuserid,
                                message = txtsendmsg.text,
                                touserid = cmbonline.text
                            };
                            bus.advanced.publish(exchange: exchange,
                                routingkey: "",
                                mandatory: false,
                                message: new message<messagebody>(messagebody));
                        }
                    }
                    else
                    {
                        showmessage("发送消息失败。");
                    }
                }
            }
            catch (exception ex)
            {
                showmessage(ex.message);
            }
        }

        /// <summary>
        /// 关闭
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnclose_click(object sender, eventargs e)
        {
            close();
        }

        /// <summary>
        /// 窗体关闭事件
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void chatwith_formclosed(object sender, formclosedeventargs e)
        {
            bus?.dispose();
        }
    }

    2.5、运行结果

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网