官网也讲述的清楚而且还有例子,只不过是英文的,很多人看到英文就不明白说什么了吧,即使有翻译成中文,总觉得哪里怪怪的,有些翻译并不流畅。我还是支持多看官网,官网: 。下面是自己做的一点下笔记,有参考其他文档。如有什么不对的地方,希望大家能够告诉我,通过留言板,像消息队列一样,你发送消息,我接收消息。
public class connectionutil { /** * 建立与rabbitmq的链接 */ public static connection getconnection() throws ioexception, timeoutexception { // 定义连接工厂 connectionfactory factory = new connectionfactory(); // 设置服务地址 factory.sethost("127.0.0.1"); // 端口 factory.setport(5672); // 设置账号信息,用户名、密码、vhost factory.setvirtualhost("/demo"); factory.setusername("guest"); factory.setpassword("guest"); // 通过工厂获取连接 connection connection = factory.newconnection(); return connection; } }
public class send { private final static string queue_name = "simple_queue"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到连接 connection connection = connectionutil.getconnection(); // 从连接中创建通道,使用通道才能完成消息相关的操作 channel channel = connection.createchannel(); // 声明(创建)队列 channel.queuedeclare(queue_name, false, false, false, null); // 消息内容 string message = "hello word!"; // 向指定的队列中发送消息 channel.basicpublish("", queue_name, null, message.getbytes()); system.out.println(" [x] sent '" + message + "'"); // 关闭通道和连接 channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "simple_queue"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 创建通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // body 即消息体 string msg = new string(body); system.out.println(" [x] received: " + msg + "!"); } }; // 监听队列,第二个参数:是否自动进行消息确认 channel.basicconsume(queue_name, true, consumer); } }
public class recv2 { private final static string queue_name = "simple_queue"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 创建通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // body 即消息体 string msg = new string(body); system.out.println(" [x] received: " + msg + "!"); // 手动进行ack channel.basicack(envelope.getdeliverytag(), false); } }; // 监听队列,第二个参数false,手动进行ack channel.basicconsume(queue_name, false, consumer); } }
public class send { private final static string queue_name = "task_work_queue"; public static void main(string[] args) throws exception { // 获取到连接 connection connection = connectionutil.getconnection(); // 从连接中创建通道,使用通道才能完成消息相关的操作 final channel channel = connection.createchannel(); // 声明(创建)队列 channel.queuedeclare(queue_name, false, false, false, null); // 循环发布任务 for (int i=0; i<50; i++) { // 消息内容 string message = "task ... " + i; channel.basicpublish("", queue_name, null, message.getbytes()); system.out.println(" [x] sent '" + message + "'"); thread.sleep(i * 2); } // 关闭通道和连接 channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "task_work_queue"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 创建通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 设置每个消费同时只能处理一条消息 channel.basicqos(1); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // body 即消息体 string msg = new string(body); system.out.println(" [x] received: " + msg + "!"); try { // 模拟完成任务的耗时:1000ms thread.sleep(1000); } catch (interruptedexception e) { e.printstacktrace(); } channel.basicack(envelope.getdeliverytag(), false); } }; // 监听队列,第二个参数:是否自动进行消息确认 channel.basicconsume(queue_name, false, consumer); } }
/* * 对比上个消费者:耗时小,完成任务多些 */ public class recv2 { private final static string queue_name = "task_work_queue"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 创建通道 final channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 设置每个消费同时只能处理一条消息 channel.basicqos(1); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动被调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { // body 即消息体 string msg = new string(body); system.out.println(" [x] received: " + msg + "!"); // 手动进行ack channel.basicack(envelope.getdeliverytag(), false); } }; // 监听队列,第二个参数false,手动进行ack channel.basicconsume(queue_name, false, consumer); } }
public class send { private final static string exchange_name = "fanout_exchange_test"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明exchange,指定类型为fanout channel.exchangedeclare(exchange_name, builtinexchangetype.fanout); // 消息内容 string message = "hello everyone"; // 发布消息到exchange channel.basicpublish(exchange_name, "", null, message.getbytes()); system.out.println(" [生产者] sent '" + message + "'"); channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "fanout_exchange_queue_1"; private final static string exchange_name = "fanout_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 绑定队列到交换机 channel.queuebind(queue_name, exchange_name, ""); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者1] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
public class recv2 { private final static string queue_name = "fanout_exchange_queue_2"; private final static string exchange_name = "fanout_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 绑定队列到交换机 channel.queuebind(queue_name, exchange_name, ""); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者2] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
public class send { private final static string exchange_name = "direct_exchange_test"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明exchange,指定类型为direct channel.exchangedeclare(exchange_name, builtinexchangetype.direct); // 消息内容 string message = "商品增加了,id = 1002"; // 发布消息到exchange,并且指定routing key为:delete,代表删除商品 channel.basicpublish(exchange_name, "insert", null, message.getbytes()); system.out.println(" [商品服务] sent '" + message + "'"); channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "direct_exchange_queue_1"; private final static string exchange_name = "direct_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queuebind(queue_name, exchange_name, "update"); channel.queuebind(queue_name, exchange_name, "delete"); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者1] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
public class recv2 { private final static string queue_name = "direct_exchange_queue_2"; private final static string exchange_name = "direct_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, false, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queuebind(queue_name, exchange_name, "update"); channel.queuebind(queue_name, exchange_name, "delete"); channel.queuebind(queue_name, exchange_name, "insert"); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者2] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
public class send { private final static string exchange_name = "topic_durable_exchange_test"; public static void main(string[] args) throws exception { // 获取连接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 开启生产者确认 // channel.confirmselect(); // 声明exchange,指定类型为topic, 并且设置durable为true,持久化 channel.exchangedeclare(exchange_name, builtinexchangetype.topic, true); // 消息内容 string message = "商品新增了,id = 1002"; // 发布消息到exchange,并且指定routing key,消息持久化 channel.basicpublish(exchange_name, "item.insert", messageproperties.persistent_text_plain, message.getbytes()); system.out.println(" [商品服务] sent '" + message + "'"); // 等待rabbitmq的确认消息,true为确认收到,false为发出有误 // channel.waitforconfirms(); channel.close(); connection.close(); } }
public class recv { private final static string queue_name = "topic_durable_exchange_queue_1"; private final static string exchange_name = "topic_durable_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列, 第二个参数:true代表声明为持久化 channel.queuedeclare(queue_name, true, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queuebind(queue_name, exchange_name, "item.update"); channel.queuebind(queue_name, exchange_name, "item.delete"); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者1] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
public class recv2 { private final static string queue_name = "topic_durable_exchange_queue_2"; private final static string exchange_name = "topic_durable_exchange_test"; public static void main(string[] args) throws ioexception, timeoutexception { // 获取到链接 connection connection = connectionutil.getconnection(); // 获取通道 channel channel = connection.createchannel(); // 声明队列 channel.queuedeclare(queue_name, true, false, false, null); // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queuebind(queue_name, exchange_name, "item.*"); // 定义队列的消费者 defaultconsumer consumer = new defaultconsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息时,会自动调用 @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { string msg = new string(body); system.out.println(" [消费者2] received: " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicconsume(queue_name, true, consumer); } }
<parent>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-parent</artifactid>
<version>2.0.4.release</version>
</parent>
<dependencies>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
</dependencies>
spring:
rabbitmq:
host: 127.0.0.1
username: guest
password: guest
virtual-host: /demo
@component public class listener { @rabbitlistener(bindings = @queuebinding( value = @queue(value = "spring.test.queue", durable = "true"), exchange = @exchange( value = "spring.test.exchange", type = exchangetypes.topic), key = {"#.#"})) public void listen(string msg) { system.out.println("接收到的消息: " + msg); } }
如对本文有疑问, 点击进行留言回复!!
Springboot项目因为kackson版本问题启动报错解决方案
Java多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask详解
网友评论