当前位置: 移动技术网 > IT编程>开发语言>Java > 简单注解实现集群同步锁(spring+redis+注解)

简单注解实现集群同步锁(spring+redis+注解)

2019年07月22日  | 移动技术网IT编程  | 我要评论

互联网面试的时候,是不是面试官常问一个问题如何保证集群环境下数据操作并发问题,常用的synchronized肯定是无法满足了,或许你可以借助for update对数据加锁。本文的最终解决方式你只要在方法上加一个@p4jsyn注解就能保证集群环境下同synchronized的效果,且锁的key可以任意指定。本注解还支持了锁的超时机制。

本文需要对redis、spring和spring-data-redis有一定的了解。当然你可以借助本文的思路对通过注解对方法返回数据进行缓存,类似com.google.code.simple-spring-memcached的@readthroughsinglecache。

第一步:  介绍两个自定义注解p4jsyn、p4jsynkey

p4jsyn:必选项,标记在方法上,表示需要对该方法加集群同步锁;

p4jsynkey:可选项,加在方法参数上,表示以方法某个参数作为锁的key,用来保证更多的坑,p4jsynkey并不是强制要添加的,当没有p4jsynkey标记的情况下只会以p4jsyn的synkey作为锁key。

package com.yaoguoyin.redis.lock; 
import java.lang.annotation.elementtype; 
import java.lang.annotation.inherited; 
import java.lang.annotation.retention; 
import java.lang.annotation.retentionpolicy; 
import java.lang.annotation.target; 
/** 
 * <b>同步锁:</b><br/> 
 * 主要作用是在服务器集群环境下保证方法的synchronize;<br/> 
 * 标记在方法上,使该方法的执行具有互斥性,并不保证并发执行方法的先后顺序;<br/> 
 * 如果原有“a任务”获取锁后任务执行时间超过最大允许持锁时间,且锁被“b任务”获取到,在“b任务”成功货物锁会并不会终止“a任务”的执行;<br/> 
 * <br/> 
 * <b>注意:</b><br/> 
 * 使用过程中需要注意keepmills、towait、sleepmills、maxsleepmills等参数的场景使用;<br/> 
 * 需要安装redis,并使用spring和spring-data-redis等,借助redis nx等方法实现。 
 * 
 * @see com.yaoguoyin.redis.lock.p4jsynkey 
 * @see com.yaoguoyin.redis.lock.redislockaspect 
 * 
 * @author partner4java 
 * 
 */ 
@target({ elementtype.method }) 
@retention(retentionpolicy.runtime) 
@inherited 
public @interface p4jsyn { 
 /** 
 * 锁的key<br/> 
 * 如果想增加坑的个数添加非固定锁,可以在参数上添加@p4jsynkey注解,但是本参数是必写选项<br/> 
 * redis key的拼写规则为 "redissyn+" + synkey + @p4jsynkey<br/> 
 * 
 */ 
 string synkey(); 
 /** 
 * 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/> 
 * 单位毫秒,默认20秒<br/> 
 * 如果为0表示永远不释放锁,在设置为0的情况下towait为true是没有意义的<br/> 
 * 但是没有比较强的业务要求下,不建议设置为0 
 */ 
 long keepmills() default 20 * 1000; 
 /** 
 * 当获取锁失败,是继续等待还是放弃<br/> 
 * 默认为继续等待 
 */ 
 boolean towait() default true; 
 /** 
 * 没有获取到锁的情况下且towait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/> 
 * 默认为10毫秒 
 * 
 * @return 
 */ 
 long sleepmills() default 10; 
 /** 
 * 锁获取超时时间:<br/> 
 * 没有获取到锁的情况下且towait()为true继续等待,最大等待时间,如果超时抛出 
 * {@link java.util.concurrent.timeoutexception.timeoutexception} 
 * ,可捕获此异常做相应业务处理;<br/> 
 * 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去; 
 * 
 * @return 
 */ 
 long maxsleepmills() default 60 * 1000; 
} 
package com.yaoguoyin.redis.lock; 
import java.lang.annotation.elementtype; 
import java.lang.annotation.inherited; 
import java.lang.annotation.retention; 
import java.lang.annotation.retentionpolicy; 
import java.lang.annotation.target; 
/** 
 * <b>同步锁 key</b><br/> 
 * 加在方法的参数上,指定的参数会作为锁的key的一部分 
 * 
 * @author partner4java 
 * 
 */ 
