当前位置: 移动技术网 > IT编程>开发语言>.net > ASP.NET Core 2.0利用MassTransit集成RabbitMQ

ASP.NET Core 2.0利用MassTransit集成RabbitMQ

2018年09月05日  | 移动技术网IT编程  | 我要评论

七只鸭子电影网,双色球2013007,北京限号查询

在asp.net core上利用masstransit来集成使用rabbitmq真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用rabbitmq的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。

 

masstransit

先看看masstransit是个什么宝贝(masstransit官网的简介):

masstransit是一个免费的开源轻量级消息总线,用于使用.net框架创建分布式应用程序。masstransit在现有的顶级消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。

通俗描述:

masstransit就是一套基于消息服务的高级封装类库,下游可联接rabbitmq、redis、mongodb等服务。

github官网:https://github.com/masstransit/masstransit

 

rabbitmq

rabbitmq是成熟的mq队列服务,是由 erlang 语言开发的 amqp 的开源实现。关于介绍rabbitmq的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:

官网:

下载与安装:

 

实现代码

通过上面的介绍,咱们已对masstransit与rabbitmq有了初步了解,那么现在来看看如何在asp.net core上优雅的使用rabbitmq吧。

1、创建一个名为“rabbitmqhelp.cs”公共类,用于封装操作rabbitmq的公共方法,并通过nuget来管理并引用“masstransit”与“masstransit.rabbitmq”类库。

2、“rabbitmqhelp.cs”公共类主要对外封装两个静态方法,其代码如下:

  1 using masstransit;
  2 using masstransit.rabbitmqtransport;
  3 using system;
  4 using system.collections.generic;
  5 using system.text;
  6 using system.threading.tasks;
  7 
  8 namespace lezhima.comm
  9 {
 10     /// <summary>
 11     /// rabbitmq公共操作类,基于masstransit库
 12     /// </summary>
 13     public class rabbitmqhelp
 14     {
 15         #region 交换器
 16 
 17         /// <summary>
 18         /// 操作日志交换器
 19         /// 同时需在rabbitmq的管理后台创建同名交换器
 20         /// </summary>
 21         public static readonly string actionlogexchange = "lezhima.actionlogexchange";
 22 
 23 
 24         #endregion
 25 
 26 
 27         #region 声明变量
 28 
 29         /// <summary>
 30         /// mq联接地址,建议放到配置文件
 31         /// </summary>
 32         private static readonly string mqurl = "rabbitmq://192.168.1.181/";
 33 
 34         /// <summary>
 35         /// mq联接账号,建议放到配置文件
 36         /// </summary>
 37         private static readonly string mquser = "admin";
 38 
 39         /// <summary>
 40         /// mq联接密码,建议放到配置文件
 41         /// </summary>
 42         private static readonly string mqpwd = "admin";
 43 
 44         #endregion
 45 
 46         /// <summary>
 47         /// 创建连接对象
 48         /// 不对外公开
 49         /// </summary>
 50         private static ibuscontrol createbus(action<irabbitmqbusfactoryconfigurator, irabbitmqhost> registrationaction = null)
 51         {
 52             //通过masstransit创建mq联接工厂
 53             return bus.factory.createusingrabbitmq(cfg =>
 54             {
 55                 var host = cfg.host(new uri(mqurl), hst =>
 56                 {
 57                     hst.username(mquser);
 58                     hst.password(mqpwd);
 59                 });
 60                 registrationaction?.invoke(cfg, host);
 61             });
 62         }
 63 
 64 
 65         /// <summary>
 66         /// mq生产者
 67         /// 这里使用fanout的交换类型
 68         /// </summary>
 69         /// <param name="obj"></param>
 70         public async static task pushmessage(string exchange, object obj)
 71         {
 72             var bus = createbus();
 73             var sendtouri = new uri($"{mqurl}{exchange}");
 74             var endpoint = await bus.getsendendpoint(sendtouri);
 75             await endpoint.send(obj);
 76         }
 77 
 78         /// <summary>
 79         /// mq消费者
 80         /// 这里使用fanout的交换类型
 81         /// consumer必需是实现iconsumer接口的类实例
 82         /// </summary>
 83         /// <param name="obj"></param>
 84         public static void receivemessage(string exchange, object consumer)
 85         {
 86             var bus = createbus((cfg, host) =>
 87             {
 88                 //从指定的消息队列获取消息 通过consumer来实现消息接收
 89                 cfg.receiveendpoint(host, exchange, e =>
 90                 {
 91                     e.instance(consumer);
 92                 });
 93             });
 94             bus.start();
 95         }
 96     }
 97 }
 98 

 

3、“rabbitmqhelp.cs”公共类已经有了mq“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递json、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过iconsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:

  1 using masstransit;
  2 using system;
  3 using system.collections.generic;
  4 using system.text;
  5 using system.threading.tasks;
  6 
  7 namespace lezhima.storage.consumer
  8 {
  9     /// <summary>
 10     /// 从mq接收并处理数据
 11     /// 实现masstransit的iconsumer接口
 12     /// </summary>
 13     public class logconsumer : iconsumer<actionlog>
 14     {
 15         /// <summary>
 16         /// 实现consume方法
 17         /// 接收并处理数据
 18         /// </summary>
 19         /// <param name="context"></param>
 20         /// <returns></returns>
 21         public task consume(consumecontext<actionlog> context)
 22         {
 23             return task.run(async () =>
 24             {
 25                 //获取接收到的对象
 26                 var amsg = context.message;
 27                 console.writeline($"recevied by consumer:{amsg}");
 28                 console.writeline($"recevied by consumer:{amsg.actionlogid}");
 29             });
 30         }
 31     }
 32 }
 33 

 

调用代码

1、生产者调用代码如下:

  1 /// <summary>
  2 /// 测试mq生产者
  3 /// </summary>
  4 /// <returns></returns>
  5 [httpget]
  6 public async task<mobiresult> addmessagetest()
  7 {
  8     //声明一个实体对象
  9     var model = new actionlog();
 10     model.actionlogid = guid.newguid();
 11     model.createtime = datetime.now;
 12     model.updatetime = datetime.now;
 13     //调用mq
 14     await rabbitmqhelp.pushmessage(rabbitmqhelp.actionlogexchange, model);
 15 
 16     return new mobiresult(1000, "操作成功");
 17 }

 

2、消费者调用代码如下:

  1 using lezhima.storage.consumer;
  2 using microsoft.extensions.configuration;
  3 using system;
  4 using system.io;
  5 
  6 namespace lezhima.storage
  7 {
  8     class program
  9     {
 10         static void main(string[] args)
 11         {
 12             var conf = new configurationbuilder()
 13               .setbasepath(directory.getcurrentdirectory())
 14               .addjsonfile("appsettings.json", true, true)
 15               .build();
 16 
 17             //调用接收者
 18             rabbitmqhelp.receivemessage(rabbitmqhelp.actionlogexchange,
 19              new logconsumer()
 20             );
 21 
 22             console.readline();
 23         }
 24     }
 25 }
 26 

 

总结

1、基于masstransit库使得我们使用rabbitmq变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。

2、rabbitmq的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网