当前位置: 移动技术网 > IT编程>开发语言>c# > C#队列学习笔记:MSMQ入门二

C#队列学习笔记:MSMQ入门二

2020年03月27日  | 移动技术网IT编程  | 我要评论

    一、引言

    按照专用队列解释: machinename\private$\queuename,只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。所以刚开始的时候认为,要想访问远程消息队列,只能使用公共队列。但是后来发现,公共队列依赖domain controller(域控),在实际部署的时候,要求使用消息队列的应用一定要在某个域中,有些太苛刻!后来发现,私有队列也是可以远程访问的。(很困惑为什么私有队列只能本地访问,这句话,到处都能看到?!)

    二、工作组下的本地c/s

    2.1、项目建立

    新建4个项目:

    2.2、项目代码

    2.2.1、model项目

    /// <summary>
    /// 消息队列实体
    /// </summary>
    [serializable]
    public class mqmessage
    {
        /// <summary>
        /// 对应message的label
        /// </summary>
        public string label { get; set; }

        /// <summary>
        /// 对应message的body,commandtype为操作类型,list<string>为操作列表。
        /// </summary>
        public dictionary<commandtype, list<string>> body { get; set; } = new dictionary<commandtype, list<string>>();

        /// <summary>
        /// 无参构造函数
        /// </summary>
        public mqmessage()
        {
        }

        /// <summary>
        /// 有参构造函数
        /// </summary>
        /// <param name="label"></param>
        /// <param name="body"></param>
        public mqmessage(string label, dictionary<commandtype, list<string>> body)
        {
            label = label;
            body = body;
        }
    }

    /// <summary>
    /// 操作类型
    /// </summary>
    public enum commandtype
    {
        create = 1, //创建
        update = 2, //更新
        delete = 3  //删除
    }

    2.2.2、common项目

    /// <summary>
    /// 日志帮助类
    /// </summary>
    public static class loghelper
    {
        private static readonly string errlogsavepath = configurationmanager.appsettings["errlogsavepath"] ?? appdomain.currentdomain.basedirectory;

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="ex">异常信息</param>
        public static void writelog(exception ex)
        {
            writelog(geterrmsg(ex));
        }

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="message">日志内容</param>
        public static void writelog(string message)
        {
            writelog(errlogsavepath, message);
        }

        /// <summary>
        /// 异常日志方法重载
        /// </summary>
        /// <param name="filepath">日志文件路径</param>
        /// <param name="message">日志内容</param>
        public static void writelog(string filepath, string message)
        {
            try
            {
                if (!directory.exists(filepath))
                {
                    directory.createdirectory(filepath);
                }
                string filename = datetime.now.tostring("yyyy-mm-dd") + ".txt";
                using (streamwriter sw = new streamwriter(filepath + "\\" + filename, true))
                {
                    sw.writeline("--------------------------------------------");
                    sw.writeline($"{datetime.now.tolongtimestring()}:{datetime.now.millisecond}\t{message}");
                    sw.close();
                }
            }
            catch (exception ex)
            {
                throw new exception(geterrmsg(ex));
            }
        }

        /// <summary>
        /// 获取异常详细信息
        /// </summary>
        /// <param name="ex"></param>
        /// <returns></returns>
        private static string geterrmsg(exception ex)
        {
            string errmessage = "";
            for (exception tempexception = ex; tempexception != null; tempexception = tempexception.innerexception)
            {
                errmessage += tempexception.message + environment.newline + environment.newline;
            }
            errmessage += ex.tostring();
            return errmessage;
        }
    }
    /// <summary>
    /// 消息队列管理器
    /// </summary>
    public class mqmanager : idisposable
    {
        private messagequeue _mq = null;
        private readonly linktype linktype = linktype.localhost;    //链接类型,远程时使用linktype.remoteserver。
        private readonly string remoteserver = "192.168.2.165";     //远程服务器ip地址

        public static mqmanager linkserver { get; } = new mqmanager();

        /// <summary>
        /// 初始化函数
        /// </summary>
        /// <param name="linktype">链接类型</param>
        public void mqmanagerinit(linktype linktype)
        {
            if (_mq == null)
            {
                string _path;
                if (linktype == linktype.localhost)
                {
                    _path = @".\private$\" + (configurationmanager.appsettings["msmqname"] ?? "helloworld");
                }
                else
                {
                    _path = "formatname:direct=tcp:" + remoteserver + @"\private$\" + (configurationmanager.appsettings["msmqname"] ?? "helloworld");
                }
                _mq = new messagequeue(_path)
                {
                    formatter = new binarymessageformatter()
                };
            }
        }

        /// <summary>
        /// 有参构造函数
        /// </summary>
        public mqmanager()
        {
            mqmanagerinit(linktype);
        }

        /// <summary>
        /// 发送消息队列(事务)
        /// </summary>
        /// <param name="message"></param>
        public void send(mqmessage message)
        {
            messagequeuetransaction transaction = new messagequeuetransaction();
            transaction.begin();
            _mq.send(message.body, message.label, transaction);
            transaction.commit();
        }

        /// <summary>
        /// 接收消息队列
        /// </summary>
        /// <returns></returns>
        public message receive()
        {
            message msg = null;
            try
            {
                msg = _mq.receive(new timespan(0, 0, 1));
            }
            catch (exception ex)
            {
                throw new exception(ex.message);
            }

            return msg;
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void dispose()
        {
            if (_mq != null)
            {
                _mq.close();
                _mq.dispose();
                _mq = null;
            }
        }
    }

    /// <summary>
    /// 链接类型
    /// </summary>
    public enum linktype
    {
        localhost = 1,      //本地服务器
        remoteserver = 2    //远程服务器
    }

    2.2.3、send项目

    class program
    {
        static void main(string[] args)
        {
            mqmessage mqmessage = new mqmessage();
            list<string> list = new list<string>();

            console.writeline("请输入内容按回车发送,多个内容请用英文逗号隔开,退出请输入exit。");
            string receivekey = console.readline();

            while (receivekey.tolower() != "exit")
            {
                if (receivekey.length > 0)
                {
                    mqmessage.label = guid.newguid().tostring();

                    list.clear();
                    list = receivekey.split(new char[] { ',' }).tolist();
                    mqmessage.body.clear();
                    mqmessage.body.add(commandtype.create, list);
                    try
                    {
                        mqmanager.linkserver.send(mqmessage);
                        console.writeline("内容已发送成功。");
                    }
                    catch (exception ex)
                    {
                        console.writeline(ex.message);
                        loghelper.writelog(ex);
                    }
                }
                receivekey = console.readline();
            }

            mqmanager.linkserver.dispose();
        }
    }

    2.2.4、receive项目

    /// <summary>
    /// 接收消息队列管理(线程)
    /// </summary>
    public class receivemanager : idisposable
    {
        private thread _thread = null;

        public static receivemanager instance { get; set; } = new receivemanager();

        /// <summary>
        /// 开始
        /// </summary>
        public void start()
        {
            startreceive();
        }

        /// <summary>
        /// 接收线程
        /// </summary>
        private void startreceive()
        {
            _thread = new thread(new threadstart(receive))
            {
                name = "receivethread",
                isbackground = true
            };
            _thread.start();
        }

        /// <summary>
        /// 接收线程调用方法
        /// </summary>
        private void receive()
        {
            message msg = null;
            while (true)
            {
                try
                {
                    msg = mqmanager.linkserver.receive();
                    if (msg != null)
                    {
                        console.writeline("----------------------------------------------------");
                        console.writeline("lable: " + msg.label);
                        dictionary<commandtype, list<string>> keyvaluepairs = msg.body as dictionary<commandtype, list<string>>;
                        console.writeline("body commandtype: " + keyvaluepairs.keys.first());
                        console.writeline("body details: ");
                        foreach (var item in keyvaluepairs.values.first())
                        {
                            console.writeline(item);
                        }
                        console.writeline("----------------------------------------------------");
                    }
                }
                catch (exception ex)
                {
                    console.writeline(ex.message);
                    loghelper.writelog(ex);
                }
                thread.sleep(1000);
            }
        }

        /// <summary>
        /// 结束
        /// </summary>
        public void stop()
        {
            dispose();
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void dispose()
        {
            try
            {
                if (_thread != null)
                {
                    _thread.abort();
                    _thread.join();
                    _thread = null;
                }

                mqmanager.linkserver.dispose();
            }
            catch (exception ex)
            {
                console.writeline(ex.message);
            }
        }
    }
    class program
    {
        static void main(string[] args)
        {
            receivemanager.instance.start();
            console.writeline("退出请输入exit");
            string receivekey = console.readline();
            while (receivekey.tolower() != "exit")
            {
                receivekey = console.readline();
            }
            receivemanager.instance.stop();
            console.read();
        }
    }

    2.3、运行测试

    客户端发送hello,world:

    服务端接收到的信息:

    三、工作组下的远程c/s

    3.1、代码调整

    工作组下的远程c/s,代码已经在上面的示例中提供,将common\mqmanager.cs下的:

    private readonly linktype linktype = linktype.localhost;改成private readonly linktype linktype = linktype.remoteserver;即可。

    3.2、访问权限

    既然要与远程服务器交互(发送/接收)队列信息,首当其冲的是访问权限问题,没有权限,一切免谈。

    下面讲一下远程服务器(代码中的192.168.2.165,win7系统)要设置的内容:

    3.2.1、在运行中输入compmgmt.msc->服务和应用程序->消息队列->右键属性->服务器安全性->禁用未经身份验证的 rpc 调用->把勾勾去掉->应用。

    3.2.2、在消息队列->专用队列->新建一个代码中用到的helloworld队列,勾上事务性->确定。

    为什么要手工建helloworld消息队列?因为要对这个队列进行匿名访问授权,后面会讲到。至于事务性这个勾,这个要与代码相一致。因为本示例中使用了messagequeuetransaction来发送事务信息,所以必须得勾上这个勾,不然的话,发送时没有任何的报错信息,但是服务器就是收不到队列信息。

    3.2.3、专用队列->helloworld->右键属性->安全->anonymous logon->完全控制->应用。

    3.2.4、在运行中输入regedit->hkey_local_machine\software\microsoft\msmq\parameters\security->新建两个dword值:allownonauthenticatedrpc、newremotereadserverdenyworkgroupclient->分别双击将数值数据改成1。

    3.2.5、关于防火墙,我是关闭了的,假如您的电脑防火墙是打开了的话,请检查一下message queuing是不是被允许的?

    3.3、运行测试

    客户端发送a,b,c,d:

    服务器端接收到的信息:

 

    参考自:

    

    

    

    

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网