@target({ elementtype.parameter }) 
@retention(retentionpolicy.runtime) 
@inherited 
public @interface p4jsynkey { 
 /** 
 * key的拼接顺序 
 * 
 * @return 
 */ 
 int index() default 0; 
} 

这里就不再对两个注解进行使用上的解释了,因为注释已经说明的很详细了。

使用示例:

package com.yaoguoyin.redis.lock; 
import org.springframework.stereotype.component; 
@component 
public class systest { 
 private static int i = 0; 
 @p4jsyn(synkey = "12345") 
 public void add(@p4jsynkey(index = 1) string key, @p4jsynkey(index = 0) int key1) { 
 i++; 
 system.out.println("i=-===========" + i); 
 } 
} 

第二步:切面编程

在不影响原有代码的前提下,保证执行同步,目前最直接的方式就是使用切面编程

package com.yaoguoyin.redis.lock; 
import java.lang.annotation.annotation; 
import java.lang.reflect.method; 
import java.util.sortedmap; 
import java.util.treemap; 
import java.util.concurrent.timeunit; 
import java.util.concurrent.timeoutexception; 
import org.aspectj.lang.proceedingjoinpoint; 
import org.aspectj.lang.annotation.around; 
import org.aspectj.lang.annotation.aspect; 
import org.aspectj.lang.reflect.methodsignature; 
import org.springframework.beans.factory.annotation.autowired; 
import org.springframework.beans.factory.annotation.qualifier; 
import org.springframework.data.redis.core.boundvalueoperations; 
import org.springframework.data.redis.core.redistemplate; 
/** 
 * 锁的切面编程<br/> 
 * 针对添加@redislock 注解的方法进行加锁 
 * 
 * @see com.yaoguoyin.redis.lock.p4jsyn 
 * 
 * @author partner4java 
 * 
 */ 
