当前位置: 移动技术网 > IT编程>开发语言>Java > java操作RabbitMQ添加队列、消费队列和三个交换机

java操作RabbitMQ添加队列、消费队列和三个交换机

2019年11月21日  | 移动技术网IT编程  | 我要评论

3.19政变,鼻护士,初级英语学习

假设已经在服务器上安装完rabbitmq。

一、发送消息到队列(生产者)

新建一个maven项目,在pom.xml文件加入以下依赖


<dependencies>
    <dependency>
        <groupid>com.rabbitmq</groupid>
        <artifactid>amqp-client</artifactid>
        <version>3.6.5</version>
    </dependency>
</dependencies>
新建一个p1类
package com.rabbitmq.test;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

/**
 * @author mowen
 * @create 2019/11/20-11:23
 */
public class p1 {
    public static void main(string[] args) throws ioexception, timeoutexception {
        //消息队列名字
        string queuename="queue";
        //实例连接工厂
        connectionfactory connectionfactory=new connectionfactory();
        //设置地址
        connectionfactory.sethost("192.168.128.233");
        //设置端口
        connectionfactory.setport(5672);
        //设置用户名
        connectionfactory.setusername("mowen");
        //设置密码
        connectionfactory.setpassword("123456");
        //获取连接(跟jdbc很像)
        connection connection = connectionfactory.newconnection();
        //创建通道
        channel channel = connection.createchannel();
        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queuedeclare(queuename,true,false,false,null);

        for (int i = 0; i < 10; i++) {
            string msg="msg"+i;
            // 基本发布消息
            // 第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicpublish("",queuename,null,msg.getbytes());
        }

        channel.close();
        connection.close();
    }
}

运行后再浏览器进入rabbitmq的控制台,切换到queue看到

二、获取队列消息(消费者)

新建一个c1类

package com.rabbitmq.test;


import com.rabbitmq.client.*;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

/**
 * @author mowen
 * @create 2019/11/20-13:12
 */
public class c1 {
    public static void main(string[] args) throws ioexception, timeoutexception {
        //消息队列名字
        string queuename="queue";
        //实例连接工厂
        connectionfactory connectionfactory=new connectionfactory();
        //设置地址
        connectionfactory.sethost("192.168.128.233");
        //设置端口
        connectionfactory.setport(5672);
        //设置用户名
        connectionfactory.setusername("mowen");
        //设置密码
        connectionfactory.setpassword("123456");
        //获取连接(跟jdbc很像)
        connection connection = connectionfactory.newconnection();
        //创建通道
        channel channel = connection.createchannel();

        // 创建一个消费者
        consumer consumer = new defaultconsumer(channel){
            @override
            public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception {
                // 消费收到消息的时候调用的回调
                system.out.println("c3接收到:" + new string(body));
            }
        };


        //把消费着绑定到指定队列
        //第一个是队列名
        //第二个是 是否自动确认
        //第三个是消费者
        channel.basicconsume(queuename,true,consumer);

    }
}

运行后输出为

消费者一般都不会关闭,会一直等待队列消息,可以手动关闭程序。

channel.basicconsume(queuename,true,consumer);中的true为收到消息后自动确认,改为false取消自动确认。

在handledelivery方法最后面用

// 手动确认
// 确认收到消息
channel.basicack(envelope.getdeliverytag(),false);

来收到手动确认消息。消费者可以有多个并且可以同时消费一个队列;

当有多个消费者同时消费同一个队列时,收到的消息是平均分配的(消费者没收到之前已经确认每个消费者受到的消息),

但当其中一个消费者性能差的话,会影响其他的消费者,因为还要等它收完消息,这样会拖累其他消费者。

可以设置channel 的basicqos方法

//设置最多接受消息数量
// 设置了这个参数之后要吧自动确认关掉
channel.basicqos(1);

三、扇形(fanout)交换机

扇形交换机是基本的交换机类型,会把收到的消息以广播的形式发送到绑定的队列里,因为不需要经过条件筛选,所以它的速度最快。


在生产者项目新建一个fanout类

