近期工作遇到需要业务场景如下,需要每天定时推送给另一系统一批数据,但是由于系统是集群部署的,会造成统一情况下任务争用的情况,所以需要增加分布式锁来保证一定时间范围内有一个job来完成定时任务. 前期考虑的方案有采用zookeeper分布式任务,quartz分布式任务调度,但是由于zookeeper需要增加额外组件,quartz需要增加表,并且项目中现在已经有redis这一组件存在,所以考虑采用redis分布式锁的情况来完成分布式任务抢占这一功能
记录一下走过的弯路.
第一版本:
@override public <t> long set(string key,t value, long cacheseconds) { if (value instanceof hashmap) { boundhashoperations valueoperations = redistemplate.boundhashops(key); valueoperations.putall((map) value); valueoperations.expire(cacheseconds, timeunit.seconds); } else{ //使用map存储 boundhashoperations valueoperations = redistemplate.boundhashops(key); valueoperations.put(key, value); //秒 valueoperations.expire(cacheseconds, timeunit.seconds); } return null; } @override public void del(string key) { redistemplate.delete(key); }
采用set 和 del 完成锁的占用与释放,后经测试得知,set不是线程安全,在并发情况下常常会导致数据不一致.
第二版本:
/** * 分布式锁 * @param range 锁的长度 允许有多少个请求抢占资源 * @param key * @return */ public boolean getlock(int range, string key) { valueoperations<string, integer> valueoper1 = template.opsforvalue(); return valueoper1.increment(key, 1) <= range; } /** * 初始化锁, 设置等于0 * @param key * @param expireseconds * @return */ public void initlock(string key, long expireseconds) { valueoperations<string, integer> operations = template.opsforvalue(); template.setkeyserializer(new genericjackson2jsonredisserializer()); template.setvalueserializer(new genericjackson2jsonredisserializer()); operations.set(key, 0, expireseconds * 1000); } /** * 释放锁 * @param key */ public void releaselock(string key) { valueoperations<string, integer> operations = template.opsforvalue(); template.setkeyserializer(new genericjackson2jsonredisserializer()); template.setvalueserializer(new genericjackson2jsonredisserializer()); template.delete(key); }
采用redis的 increament操作完成锁的抢占.但是释放锁时,是每个线程都可以删除redis中的key值. 并且initlock会降上一次的操作给覆盖掉,所以也废弃掉此方法
最终版本:
import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.connection.redisconnectionfactory; import org.springframework.data.redis.connection.jedis.jedisconnection; import org.springframework.stereotype.service; import org.springframework.util.reflectionutils; import redis.clients.jedis.jedis; import java.lang.reflect.field; import java.util.collections; @service public class redislock { private static final string lock_success = "ok"; private static final string set_if_not_exist = "nx"; private static final string set_with_expire_time = "px"; private static final long release_success = 1l; @autowired private redisconnectionfactory connectionfactory; /** * 尝试获取分布式锁 * @param lockkey 锁 * @param requestid 请求标识 * @param expiretime 超期时间 * @return 是否获取成功 */ public boolean lock(string lockkey, string requestid, int expiretime) { field jedisfield = reflectionutils.findfield(jedisconnection.class, "jedis"); reflectionutils.makeaccessible(jedisfield); jedis jedis = (jedis) reflectionutils.getfield(jedisfield, connectionfactory.getconnection()); string result = jedis.set(lockkey, requestid, set_if_not_exist, set_with_expire_time, expiretime); if (lock_success.equals(result)) { return true; } return false; } /** * 释放分布式锁 * @param lockkey 锁 * @param requestid 请求标识 * @return 是否释放成功 */ public boolean releaselock(string lockkey, string requestid) { string script = "if redis.call('get', keys[1]) == argv[1] then return redis.call('del', keys[1]) else return 0 end"; object result = getjedis().eval(script, collections.singletonlist(lockkey), collections.singletonlist(requestid)); if (release_success.equals(result)) { return true; } return false; } public jedis getjedis() { field jedisfield = reflectionutils.findfield(jedisconnection.class, "jedis"); reflectionutils.makeaccessible(jedisfield); jedis jedis = (jedis) reflectionutils.getfield(jedisfield, connectionfactory.getconnection()); return jedis; } }
如对本文有疑问, 点击进行留言回复!!
星际无限CTO张超:IPFS分布式存储领域仍是蓝海,中链云将开启行业新思路!
厉害!俩月吃透阿里P8架构师推荐608页kafka源码,成功入职蚂蚁
性能 1.84 倍于 Ceph!网易数帆开源分布式存储系统 Curve
荐 面试半年,上个月成功拿到阿里P7offer,全靠我啃烂了这份2020最新面试题!
网友评论