@aspect 
public class redislockaspect { 
 @autowired 
 @qualifier("redistemplate") 
 private redistemplate<string, long> redistemplate; 
 @around("execution(* com.yaoguoyin..*(..)) && @annotation(com.yaoguoyin.redis.lock.p4jsyn)") 
 public object lock(proceedingjoinpoint pjp) throws throwable { 
 p4jsyn lockinfo = getlockinfo(pjp); 
 if (lockinfo == null) { 
  throw new illegalargumentexception("配置参数错误"); 
 } 
 string synkey = getsynkey(pjp, lockinfo.synkey()); 
 if (synkey == null || "".equals(synkey)) { 
  throw new illegalargumentexception("配置参数synkey错误"); 
 } 
 boolean lock = false; 
 object obj = null; 
 try { 
  // 超时时间 
  long maxsleepmills = system.currenttimemillis() + lockinfo.maxsleepmills(); 
  while (!lock) { 
  long keepmills = system.currenttimemillis() + lockinfo.keepmills(); 
  lock = setifabsent(synkey, keepmills); 
  // 得到锁,没有人加过相同的锁 
  if (lock) { 
   obj = pjp.proceed(); 
  } 
  // 锁设置了没有超时时间 
  else if (lockinfo.keepmills() <= 0) { 
   // 继续等待获取锁 
   if (lockinfo.towait()) { 
   // 如果超过最大等待时间抛出异常 
   if (lockinfo.maxsleepmills() > 0 && system.currenttimemillis() > maxsleepmills) { 
    throw new timeoutexception("获取锁资源等待超时"); 
   } 
   timeunit.milliseconds.sleep(lockinfo.sleepmills()); 
   } else { 
   break; 
   } 
  } 
  // 已过期,并且getandset后旧的时间戳依然是过期的,可以认为获取到了锁 
  else if (system.currenttimemillis() > getlock(synkey) && (system.currenttimemillis() > getset(synkey, keepmills))) { 
   lock = true; 
   obj = pjp.proceed(); 
  } 
  // 没有得到任何锁 
  else { 
   // 继续等待获取锁 
   if (lockinfo.towait()) { 
   // 如果超过最大等待时间抛出异常 
   if (lockinfo.maxsleepmills() > 0 && system.currenttimemillis() > maxsleepmills) { 
    throw new timeoutexception("获取锁资源等待超时"); 
   } 
   timeunit.milliseconds.sleep(lockinfo.sleepmills()); 
   } 
   // 放弃等待 
   else { 
   break; 
   } 
  } 
  } 
 } catch (exception e) { 
  e.printstacktrace(); 
  throw e; 
 } finally { 
  // 如果获取到了锁,释放锁 
  if (lock) { 
  releaselock(synkey); 
  } 
 } 
 return obj; 
 } 
 /** 
 * 获取包括方法参数上的key<br/> 
 * redis key的拼写规则为 "redissyn+" + synkey + @p4jsynkey 
 * 
 */ 
 private string getsynkey(proceedingjoinpoint pjp, string synkey) { 
 try { 
  synkey = "redissyn+" + synkey; 
  object[] args = pjp.getargs(); 
  if (args != null && args.length > 0) { 
  methodsignature methodsignature = (methodsignature) pjp.getsignature(); 
  annotation[][] paramannotationarrays = methodsignature.getmethod().getparameterannotations(); 
  sortedmap<integer, string> keys = new treemap<integer, string>(); 
 
  for (int ix = 0; ix < paramannotationarrays.length; ix++) { 
   p4jsynkey p4jsynkey = getannotation(p4jsynkey.class, paramannotationarrays[ix]); 
   if (p4jsynkey != null) { 
   object arg = args[ix]; 
   if (arg != null) { 
    keys.put(p4jsynkey.index(), arg.tostring()); 
   } 
   } 
  } 
  if (keys != null && keys.size() > 0) { 
   for (string key : keys.values()) { 
   synkey = synkey + key; 
   } 
  } 
  } 
  return synkey; 
 } catch (exception e) { 
  e.printstacktrace(); 
 } 
 return null; 
 } 
 @suppresswarnings("unchecked") 
 private static <t extends annotation> t getannotation(final class<t> annotationclass, final annotation[] annotations) { 
 if (annotations != null && annotations.length > 0) { 
  for (final annotation annotation : annotations) { 
  if (annotationclass.equals(annotation.annotationtype())) { 
   return (t) annotation; 
  } 
  } 
 } 
 return null; 
 } 
 /** 
 * 获取redislock注解信息 
 */ 
 private p4jsyn getlockinfo(proceedingjoinpoint pjp) { 
 try { 
  methodsignature methodsignature = (methodsignature) pjp.getsignature(); 
  method method = methodsignature.getmethod(); 
  p4jsyn lockinfo = method.getannotation(p4jsyn.class); 
  return lockinfo; 
 } catch (exception e) { 
  e.printstacktrace(); 
 } 
 return null; 
 } 
 public boundvalueoperations<string, long> getoperations(string key) { 
 return redistemplate.boundvalueops(key); 
 } 
 /** 
 * set {@code value} for {@code key}, only if {@code key} does not exist. 
 * <p> 
 * see http://redis.io/commands/setnx 
 * 
 * @param key 
 *  must not be {@literal null}. 
 * @param value 
 *  must not be {@literal null}. 
 * @return 
 */ 
 public boolean setifabsent(string key, long value) { 
 return getoperations(key).setifabsent(value); 
 } 
 public long getlock(string key) { 
 long time = getoperations(key).get(); 
 if (time == null) { 
  return 0; 
 } 
 return time; 
 } 
 public long getset(string key, long value) { 
 long time = getoperations(key).getandset(value); 
 if (time == null) { 
  return 0; 
 } 
 return time; 
 } 
 public void releaselock(string key) { 
 redistemplate.delete(key); 
 } 
} 

redislockaspect会对添加注解的方法进行特殊处理,具体可看lock方法。

大致思路就是:

1、首选借助redis本身支持对应的setifabsent方法,该方法的特点是如果redis中已有该数据不保存返回false,不存该数据保存返回true;

2、如果setifabsent返回true标识拿到同步锁,可进行操作,操作后并释放锁;

3、如果没有通过setifabsent拿到数据,判断是否对锁设置了超时机制,没有设置判断是否需要继续等待;

4、判断是否锁已经过期,需要对(system.currenttimemillis() > getlock(synkey) && (system.currenttimemillis() > getset(synkey, keepmills)))进行细细的揣摩一下,getset可能会改变了其他人拥有锁的超时时间,但是几乎可以忽略;

5、没有得到任何锁,判断继续等待还是退出。

第三步:spring的基本配置

#*****************jedis连接参数设置*********************# 
 
#redis服务器ip # 
redis.hostname=127.0.0.1 
 
