rabbitmq延迟队列,主要是借助消息的ttl(time to live)和死信exchange(dead letter exchanges)来实现。
涉及到2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。
本例中, 定义2组exchange和queue。
agentpayquery1exchange agentpayquery1queue(routingkey为delay) agentpayquery2exchange agentpayquery2queue(routingkey为delay)
agentpayquery1queue是缓冲队列,消息过期路由到agentpayquery2queue
生产者配置:
<!-- 连接服务配置 --> <rabbit:connection-factory id="connectionfactoryproducer" addresses="${mq.ip}" //192.168.40.40:5672 username="${username}" password="${password}" channel-cache-size="${cache.size}" publisher-confirms="${publisher.confirms}" publisher-returns="${publisher.returns}" virtual-host="/" /> <!--========================出款查询 延迟队列配置 begin =========================--> <rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue"/> <rabbit:direct-exchange name="agentpayquery2exchange" durable="true" auto-delete="false" id="agentpayquery2exchange"> <rabbit:bindings> <rabbit:binding queue="agentpayquery2queue" key="delay" /> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:queue id="agentpayquery1queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery1queue" > <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="agentpayquery2exchange"/> </rabbit:queue-arguments> </rabbit:queue> <rabbit:direct-exchange name="agentpayquery1exchange" durable="true" auto-delete="false" id="agentpayquery1exchange"> <rabbit:bindings> <rabbit:binding queue="agentpayquery1queue" key="delay" /> </rabbit:bindings> </rabbit:direct-exchange> <!--定义rabbittemplate实例--> <rabbit:template id="agentpayquerymsgtemplate" exchange="agentpayquery1exchange" routing-key="delay" queue="agentpayquery1queue" connection-factory="connectionfactoryproducer" message-converter="mqmessageconverter" mandatory="true" confirm-callback="publisherconfirmsreturns" return-callback="publisherconfirmsreturns"/> <!--========================出款查询 延迟队列配置 end =========================-->
生产者消息入队:
import org.springframework.amqp.amqpexception; import org.springframework.amqp.core.message; import org.springframework.amqp.core.messagedeliverymode; import org.springframework.amqp.core.messagepostprocessor; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.service; @service public class agentpayqueryproducer { private static final logger log = logmanager.getlogger(agentpayqueryproducer.class.getsimplename()); @autowired private rabbittemplate agentpayquerymsgtemplate; public void senddelay(string message, int delayseconds) { string expiration = string.valueof(delayseconds * 1000); agentpayquerymsgtemplate.convertandsend((object) message, new messagepostprocessor() { @override public message postprocessmessage(message message) throws amqpexception { message.getmessageproperties().setexpiration(expiration); message.getmessageproperties().setdeliverymode(messagedeliverymode.persistent); log.info("出款查询数据入队:{}", new string(message.getbody())); return message; } }); } }
消费端的配置无他:
<!-- 连接服务配置 channel-cache-size="25" --> <rabbit:connection-factory id="connectionfactory" addresses="${mq.ip}" username="${username}" password="${password}" /> <bean id="agentpayqueryconsumer" class="com.emaxcard.rpc.payment.service.impl.batchpay.agentpayqueryconsumer" /> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue" /> <rabbit:listener-container connection-factory="connectionfactory" acknowledge="auto" max-concurrency="20" concurrency="10" prefetch="10"> <rabbit:listener ref="agentpayqueryconsumer" queues="agentpayquery2queue" /> </rabbit:listener-container>
消息消费:
import com.alibaba.fastjson.json; import com.emaxcard.enums.batchpaystatus; import com.emaxcard.exceptions.responseexception; import com.emaxcard.payment.vo.paymentrecord; import com.emaxcard.rpc.payment.model.paymentrecordmodel; import org.springframework.amqp.core.message; import org.springframework.amqp.core.messagelistener; import org.springframework.beans.factory.annotation.autowired; public class agentpayqueryconsumer implements messagelistener { private static final logger log = logmanager.getlogger(); @autowired querygatewayservice querygatewayservice; @autowired agentpayqueryproducer agentpayqueryproducer; @override public void onmessage(message message) { string mqmsg = new string(message.getbody()); log.info("出款查询数据出队:{}", mqmsg); paymentrecord paymentrecordmodel; try { paymentrecordmodel = json.parseobject(mqmsg, paymentrecord.class); } catch (exception ex) { log.info("消息格式不是paymentrecordmodel,结束。"); return; } try { batchpaystatus paystatus = querygatewayservice.querygateway(paymentrecordmodel); // 非终态,继续放入延迟队列 if (batchpaystatus.success != paystatus && batchpaystatus.failed != paystatus) { if (batchpaystatus.notexist == paystatus) { log.info("查询结果是{},不再处理", paystatus); } else { agentpayqueryproducer.senddelay(mqmsg, 10); } } } catch (exception ex) { if (ex instanceof responseexception) { log.info("转账查询{},paymentid{},处理错误:{}", paymentrecordmodel.gettransno(), paymentrecordmodel.getpaymentid(), ex.getmessage()); } else { log.error("处理消息异常:", ex); } } } }
1. 因为是队列,所以即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。
2. 当缓冲队列里一旦出现未设置过期时间的消息,那么就会造成整个队列堵塞。消费端也无法消费到消息。通过日志可以看到,打印出来的都是 blockingqueueconsumer。
get messages ack mode选择“ack message requeue false”,可以将消息消费掉
如对本文有疑问, 点击进行留言回复!!
荐 深入理解Java中的BigInteger和 BigDecimal,再也不怕面试了
tomact正常启动,但是在日志文件报错java.lang.NoClassDefFoundError: java/util/logging/Logger
servlet整合quartz:servlet中使用quartz,服务器启动时加载任务
荐 Java——集合中的Map接口通过HashMap类实现一些常用的方法
SpringBoot整合mybatis访问时报错Invalid bound statement (not found)
网友评论