当前位置: 移动技术网 > IT编程>开发语言>Java > 荐 Maven写rabbitMQ教程02

荐 Maven写rabbitMQ教程02

2020年07月16日  | 移动技术网IT编程  | 我要评论

上次我们已经说过了rabbitMQ的三种模式:

①简单模式Hello World

②工作队列模式Work Queue

③发布/订阅模式Publish/Subscribe

有意向的可以回顾下:
Maven写rabbitMQ教程01


废话不多说,直入正题
接下来我们再来学习两种模式


一丶Routing路由工作模式

在这里插入图片描述

路由模式:

  1. 每个消费者监听自己的队列,并且设置routingkey。
  2. 生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

代码:

providucer

  • 声明exchange_routing_inform交换机。
  • 声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
  • 发送消息时需要指定routingkey
public class providucer01 {
    //    private static final String QUEUE = "helloworld";
    private static final String QUEUE_EMAIL = "queue_email";
    private static final String QUEUE_SMS = "queue_sms";
    //选择交换机类型为direct,后面同上
    private static final String EXCHANGEE_DIRECT = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建connectionfactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");//设置
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
        //因为最后需要关闭连接所以定义到外面
        Connection connection = null;
        Channel channel = null;
        try {
            //通过connectionfactory获取connection
            connection = connectionFactory.newConnection();
            //获取Exchange通道
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             *参数明细
             *1、交换机名称
             *2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGEE_DIRECT, BuiltinExchangeType.DIRECT);

