当前位置: 移动技术网 > IT编程>开发语言>Java > RabbitMQ-Fanout案例

RabbitMQ-Fanout案例

2020年07月17日  | 移动技术网IT编程  | 我要评论
Consumerpackage sc.app.stc.rmq.rabbitmq;import java.io.IOException;import java.util.List;import org.springframework.beans.BeanUtils;import com.alibaba.fastjson.JSONArray;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeTy

Consumer

package sc.app.stc.rmq.rabbitmq;

import java.io.IOException;
import java.util.List;

import org.springframework.beans.BeanUtils;

import com.alibaba.fastjson.JSONArray;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import sc.app.stc.po.Risk;
import sc.app.stc.rmq.po.ExecutionReport;
import sc.app.stc.rmq.po.RiskResponseBody;
import sc.app.stc.rmq.stream.RiskStream;

public class RiskConsumer {
	
	public static void main(String[] args) {
		consumer();
	}

	/**
	 * topic消费者
	 *
	 * @Description: consumer
	 * @author lq.
	 * @date 2020年5月18日 上午10:42:58
	 * @version V1.0
	 */
	public static void consumer() {
		try {
			Channel channel = RiskRpcServer.getChannelInstance("riskConsumer");
			if(channel==null) {
				System.out.println(" channel is null..");
				return;
			}
			// 交换机声明
			channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT,
					RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null);

			// 获取一个临时队列
			String queueName = channel.queueDeclare().getQueue();
			// 队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
			channel.queueBind(queueName, RiskRpcServer.queryRiskFanoutExchange, "");

			// 这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
			Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {
					super.handleDelivery(consumerTag, envelope, properties, body);
					String json = new String(body, "UTF-8");
					System.out.println(String.format("RiskConsumer received:%s", json));
				}
			};
			// 声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体)
			channel.basicConsume(queueName, true, consumer);
			// 这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
		} catch (IOException e) {
			System.out.println("Fanout is error..");
		}
	}
}

Product

package sc.app.stc.rmq.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class RiskProduct {
	
	public static void main(String[] args) {
		String message = "";
		sendMessage(message,null);
	}

	/**
	 * fanout生产者
	 *
	 * @Description: product
	 * @author lq.
	 * @date 2020年1月3日 下午3:39:46
	 * @version V1.0
	 */
	public static Channel sendMessage(String message, Channel channel) {
		try {
			if (channel == null) {
				channel = RiskRpcServer.getChannelInstance("riskProduct");
			}
			if(channel==null) {
				System.out.println(" channel is null..");
				return null;
			}
			// 声明交换机(参数为: 交换机名称; 交换机类型,广播模式)
			// 交换机声明
			channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT,
					RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null);

			// 消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可)
			channel.basicPublish(RiskRpcServer.queryRiskFanoutExchange, "", null, message.getBytes());
			System.out.println("********Message********:发送成功");
		} catch (IOException e) {
			System.out.println("FanoutProduct error");
		}
		return channel;
	}
}


本文地址:https://blog.csdn.net/java_lqjsw/article/details/107366780

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网