当前位置: 移动技术网 > IT编程>数据库>Redis > 详解redis是如何实现队列消息的ack

详解redis是如何实现队列消息的ack

2017年12月08日  | 移动技术网IT编程  | 我要评论
前言 由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。 原生的redis中通过l/r push/pop方

前言

由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。

原生的redis中通过l/r push/pop方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列)做个小小的调整。

大体思路为在pop时将pop出的数据放到备份的地方,当有ack请求(确认消息被消耗)后将备份的信息删除掉;每次在pop前需要检查备份队列中有没有过期的数据没有ack的,如果有则push到list中后再从list中pop出来。

以下脚本使用lua实现,只需要在执行前加载到redis中即可。

消息本身需要包含id属性

push没什么问题,原生即可(此处以lpush为例)

pop时脚本

local not_empty = function(x)
 return (type(x) == "table") and (not x.err) and (#x ~= 0)
end

local qname = argv[1] --队列名称
local currenttime = argv[2] --当前时间,这个需要从外部传入,不能使用redis自身时间,如果使用自身时间可能导致redis本身的backup在重放请求时出现不一致性
local considerasfailmaxtimespan = argv[3] --超时时间设定,当消息超过一定时间还没有ack则认为此消息需要再次入队

local zsetname= qname ..'backup'
local hashname= qname ..'context'

local tmp = redis.call('zrangebyscore',zsetname , '-inf', tonumber(currenttime) - tonumber(considerasfailmaxtimespan), 'limit', 0, 1)
if (not_empty(tmp)) then
 redis.call('zrem', zsetname, tmp[1]) --此处拿出的为消息的唯一id
 redis.call('lpush', qname, redis.call('hget', hashname, tmp[1]))
end
tmp = redis.call('rpop', qname)
if (tmp) then
 local msg = cjson.decode(tmp)
 local id = msg['id']
 redis.call('zadd', zsetname, tonumber(currenttime), id)
 redis.call('hset',hashname , id, tmp)
end
return tmp

ack时候比较简单,只需要将指定id从set和hash中删除即可

 local key = argv[1]
 local qname=argv[2]
 redis.call('zrem', qname..'backup', key)
 redis.call('hdel', qname..'context', key)

在程序中使用前需要显示load这两个脚本,后面直接调用这两个脚本的sha值即可执行。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对移动技术网的支持。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网