package com.rabbitmq.routing;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class fanout {
    public static void main(string[] args) throws ioexception, timeoutexception {
        //交换机名字
        string exchangename="fanout";
        //交换机名字类型
        string exchangetype="fanout";
        //消息队列名字
        string queuename1="fanout.queue1";
        string queuename2="fanout.queue2";
        string queuename3="fanout.queue3";
        //实例连接工厂
        connectionfactory connectionfactory=new connectionfactory();
        //设置地址
        connectionfactory.sethost("192.168.128.233");
        //设置端口
        connectionfactory.setport(5672);
        //设置用户名
        connectionfactory.setusername("mowen");
        //设置密码
        connectionfactory.setpassword("123456");
        //获取连接(跟jdbc很像)
        connection connection = connectionfactory.newconnection();
        //创建通道
        channel channel = connection.createchannel();
        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queuedeclare(queuename1,true,false,false,null);
        channel.queuedeclare(queuename2,true,false,false,null);
        channel.queuedeclare(queuename3,true,false,false,null);

        //声明交换机
        channel.exchangedeclare(exchangename,exchangetype);

        //队列绑定到交换机
        channel.queuebind(queuename1,exchangename,"");
        channel.queuebind(queuename2,exchangename,"");
        channel.queuebind(queuename3,exchangename,"");

        for (int i = 0; i < 10; i++) {
            string msg="msg"+i;
            // 基本发布消息
            // 第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicpublish(exchangename,"",null,msg.getbytes());
        }

        channel.close();
        connection.close();
    }
}

运行后在rabbitmq网页管理后台的queue会看到

切换到exchanges会看到一个

就是我们声明的交换机,点击会看到我们绑定的队列

四、直连(direct)交换机

直连交换机会带路由功能,队列通过routing_key与直连交换机绑定,发送消息需要指定routing_key,交换机收到消息时,交换机会根据routing_key发送到指定队列里,同样的routing_key可以支持多个队列。


在生产者项目新建direct类

package com.rabbitmq.routing;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class direct {
    public static void main(string[] args) throws ioexception, timeoutexception {
        string exchangename="direct";
        string exchangetype="direct";
        //消息队列名字
        string queuename1="direct.queue1";
        string queuename2="direct.queue2";
        string queuename3="direct.queue3";
        //实例连接工厂
        connectionfactory connectionfactory=new connectionfactory();
        //设置地址
        connectionfactory.sethost("192.168.128.233");
        //设置端口
        connectionfactory.setport(5672);
        //设置用户名
        connectionfactory.setusername("mowen");
        //设置密码
        connectionfactory.setpassword("123456");
        //获取连接(跟jdbc很像)
        connection connection = connectionfactory.newconnection();
        //创建通道
        channel channel = connection.createchannel();
        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queuedeclare(queuename1,true,false,false,null);
        channel.queuedeclare(queuename2,true,false,false,null);
        channel.queuedeclare(queuename3,true,false,false,null);

        //声明交换机
        channel.exchangedeclare(exchangename,exchangetype);

        //队列绑定到交换机并指定rouing_key
        channel.queuebind(queuename1,exchangename,"key1");
        channel.queuebind(queuename2,exchangename,"key2");
        channel.queuebind(queuename3,exchangename,"key1");

        for (int i = 0; i < 10; i++) {
            string msg="msg"+i;
            // 基本发布消息
            // 第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicpublish(exchangename,"key1",null,msg.getbytes());
        }

        channel.close();
        connection.close();
    }
}

运行后到后台的queue会看到

切换到exchanges会看到

点击进去

五、主题(topic)交换机

主题交换机的routing_key可以有一定的规则,交换机和队列的routing_key需要采用*.#.*…..的格式

每个部分用.分开

*代表一个单词(不是字符)

#代表任意数量(0或n个)单词


在生产者项目新进topic类

package com.rabbitmq.routing;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class topic {
    public static void main(string[] args) throws ioexception, timeoutexception {
        string exchangename="topic";
        string exchangetype="topic";
        //消息队列名字
        string queuename1="topic.queue1";
        string queuename2="topic.queue2";
        string queuename3="topic.queue3";
        //实例连接工厂
        connectionfactory connectionfactory=new connectionfactory();
        //设置地址
        connectionfactory.sethost("192.168.128.233");
        //设置端口
        connectionfactory.setport(5672);
        //设置用户名
        connectionfactory.setusername("mowen");
        //设置密码
        connectionfactory.setpassword("123456");
        //获取连接(跟jdbc很像)
        connection connection = connectionfactory.newconnection();
        //创建通道
        channel channel = connection.createchannel();
        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queuedeclare(queuename1,true,false,false,null);
        channel.queuedeclare(queuename2,true,false,false,null);
        channel.queuedeclare(queuename3,true,false,false,null);

        //声明交换机
        channel.exchangedeclare(exchangename,exchangetype);

        //队列绑定到交换机并指定rouing_key
        channel.queuebind(queuename1,exchangename,"com.aaa.*");
        channel.queuebind(queuename2,exchangename,"com.*.topic");
        channel.queuebind(queuename3,exchangename,"com.bbb.*");

        for (int i = 0; i < 10; i++) {
            string msg="msg"+i;
            // 基本发布消息
            // 第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicpublish(exchangename,"com.aaa.topic",null,msg.getbytes());
        }

        channel.close();
        connection.close();
    }
}

运行后,到后台queue会看到

切换到exchanges会看到

点击进入会看到

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网