基于redisson实现分布式锁
之前背过分布式锁几种实现方案的八股文,但是并没有真正自己实操过。现在对AOP有了更深一点的理解,就自己来实现一遍。
1、分布式锁的基础知识
分布式锁是相对于普通的锁的。普通的锁在具体的方法层面去锁,单体应用情况下,各个进入的请求都只能进入到一个应用里面,也就能达到锁住方法的效果。
而分布式的系统,将一个项目部署了多个实例,通过nginx去做请求转发将请求分到各个实例。不同实例之间共用代码,共用数据库这些。比如一个请求进入A实例,获得了锁;如果继续有请求进入A实例,则会排队等待。但如果请求进入的是B实例呢?B实例的锁和A实例没有关系,那么进入B实例的请求也会获取到锁,然后进入方法。这样锁的作用就没有达到。这种情况下,就引出了分布式锁,这是专门为了解决分布式系统的并发问题的。做法是让不同的实例都能使用同一个锁。比如redis,redis内部是单线程的,把锁放在redis,这样就可以多个实例共用一个锁。
2、Redisson
Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。
利用Redisson可以很轻松就实现分布式锁。
3、实现的几个点
如何动态将拼接key所需的参数值传入到切面?
有两种方案,一种是通过给参数加上注解来标识那些参数是作为key使用的。在切面内通过获取方法的参数上的注解来获取所需的参数值。
另一种是通过定义key的参数名,在切面获取方法参数的参数名进行对比,一样的就是所需的参数。
获取锁失败如何重试?
Redisson提供了重试的能力,以及重试等待时长等。
4、代码实现
前置操作:项目引入Redis
引入Redisson依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
项目yaml文件
spring:#reids配置redis:database: 1host: 127.0.0.1port: 6379# redisson配置redisson:# 数据库编号database: 1# 节点地址address: redis://127.0.0.1:6379
编写Redisson配置文件
package com.yumoxuan.myapp.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {@Value("${spring.redis.redisson.address}")public String address;@Value("${spring.redis.redisson.database}")public Integer database;@Beanpublic RedissonClient getRedissonClient(){Config config=new Config();config.useSingleServer().setAddress(address).setDatabase(database);return Redisson.create(config);}
}
定义分布式锁注解
package com.yumoxuan.myapp.core.aspect.annotation;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {/*** 锁的名称* @return*/String name() default "";/*** 锁的参数key* @return*/String[] key() default {};/*** 锁过期时间* @return*/int expireTime() default 30;/*** 锁过期时间单位* @return*/TimeUnit timeUnit() default TimeUnit.SECONDS;/*** 等待获取锁超时时间* @return*/int waitTime() default 10;
}
切面实现
package com.yumoxuan.myapp.core.aspect;
import com.yumoxuan.myapp.core.aspect.annotation.DistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
@Slf4j
@Aspect
@Component
public class DistributedLockAspect {@Resourcepublic RedissonClient redissonClient;@Pointcut("@annotation(com.yumoxuan.myapp.core.aspect.annotation.DistributedLock)")public void pointcut() {}@Around("pointcut()")public Object around(ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();DistributedLock annotation = method.getAnnotation(DistributedLock.class);String name = annotation.name();String key = name;String[] keys = annotation.key();int expireTime = annotation.expireTime();TimeUnit timeUnit = annotation.timeUnit();int waitTime = annotation.waitTime();Object[] args = point.getArgs(); // 参数值String[] paramNames = signature.getParameterNames(); // 参数名for (int i = 0; i < args.length; i++) {String paramName = paramNames[i];for (int j = 0; j < keys.length; j++) {if (paramName.equals(keys[j])) {key = key + ":" + args[i];}}}log.info(Thread.currentThread().getId()+" "+"key:"+key);RLock lock = redissonClient.getLock(key);boolean res = false;Object obj = null;try {if (waitTime == -1) {res = true;lock.lock(expireTime, timeUnit);} else {res = lock.tryLock(waitTime, expireTime, timeUnit);}if (res) {obj = point.proceed();} else {log.error("分布式锁获取异常");}} finally {//释放锁if (res) {lock.unlock();}}return obj;}
}
验证效果
@ApiOperation(value="my_app_user-添加", notes="my_app_user-添加")@PostMapping(value = "/add")@DistributedLock(name = "添加分布式锁",key = {"userId"})public Result<?> add(@RequestBody MyAppUser myAppUser, String userId) {log.info(Thread.currentThread().getId()+" "+new Date());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return Result.ok();}
如下图,76.77的key是一样的,从打印的时间上看,两次打印时间相差了5秒,说明后者被阻塞。而68的线程key和另外两个不一样,所以没有被阻塞,在76和77的中间就运行了。符合分布式锁的效果。
5、拓展
要实现一个功能完善的分布式锁,其实还有很多内容。
比如锁要过期了,但事务未执行完毕,则需要进行锁续期,这个可以通过守护线程实现。