当前位置: 移动技术网 > IT编程>开发语言>Java > 用延迟队列取代定时任务

用延迟队列取代定时任务

2018年12月08日  | 移动技术网IT编程  | 我要评论

§1 rabbitmq延迟队列

rabbitmq延迟队列,主要是借助消息的ttl(time to live)和死信exchange(dead letter exchanges)来实现。

涉及到2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

 

本例中, 定义2组exchange和queue。

agentpayquery1exchange		agentpayquery1queue(routingkey为delay)
agentpayquery2exchange		agentpayquery2queue(routingkey为delay)
agentpayquery1queue是缓冲队列,消息过期路由到agentpayquery2queue

 

 

§2 生产者

生产者配置:

<!-- 连接服务配置 -->
<rabbit:connection-factory
        id="connectionfactoryproducer"
        addresses="${mq.ip}"    //192.168.40.40:5672
        username="${username}"
        password="${password}"
        channel-cache-size="${cache.size}"
        publisher-confirms="${publisher.confirms}"
        publisher-returns="${publisher.returns}"
        virtual-host="/"
/>

<!--========================出款查询 延迟队列配置 begin =========================-->
<rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue"/>
<rabbit:direct-exchange name="agentpayquery2exchange" durable="true" auto-delete="false" id="agentpayquery2exchange">
    <rabbit:bindings>
        <rabbit:binding queue="agentpayquery2queue" key="delay" />
    </rabbit:bindings>
</rabbit:direct-exchange>


<rabbit:queue id="agentpayquery1queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery1queue" >
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="agentpayquery2exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange name="agentpayquery1exchange" durable="true" auto-delete="false" id="agentpayquery1exchange">
    <rabbit:bindings>
        <rabbit:binding queue="agentpayquery1queue" key="delay" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<!--定义rabbittemplate实例-->
<rabbit:template id="agentpayquerymsgtemplate"
                 exchange="agentpayquery1exchange"  routing-key="delay"
                 queue="agentpayquery1queue"
                 connection-factory="connectionfactoryproducer" message-converter="mqmessageconverter"
                 mandatory="true"
                 confirm-callback="publisherconfirmsreturns" return-callback="publisherconfirmsreturns"/>
<!--========================出款查询 延迟队列配置 end =========================-->

 

 

生产者消息入队:

import org.springframework.amqp.amqpexception;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagedeliverymode;
import org.springframework.amqp.core.messagepostprocessor;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;

@service
public class agentpayqueryproducer {

    private static final logger log = logmanager.getlogger(agentpayqueryproducer.class.getsimplename());

    @autowired
    private rabbittemplate agentpayquerymsgtemplate;

    public void senddelay(string message, int delayseconds) {
        string expiration = string.valueof(delayseconds * 1000);
        agentpayquerymsgtemplate.convertandsend((object) message, new messagepostprocessor() {
            @override
            public message postprocessmessage(message message)
                    throws amqpexception {
                message.getmessageproperties().setexpiration(expiration);
                message.getmessageproperties().setdeliverymode(messagedeliverymode.persistent);
                log.info("出款查询数据入队:{}", new string(message.getbody()));
                return message;
            }
        });
    }
}

 

 

§3消费者

消费端的配置无他:

<!-- 连接服务配置  channel-cache-size="25" -->
<rabbit:connection-factory id="connectionfactory"
                           addresses="${mq.ip}"
                           username="${username}"
                           password="${password}" />

<bean id="agentpayqueryconsumer" class="com.emaxcard.rpc.payment.service.impl.batchpay.agentpayqueryconsumer" />

<!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue" />

<rabbit:listener-container connection-factory="connectionfactory" acknowledge="auto"

                           max-concurrency="20"
                           concurrency="10"
                           prefetch="10">
    <rabbit:listener ref="agentpayqueryconsumer" queues="agentpayquery2queue" />
</rabbit:listener-container>

 

消息消费:

import com.alibaba.fastjson.json;
import com.emaxcard.enums.batchpaystatus;
import com.emaxcard.exceptions.responseexception;
import com.emaxcard.payment.vo.paymentrecord;
import com.emaxcard.rpc.payment.model.paymentrecordmodel;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagelistener;
import org.springframework.beans.factory.annotation.autowired;

public class agentpayqueryconsumer implements messagelistener {

    private static final logger log = logmanager.getlogger();

    @autowired
    querygatewayservice querygatewayservice;
    @autowired
    agentpayqueryproducer agentpayqueryproducer;

    @override
    public void onmessage(message message) {
        string mqmsg = new string(message.getbody());
        log.info("出款查询数据出队:{}", mqmsg);
        paymentrecord paymentrecordmodel;
        try {
            paymentrecordmodel = json.parseobject(mqmsg, paymentrecord.class);
        } catch (exception ex) {
            log.info("消息格式不是paymentrecordmodel,结束。");
            return;
        }

        try {
            batchpaystatus paystatus = querygatewayservice.querygateway(paymentrecordmodel);

            // 非终态,继续放入延迟队列
            if (batchpaystatus.success != paystatus && batchpaystatus.failed != paystatus) {
                if (batchpaystatus.notexist == paystatus) {
                    log.info("查询结果是{},不再处理", paystatus);
                } else {
                    agentpayqueryproducer.senddelay(mqmsg, 10);
                }
            }
        } catch (exception ex) {
            if (ex instanceof responseexception) {
                log.info("转账查询{},paymentid{},处理错误:{}",
                        paymentrecordmodel.gettransno(), paymentrecordmodel.getpaymentid(), ex.getmessage());
            } else {
                log.error("处理消息异常:", ex);
            }
        }

    }
}

 

 

§4 使用延迟队列要注意

1. 因为是队列,所以即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。

2. 当缓冲队列里一旦出现未设置过期时间的消息,那么就会造成整个队列堵塞。消费端也无法消费到消息。通过日志可以看到,打印出来的都是 blockingqueueconsumer。

 

 

 

get messages ack mode选择“ack message requeue false”,可以将消息消费掉

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网