#redis服务器端口号# 
redis.port=6379 
 
#redis服务器外部访问密码 
redis.password=xxxxxxxxxx 
 
#************************jedis池参数设置*******************# 
 
#jedis的最大分配对象# 
jedis.pool.maxactive=1000 
 
jedis.pool.minidle=100 
 
#jedis最大保存idel状态对象数 # 
jedis.pool.maxidle=1000 
 
#jedis池没有对象返回时,最大等待时间 # 
jedis.pool.maxwait=5000 
 
#jedis调用borrowobject方法时,是否进行有效检查# 
jedis.pool.testonborrow=true 
 
#jedis调用returnobject方法时,是否进行有效检查 # 
jedis.pool.testonreturn=true 
<?xml version="1.0" encoding="utf-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:jee="http://www.springframework.org/schema/jee"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:redis="http://www.springframework.org/schema/redis" xmlns:cache="http://www.springframework.org/schema/cache" xsi:schemalocation="http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd  http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop-4.1.xsd  http://www.springframework.org/schema/redis  http://www.springframework.org/schema/redis/spring-redis.xsd 
http://www.springframework.org/schema/cache  http://www.springframework.org/schema/cache/spring-cache.xsd"> 
 <!-- 开启注解 --> 
 <aop:aspectj-autoproxy /> 
 <bean class="com.yaoguoyin.redis.lock.redislockaspect" /> 
 <!-- 扫描注解包范围 --> 
 <context:component-scan base-package="com.yaoguoyin" /> 
 <!-- 引入redis配置 --> 
 <context:property-placeholder location="classpath:config.properties" /> 
 <!-- 连接池 --> 
 <bean id="poolconfig" class="redis.clients.jedis.jedispoolconfig"> 
 <property name="minidle" value="${jedis.pool.minidle}" /> 
 <property name="maxidle" value="${jedis.pool.maxidle}" /> 
 <property name="maxwaitmillis" value="${jedis.pool.maxwait}" /> 
 </bean> 
 <!-- p:password="${redis.pass}" --> 
 <bean id="redisconnectionfactory" class="org.springframework.data.redis.connection.jedis.jedisconnectionfactory" p:host-name="${redis.hostname}" p:port="${redis.port}" 
 p:password="${redis.password}" p:pool-config-ref="poolconfig" /> 
 <!-- 类似于jdbctemplate --> 
 <bean id="redistemplate" class="org.springframework.data.redis.core.redistemplate" p:connection-factory-ref="redisconnectionfactory" /> 
</beans> 

redis的安装本文就不再说明。

测试

package com.yaoguoyin.redis; 
import org.junit.runner.runwith; 
import org.springframework.test.context.contextconfiguration; 
import org.springframework.test.context.junit4.abstractjunit4springcontexttests; 
import org.springframework.test.context.junit4.springjunit4classrunner; 
@runwith(springjunit4classrunner.class) 
@contextconfiguration(locations = { "classpath:meta-inf/spring/redis.xml" }) 
public class basetest extends abstractjunit4springcontexttests { 
} 
package com.yaoguoyin.redis.lock; 
import java.util.concurrent.timeunit; 
import org.junit.test; 
import org.springframework.beans.factory.annotation.autowired; 
import com.yaoguoyin.redis.basetest; 
public class redistest extends basetest { 
 @autowired 
 private systest systest; 
 @test 
 public void testhello() throws interruptedexception { 
 for (int i = 0; i < 100; i++) { 
  new thread(new runnable() { 
  @override 
  public void run() { 
   try { 
   timeunit.seconds.sleep(1); 
   } catch (interruptedexception e) { 
   e.printstacktrace(); 
   } 
   systest.add("xxxxx", 111111); 
  } 
  }).start(); 
 } 
 timeunit.seconds.sleep(20); 
 } 
 @test 
 public void testhello2() throws interruptedexception{ 
 systest.add("xxxxx", 111111); 
 timeunit.seconds.sleep(10); 
 } 
} 

你可以对

void com.yaoguoyin.redis.lock.systest.add(@p4jsynkey(index=1) string key, @p4jsynkey(index=0) int key1)

去除注解@p4jsyn进行测试对比。

ps:本demo的执行性能取决于redis和java交互距离;成千山万单锁并发建议不要使用这种形式,直接通过redis等解决,本demo只解决小并发不想耦合代码的形式。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持移动技术网!

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网