当前位置: 移动技术网 > IT编程>脚本编程>Python > 如何通过Python实现RabbitMQ延迟队列

如何通过Python实现RabbitMQ延迟队列

2020年11月28日  | 移动技术网IT编程  | 我要评论
最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好。因为系统本身一直在用rabbitmq做异步处理任务的中间件,所

最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好。因为系统本身一直在用rabbitmq做异步处理任务的中间件,所以想到是否可以利用rabbitmq实现延迟队列。功夫不负有心人,rabbitmq虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、time to live(ttl)消息超时机制;2、dead letter exchanges(dlx)死信队列。下面将具体描述实现原理以及实现代

延迟队列的基础原理time to live(ttl)

rabbitmq可以针对queue设置x-expires 或者 针对message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
rabbitmq消息的过期时间有两种方法设置。

通过队列(queue)的属性设置,队列中所有的消息都有相同的过期时间。(本次延迟队列采用的方案)对消息单独设置,每条消息ttl可以不同。

如果同时使用,则消息的过期时间以两者之间ttl较小的那个数值为准。消息在队列的生存时间一旦超过设置的ttl值,就成为死信(dead letter)

dead letter exchanges(dlx)

rabbitmq的queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

  • x-dead-letter-exchange:出现死信(dead letter)之后将dead letter重新发送到指定exchange
  • x-dead-letter-routing-key:出现死信(dead letter)之后将dead letter重新按照指定的routing-key发送

队列中出现死信(dead letter)的情况有:

  • 消息或者队列的ttl过期。(延迟队列利用的特性)
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

综合上面两个特性,将队列设置ttl规则,队列ttl过期后消息会变成死信,然后利用dlx特性将其转发到另外的交换机和队列就可以被重新消费,达到延迟消费效果。

延迟队列设计及实现(python)

从上面描述,延迟队列的实现大致分为两步:

产生死信,有两种方式per-message ttl和 queue ttl,因为我的需求中是所有的消息延迟处理时间相同,所以本实现中采用 queue ttl设置队列的ttl,如果需要将队列中的消息设置不同的延迟处理时间,则设置per-message ttl()

设置死信的转发规则,dead letter exchanges设置方法()

完整代码如下:

"""
created on fri aug 3 17:00:44 2018

@author: bge
"""
import pika,json,logging
class rabbitmqclient:
  def __init__(self, conn_str='amqp://user:pwd@host:port/%2f'):
    self.exchange_type = "direct"
    self.connection_string = conn_str
    self.connection = pika.blockingconnection(pika.urlparameters(self.connection_string))
    self.channel = self.connection.channel()
    self._declare_retry_queue() #retryqueue and retryexchange
    logging.debug("connection established")
  def close_connection(self):
    self.connection.close()
    logging.debug("connection closed")
  def declare_exchange(self, exchange):
    self.channel.exchange_declare(exchange=exchange,
                   exchange_type=self.exchange_type,
                   durable=true)
  def declare_queue(self, queue):
    self.channel.queue_declare(queue=queue,
                  durable=true,)
  def declare_delay_queue(self, queue,dlx='retryexchange',ttl=60000):
    """
    创建延迟队列
    :param ttl: ttl的单位是us,ttl=60000 表示 60s
    :param queue:
    :param dlx:死信转发的exchange
    :return:
    """
    arguments={}
    if dlx:
      #设置死信转发的exchange
      arguments[ 'x-dead-letter-exchange']=dlx
    if ttl:
      arguments['x-message-ttl']=ttl
    print(arguments)
    self.channel.queue_declare(queue=queue,
                  durable=true,
                  arguments=arguments)
  def _declare_retry_queue(self):
    """
    创建异常交换器和队列,用于存放没有正常处理的消息。
    :return:
    """
    self.channel.exchange_declare(exchange='retryexchange',
                   exchange_type='fanout',
                   durable=true)
    self.channel.queue_declare(queue='retryqueue',
                  durable=true)
    self.channel.queue_bind('retryqueue', 'retryexchange','retryqueue')
  def publish_message(self,routing_key, msg,exchange='',delay=0,ttl=none):
    """
    发送消息到指定的交换器
    :param exchange: rabbitmq交换器
    :param msg: 消息实体,是一个序列化的json字符串
    :return:
    """
    if delay==0:
      self.declare_queue(routing_key)
    else:
      self.declare_delay_queue(routing_key,ttl=ttl)
    if exchange!='':
      self.declare_exchange(exchange)
    self.channel.basic_publish(exchange=exchange,
                  routing_key=routing_key,
                  body=msg,
                  properties=pika.basicproperties(
                    delivery_mode=2,
                    type=exchange
                  ))
    self.close_connection()
    print("message send out to %s" % exchange)
    logging.debug("message send out to %s" % exchange)
  def start_consume(self,callback,queue='#',delay=1):
    """
    启动消费者,开始消费rabbitmq中的消息
    :return:
    """
    if delay==1:
      queue='retryqueue'
    else:
      self.declare_queue(queue)
    self.channel.basic_qos(prefetch_count=1)
    try:
      self.channel.basic_consume( # 消费消息
        callback, # 如果收到消息,就调用callback函数来处理消息
        queue=queue, # 你要从那个队列里收消息
      )
      self.channel.start_consuming()
    except keyboardinterrupt:
      self.stop_consuming()
  def stop_consuming(self):
    self.channel.stop_consuming()
    self.close_connection()
  def message_handle_successfully(channel, method):
    """
    如果消息处理正常完成,必须调用此方法,
    否则rabbitmq会认为消息处理不成功,重新将消息放回待执行队列中
    :param channel: 回调函数的channel参数
    :param method: 回调函数的method参数
    :return:
    """
    channel.basic_ack(delivery_tag=method.delivery_tag)
  def message_handle_failed(channel, method):
    """
    如果消息处理失败,应该调用此方法,会自动将消息放入异常队列
    :param channel: 回调函数的channel参数
    :param method: 回调函数的method参数
    :return:
    """
    channel.basic_reject(delivery_tag=method.delivery_tag, requeue=false)

发布消息代码如下:

from mq.rabbitmq import rabbitmqclient
print("start program")
client = rabbitmqclient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,ttl=10000)
print("message send out")

消费者代码如下:

from mq.rabbitmq import rabbitmqclient
import json
print("start program")
client = rabbitmqclient()
def callback(ch, method, properties, body):
    msg = body.decode()
    print(msg)
    # 如果处理成功,则调用此消息回复ack,表示消息成功处理完成。
    rabbitmqclient.message_handle_successfully(ch, method)
queue_name = "retryqueue"
client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网