背景
某些场景下,有可能一个方法不能被并发执行,有可能一个方法的特定参数不能被并发执行。比如不能将一个消息发送多次,创建缓存最好只创建一次等等。为了实现上面的目标我们就需要采用同步机制来完成,但同步的逻辑如何实现呢,是否会影响到原有逻辑呢?
嵌入式
这里讲的嵌入式是说获取锁以及释放锁的逻辑与业务代码耦合在一起,又分分布式与单机两种不同场景的不同实现。
单机版本
下面方法,每个productId不允许并发访问,所以这里可以直接用synchronized来锁定不同的参数。
@Service public class ProductAppService { public void invoke(Integer productId) { synchronized (productId) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("productId:" + productId+" time:"+new Date()); } } }
测试脚本:三个相同的参数0,两个不同的参数1和2,通过一个多线程的例子来模似。如果有并发请求的测试工具可能效果会更好。
private void testLock(){ ExecutorService executorService= Executors.newFixedThreadPool(5); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(0); } }); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(0); } }); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(0); } }); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(1); } }); executorService.submit(new Runnable() { @Override public void run() { productAppService.invoke2(2); } }); executorService.shutdown(); }
测试结果如下,0,1,2三个请求未被阻塞,后面的两个0被阻塞。
分布式版本
分布式的除了锁机制不同之外其它的测试方法相同,这里只贴出锁的部分:
public void invoke2(Integer productId) { RLock lock=this.redissonService.getRedisson().getLock(productId.toString()); try { boolean locked=lock.tryLock(3000,500, TimeUnit.MILLISECONDS); if(locked){ Thread.sleep(1000); System.out.print("productId:" + productId+" time:"+new Date()); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
嵌入式的缺点
比较明显的就是锁的逻辑与业务逻辑混合在一起,增加了程序复杂度而且也不利于锁机制的更替。
注解式
能否将锁的逻辑隐藏起来,通过在特定方法上增加注解来实现呢?就像Spring Cache的应用。当然是可以的,这里我们只需要解决如下三个问题:
定义注解
锁一般有如下几个属性:
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface RequestLockable { String[] key() default ""; long maximumWaiteTime() default 2000; long expirationTime() default 1000; TimeUnit timeUnit() default TimeUnit.MILLISECONDS; }
实现注解
由于我们的目标是注解式锁,这里通过AOP的方式来实现,具体依赖AspectJ,创建一个拦截器:
public abstract class AbstractRequestLockInterceptor { protected abstract Lock getLock(String key); protected abstract boolean tryLock(long waitTime, long leaseTime, TimeUnit unit,Lock lock) throws InterruptedException; /** * 包的表达式目前还有待优化 TODO */ @Pointcut("execution(* com.chanjet.csp..*(..)) && @annotation(com.chanjet.csp.product.core.annotation.RequestLockable)") public void pointcut(){} @Around("pointcut()") public Object doAround(ProceedingJoinPoint point) throws Throwable{ Signature signature = point.getSignature(); MethodSignature methodSignature = (MethodSignature) signature; Method method = methodSignature.getMethod(); String targetName = point.getTarget().getClass().getName(); String methodName = point.getSignature().getName(); Object[] arguments = point.getArgs(); if (method != null && method.isAnnotationPresent(RequestLockable.class)) { RequestLockable requestLockable = method.getAnnotation(RequestLockable.class); String requestLockKey = getLockKey(method,targetName, methodName, requestLockable.key(), arguments); Lock lock=this.getLock(requestLockKey); boolean isLock = this.tryLock(requestLockable.maximumWaiteTime(),requestLockable.expirationTime(), requestLockable.timeUnit(),lock); if(isLock) { try { return point.proceed(); } finally { lock.unlock(); } } else { throw new RuntimeException("获取锁资源失败"); } } return point.proceed(); } private String getLockKey(Method method,String targetName, String methodName, String[] keys, Object[] arguments) { StringBuilder sb = new StringBuilder(); sb.append("lock.").append(targetName).append(".").append(methodName); if(keys != null) { String keyStr = Joiner.on(".").skipNulls().join(keys); if(!StringUtils.isBlank(keyStr)) { LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer(); String[] parameters =discoverer.getParameterNames(method); ExpressionParser parser = new SpelExpressionParser(); Expression expression = parser.parseExpression(keyStr); EvaluationContext context = new StandardEvaluationContext(); int length = parameters.length; if (length > 0) { for (int i = 0; i < length; i++) { context.setVariable(parameters[i], arguments[i]); } } String keysValue = expression.getValue(context, String.class); sb.append("#").append(keysValue); } } return sb.toString(); } }
注意如下几点:
为什么会存在抽象方法?那是为下面的将注解机制与具体的锁实现解耦服务的,目的是希望注解式锁能够得到复用也便于扩展。锁的key生成规则是什么?前缀一般是方法所在类的完全限定名,方法名称以及spel表达式来构成,避免重复。SPEL表达式如何支持?
LocalVariableTableParameterNameDiscoverer它在Spring MVC解析Controller的参数时有用到,可以从一个Method对象中获取参数名称列表。
SpelExpressionParser是标准的spel解析器,利用上面得来的参数名称列表以及参数值列表来获取真实表达式。
问题
基于aspectj的拦截器,@Pointcut中的参数目前未找到动态配置的方法,如果有解决方案的可以告诉我。
将注解机制与具体的锁实现解耦
注解式锁理论上应该与具体的锁实现细节分离,客户端可以任意指定锁,可以是单机下的ReentrantLock也可以是基于redis的分布式锁,当然也可以是基于zookeeper的锁,基于此目的上面我们创建的AbstractRequestLockInterceptor这个拦截器是个抽象类。看下基于redis的分布式锁的子类实现:
@Aspect public class RedisRequestLockInterceptor extends AbstractRequestLockInterceptor { @Autowired private RedissonService redissonService; private RedissonClient getRedissonClient(){ return this.redissonService.getRedisson(); } @Override protected Lock getLock(String key) { return this.getRedissonClient().getLock(key); } @Override protected boolean tryLock(long waitTime, long leaseTime, TimeUnit unit,Lock lock) throws InterruptedException { return ((RLock)lock).tryLock(waitTime,leaseTime,unit); } }
注解式锁的应用
只需要在需要同步的方法上增加@RequestLockable,然后根据需要指定或者不指定key,也可以根据实际场景配置锁等待时间以及锁的生命周期。
@RequestLockable(key = {"#productId"}) public void invoke3(Integer productId) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("productId:" + productId+" time:"+new Date()); }
当然为了拦截器生效,我们需要在配置文件中配置上拦截器。
注解式锁的优点:锁的逻辑与业务代码完全分离,降低了复杂度。灵活的spel表达式可以灵活的构建锁的key。支持多种锁,可以随意切换而不影响业务代码。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。