当前位置: 移动技术网 > IT编程>开发语言>Java > rabbitMq 初步

rabbitMq 初步

2019年04月30日  | 移动技术网IT编程  | 我要评论

rabbitmq的工作原理

它的基本结构

组成部分说明如下:

broker:消息队列服务进程,此进程包括两个部分:exchange和queue。

exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

producer:消息生产者,即生产方客户端,生产方客户端将消息发送到mq。

consumer:消息消费者,即消费方客户端,接收mq转发的消息。

 

maven举例配置

<dependency>
<groupid>com.rabbitmq</groupid>
<artifactid>amqp‐client</artifactid>
<version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring‐boot‐starter‐logging</artifactid>
</dependency>

生产者举例demo

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class rabbitmqconfig {
    public static final string queue_inform_email = "queue_inform_email";
    public static final string queue_inform_sms = "queue_inform_sms";
    public static final string exchange_topics_inform="exchange_topics_inform";
    public static final string routingkey_email="inform.#.email.#";
    public static final string routingkey_sms="inform.#.sms.#";

    //声明交换机
    @bean(exchange_topics_inform)
    public exchange exchange_topics_inform(){
        //durable(true) 持久化,mq重启之后交换机还在
        return exchangebuilder.topicexchange(exchange_topics_inform).durable(true).build();
    }

    //声明queue_inform_email队列
    @bean(queue_inform_email)
    public queue queue_inform_email(){
        return new queue(queue_inform_email);
    }
    //声明queue_inform_sms队列
    @bean(queue_inform_sms)
    public queue queue_inform_sms(){
        return new queue(queue_inform_sms);
    }

    //routingkey_email队列绑定交换机,指定routingkey
    @bean
    public binding binding_queue_inform_email(@qualifier(queue_inform_email) queue queue,
                                              @qualifier(exchange_topics_inform) exchange exchange){
        return bindingbuilder.bind(queue).to(exchange).with(routingkey_email).noargs();
    }
    //routingkey_sms队列绑定交换机,指定routingkey
    @bean
    public binding binding_routingkey_sms(@qualifier(queue_inform_sms) queue queue,
                                              @qualifier(exchange_topics_inform) exchange exchange){
        return bindingbuilder.bind(queue).to(exchange).with(routingkey_sms).noargs();
    }
}

 

消费者举例demo

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class rabbitmqconfig {
    public static final string queue_inform_email = "queue_inform_email";
    public static final string queue_inform_sms = "queue_inform_sms";
    public static final string exchange_topics_inform="exchange_topics_inform";
    public static final string routingkey_email="inform.#.email.#";
    public static final string routingkey_sms="inform.#.sms.#";

    //声明交换机
    @bean(exchange_topics_inform)
    public exchange exchange_topics_inform(){
        //durable(true) 持久化,mq重启之后交换机还在
        return exchangebuilder.topicexchange(exchange_topics_inform).durable(true).build();
    }

    //声明queue_inform_email队列
    @bean(queue_inform_email)
    public queue queue_inform_email(){
        return new queue(queue_inform_email);
    }
    //声明queue_inform_sms队列
    @bean(queue_inform_sms)
    public queue queue_inform_sms(){
        return new queue(queue_inform_sms);
    }

    //routingkey_email队列绑定交换机,指定routingkey
    @bean
    public binding binding_queue_inform_email(@qualifier(queue_inform_email) queue queue,
                                              @qualifier(exchange_topics_inform) exchange exchange){
        return bindingbuilder.bind(queue).to(exchange).with(routingkey_email).noargs();
    }
    //routingkey_sms队列绑定交换机,指定routingkey
    @bean
    public binding binding_routingkey_sms(@qualifier(queue_inform_sms) queue queue,
                                              @qualifier(exchange_topics_inform) exchange exchange){
        return bindingbuilder.bind(queue).to(exchange).with(routingkey_sms).noargs();
    }
}

工作模式 

rabbitmq有以下几种工作模式 :

1、work queues

2、publish/subscribe

3、routing

4、topics

5、header

6、rpc

 

work queues

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

测试:

1、使用入门程序,启动多个消费者。

2、生产者发送多个消息。

结果:

1、一条消息只会被一个消费者接收;

2、rabbit采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息。

 

publish/subscribe 发布订阅模式

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收

到消息

 

routin

路由模式:

1、每个消费者监听自己的队列,并且设置routingkey。

2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

这是一种非常灵活的模式,经常被用到

 

topics

 

 

路由模式:

1、每个消费者监听自己的队列,并且设置带统配符的routingkey。

2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

 

header模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配

队列。

案例:

根据用户的通知设置去通知用户,设置接收email的用户只接收email,设置接收sms的用户只接收sms,设置两种

通知类型都接收的则两种通知都有效。

 

生产者demo:

 

map<string, object> headers_email = new hashtable<string, object>();
headers_email.put("inform_type", "email");
map<string, object> headers_sms = new hashtable<string, object>();
headers_sms.put("inform_type", "sms");
channel.queuebind(queue_inform_email,exchange_headers_inform,"",headers_email);
channel.queuebind(queue_inform_sms,exchange_headers_inform,"",headers_sms);

通知demo :

string message = "email inform to user"+i;
map<string,object> headers = new hashtable<string, object>();
headers.put("inform_type", "email");//匹配email通知消费者绑定的header
//headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
amqp.basicproperties.builder properties = new amqp.basicproperties.builder();
properties.headers(headers);
//email通知
channel.basicpublish(exchange_headers_inform, "", properties.build(), message.getbytes());

发送邮件消费者 : 

channel.exchangedeclare(exchange_headers_inform, builtinexchangetype.headers);
map<string, object> headers_email = new hashtable<string, object>();
headers_email.put("inform_email", "email");
//交换机和队列绑定
channel.queuebind(queue_inform_email,exchange_headers_inform,"",headers_email);
//指定消费队列
channel.basicconsume(queue_inform_email, true, consumer);

 

rpc

 

 

rpc即客户端远程调用服务端的方法 ,使用mq可以实现rpc的异步调用,基于direct交换机实现,流程如下:

1、客户端即是生产者就是消费者,向rpc请求队列发送rpc调用消息,同时监听rpc响应队列。

2、服务端监听rpc请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

3、服务端将rpc方法 的结果发送到rpc响应队列

4、客户端(rpc调用方)监听rpc响应队列,接收到rpc调用结果。

 

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网