            /**
             *声明队列,如果Rabbit中没有此队列将自动创建
             *param1:队列名称
             *param2:是否持久化
             *param3:队列是否独占此连接
             *param4:队列不再使用时是否自动删除此队列
             *param5:队列参数
             */
            channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_SMS, true, false, false, null);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             *参数明细
             *1、队列名称
             *2、交换机名称
             *3、路由key
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGEE_DIRECT,QUEUE_EMAIL);
            channel.queueBind(QUEUE_SMS,EXCHANGEE_DIRECT,QUEUE_SMS);
            //自定义消息

            /**
             *消息发布方法
             *param1:Exchange的名称,如果没有指定,则使用Default Exchange
             *param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             *param3:消息包含的属性
             *param4:消息体
             */
            for (int i = 0; i < 5; i++) {
                String massage = "helloMQ!第"+i+"次";
                //我在这里判断单数发给sms,双数发给email,比较容易发现区别
                if(i%2 == 0) {
                    channel.basicPublish(EXCHANGEE_DIRECT, QUEUE_EMAIL, null, massage.getBytes("utf-8"));
                }else{
                channel.basicPublish(EXCHANGEE_DIRECT, QUEUE_SMS, null, massage.getBytes("utf-8"));}
                System.out.println("send massage: ----" + massage);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {

            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

customer:

public class customer01 {
    private static final String QUEUE_EMAIL = "queue_email";
    //选择路由模式direct
    private static final String EXCHANGEE_DIRECT = "exchange_direct";


    public static void main(String[] args) throws IOException, TimeoutException {
        //创建connectionfactory
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在服务器的ip和端口
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        //获取connection
        Connection connection = factory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明交换机 String exchange, BuiltinExchangeType type
        /**
         *参数明细
         *1、交换机名称
         *2、交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(EXCHANGEE_DIRECT,BuiltinExchangeType.DIRECT);
        //声明队列
        channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);
        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             *消费者接收消息调用此方法
             *@param consumerTag 消费者的标签,在channel.basicConsume()去指定
             *@param  envelope  消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             *@param properties
             *@param body
             *@throws IOException
             **/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf-8");
                System.out.println("receive message:" + msg);
            }
        };
        /**
         *监听队列String queue, boolean autoAck,Consumer callback
         *参数明细
         *1、队列名称
         *2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置  为false则需要手动回复
         *3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_EMAIL, false, consumer);
    }
}

customer02跟01代码一样,只需要在声明队列时改掉队列名称

//把customer01的属性改为这个
private static final String QUEUE_SMS = "queue_sms";

//声明队列时也要改下
//声明队列
        channel.queueDeclare(QUEUE_SMS, true, false, false, null);

改完之后就可以执行了
在这里插入图片描述在这里插入图片描述在这里插入图片描述
可以看到我们的提供者只发送了一次消息,两个消费者都通过Key接收到了自己需要的消息。

打开RabbitMQ的管理界面,观察交换机绑定情况:
在这里插入图片描述
使用生产者发送若干条消息,交换机根据routingkey转发消息到指定的队列。


二丶Topics通配符工作模式

在这里插入图片描述
Topics路由模式:
1、每个消费者监听自己的队列,并且设置带通配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

代码

providucer:

public class providucer01 {
    private static final String QUEUE_EMAIL = "queue_email";
    private static final String QUEUE_SMS = "queue_sms";
    private static final String EXCHANGEE_TOPIC = "exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建connectionfactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");//设置
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
        //因为最后需要关闭连接所以定义到外面
        Connection connection = null;
        Channel channel = null;
        try {
            //通过connectionfactory获取connection
            connection = connectionFactory.newConnection();
            //获取Exchange通道
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             *参数明细
             *1、交换机名称
             *2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGEE_TOPIC, BuiltinExchangeType.TOPIC);

            /**
             *声明队列,如果Rabbit中没有此队列将自动创建
             *param1:队列名称
             *param2:是否持久化
             *param3:队列是否独占此连接
             *param4:队列不再使用时是否自动删除此队列
             *param5:队列参数
             */
            channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_SMS, true, false, false, null);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             *参数明细
             *1、队列名称
             *2、交换机名称
             *3、路由key
             channel.queueBind(QUEUE_EMAIL, EXCHANGEE_TOPIC, QUEUE_EMAIL);
             channel.queueBind(QUEUE_SMS, EXCHANGEE_TOPIC, QUEUE_SMS);*/
            //自定义消息

            /**
             *消息发布方法
             *param1:Exchange的名称,如果没有指定,则使用Default Exchange
             *param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             *param3:消息包含的属性
             *param4:消息体
             */
            for (int i = 0; i < 5; i++) {
                String massage = "helloMQ!第" + i + "次";
                if (i % 2 == 0 && i < 4) {
                    channel.basicPublish(EXCHANGEE_TOPIC, "inform.email", null, massage.getBytes("utf-8"));
                } else if (i % 2 != 0) {
                    channel.basicPublish(EXCHANGEE_TOPIC, "inform.sms", null, massage.getBytes("utf-8"));
                } else {
                    channel.basicPublish(EXCHANGEE_TOPIC, "inform.email.sms", null, massage.getBytes("utf-8"));
                }
                System.out.println("send massage: ----" + massage);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {

            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

先更改交换机类型为TOPIC,然后判断消息发送;

customer:

public class customer01 {
    private static final String QUEUE_EMAIL = "queue_email";
    private static final String EXCHANGEE_TOPIC = "exchange_topic";


    public static void main(String[] args) throws IOException, TimeoutException {
        //创建connectionfactory
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在服务器的ip和端口
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        //获取connection
        Connection connection = factory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明交换机 String exchange, BuiltinExchangeType type
        /**
         *参数明细
         *1、交换机名称
         *2、交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(EXCHANGEE_TOPIC, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);
        //绑定队列
        /**
         *参数明细
         *1、队列名称
         *2、交换机名称
         *3、路由key*/
        channel.queueBind(QUEUE_EMAIL, EXCHANGEE_TOPIC, "inform.#.email.#");
        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             *消费者接收消息调用此方法
             *@param consumerTag 消费者的标签,在channel.basicConsume()去指定
             *@param  envelope  消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             *@param properties
             *@param body
             *@throws IOException
             **/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf-8");
                System.out.println("receive message:" + msg);
            }
        };
        /**
         *监听队列String queue, boolean autoAck,Consumer callback
         *参数明细
         *1、队列名称
         *2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置  为false则需要手动回复
         *3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_EMAIL, false, consumer);
    }
}

customer02同上,只需要修改队列即可;
执行两个customer,后执行providucer;
观察输出结果
providucer:
在这里插入图片描述
customer:

在这里插入图片描述
在这里插入图片描述
发现两个customer把各自对应的消息都接收了,并全部接收第四次的消息,说明通配符生效,执行成功!

观察交换机:
在这里插入图片描述
发现Topic模式更多加强大,它可以实现Routing、publish/subscirbe模式的功能。

本文地址:https://blog.csdn.net/qq_47387336/article/details/107336616

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网