一起游侠,暗香诀,松耸菌
producer:消息的生产者,也就是创建消息的对象
exchange:消息的接受者,也就是用来接收消息的对象,exchange接收到消息后将消息按照规则发送到与他绑定的queue中。下面我们来定义一个producer与exchange。
using rabbitmq.client; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type:"direct", durable: true, autodelete: false); //创建exchange } } } } }
可以看到echange的参数有:
type:可选项为,fanout,direct,topic,headers。区别如下:
fanout:发送到所有与当前exchange绑定的queue中
direct:发送到与消息的routekey相同的rueue中
topic:fanout的模糊版本
headers:发送到与消息的header属性相同的queue中
durable:持久化
autodelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
运行程序,可以在可视化界面看到change2
接下来我们可以创建与change2绑定的queue
using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "direct", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); #创建queue2 channel.queuebind(queue, exchange, route); #将queue2绑定到exchange2 }
可以看到echange的参数有:
durable:持久化
exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失
autodelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
去可视化界面看queue
using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "direct", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue, exchange, route); var props = channel.createbasicproperties(); props.persistent = true; #持久化 channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit")); }
using rabbitmq.client; using system; using system.text; namespace rabbitmqclient { class program { private static readonly connectionfactory rabbitmqfactory = new connectionfactory() { hostname = "39.**.**.**", port = 5672, username = "root", password = "root", virtualhost = "/" }; static void main(string[] args) { var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (iconnection conn = rabbitmqfactory.createconnection()) using (imodel channel = conn.createmodel()) { channel.exchangedeclare(exchange, "direct", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue, exchange, route); while (true) { var message = channel.basicget(queue, true); #第二个参数说明自动释放消息,如为false需手动释放消息 if(message!=null) { var msgbody = encoding.utf8.getstring(message.body); console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody)); } system.threading.thread.sleep(timespan.fromseconds(1)); } } } } }
运行查看结果
查看可视化界面
while (true) { var message = channel.basicget(queue, false);#设置为手动释放 if(message!=null) { var msgbody = encoding.utf8.getstring(message.body); console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody)); } channel.basicack(message.deliverytag, false); #手动释放 system.threading.thread.sleep(timespan.fromseconds(1)); }
我们再发一条消息,然后开始消费,加个断点调试一下
查看一下queue中消息状态
然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态
这么说来只要不走到 channel.basicack(message.deliverytag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态
如图已经被释放了
while (true) { var message = channel.basicget(queue, false); if(message!=null) { var msgbody = encoding.utf8.getstring(message.body); console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody)); console.writeline(message.deliverytag); #当前消息被处理的次序数 if (1==1) channel.basicreject(message.deliverytag, true); } system.threading.thread.sleep(timespan.fromseconds(1)); }
重新发送4条消息
开始消费
我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾
using (iconnection conn = rabbitmqfactory.createconnection()) using (imodel channel = conn.createmodel()) { channel.exchangedeclare(exchange, "direct", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue, exchange, route); channel.basicqos(prefetchsize: 0, prefetchcount: 10, global: false); #一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷 eventingbasicconsumer consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { byte[] body = ea.body; string message = encoding.utf8.getstring(body); console.writeline( message+thread.currentthread.managedthreadid); channel.basicack(deliverytag: ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queue, autoack: false, consumer: consumer); console.readline(); }
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
Net Core Web Api项目与在NginX下发布的方法
asp.net core3.1 引用的元包dll版本兼容性问题解决方案
IdentityServer4实现.Net Core API接口权限认证(快速入门)
ASP.NET Core MVC通过IViewLocationExpander扩展视图搜索路径的实现
网友评论