说明:在多线程情况下,我们需要用到锁来控制线程对资源的访问,当在多线程+分布式的情况下,如果使用synchronized (this)
,这会在每台服务器实例上都生成一个锁对象,而这个锁只会对当前实例生效,无法对其他服务器实例起作用。
或者,当我们的操作涉及到多数据源的情况,也无法使用synchronized (this)
,这时就需要使用到分布式锁,将锁从具体实例中抽出来,放在一个公共的地方。
分布式锁,可以通过Redis或者Zookeeper来实现,本文介绍使用Redis实现分布式锁,以及Redisson的使用。
Demo
首先,搭建一个简单的Demo,有get和set两个接口;
(controller)
import com.hezy.pojo.Student;
import com.hezy.service.DemoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/demo")
public class DemoController {@Autowiredprivate DemoService demoService;@GetMapping("/get")public String get() {return demoService.get();}@GetMapping("/set")public String set(Student student) {return demoService.set(student);}
}
set接口,用于接收一个Student对象,并存入到Redis中;
@Overridepublic String set(Student student) {redisTemplate.opsForValue().set("student", student);return "success";}
get接口,用于从Redis中读取对象,如果没有则从数据库中读取,并存入到Redis中;
@Overridepublic String get() {// 访问redis缓存Student student = (Student) redisTemplate.opsForValue().get("student");// 如果缓存中有数据,直接返回if (student != null) {System.out.println("从缓存中获取数据");return student.toString();} else {synchronized (this){System.out.println("从数据库中获取数据");student = new Student("hezy", 18);this.set(student);return "success";}}}
(Service实现类)
import com.hezy.pojo.Student;
import com.hezy.service.DemoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;@Service
public class DemoServiceImpl implements DemoService {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic String get() {// 访问redis缓存Student student = (Student) redisTemplate.opsForValue().get("student");// 如果缓存中有数据,直接返回if (student != null) {System.out.println("从缓存中获取数据");return student.toString();} else {synchronized (this){System.out.println("从数据库中获取数据");student = new Student("hezy", 18);this.set(student);return "success";}}}@Overridepublic String set(Student student) {redisTemplate.opsForValue().set("student", student);return "success";}
}
(Student对象)
import java.io.Serializable;public class Student implements Serializable {private String name;private int age;public Student() {}public Student(String name, int age) {this.name = name;this.age = age;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "Student{" +"name='" + name + '\'' +", age=" + age +'}';}
}
测试
分别对set和get接口进行测试;
(set接口)
(Redis中存入了数据)
(get接口)
多线程测试
先删掉Redis中的缓存;
使用Apifox对get接口进行多线程访问测试,看下控制台的输出结果;
执行结束;
控制台结果,可以发现,开头这里访问了三次数据库,这是为什么?
这是因为在多线程情况下,访问数据库,存入Redis的操作还没完成,但是其他的线程已经通过了student == null
的判断,在等待锁释放后,依次访问数据库,所以导致了这种情况;
解决方法是在单例模式中使用到的方法,双重检查锁定(Double-Check Locking),如下,在锁的代码块里面再做一层判断;
public String get() {// 访问redis缓存Student student = (Student) redisTemplate.opsForValue().get("student");// 如果缓存中有数据,直接返回if (student != null) {System.out.println("从缓存中获取数据");return student.toString();} else {synchronized (this){student = (Student) redisTemplate.opsForValue().get("student");if (student != null) {System.out.println("从缓存中获取数据");return student.toString();}System.out.println("从数据库中获取数据");student = new Student("hezy", 18);this.set(student);return "success";}}}
缓存删掉,再跑一遍测试,可以看到这回就只有头一次访问了数据库;
分布式锁
上面的内容不是本文的重点,使用Redis实现分布式锁,具体是通过Redis的这个命令:
setnx [key] [value]
表示当这个key在Redis不存在时,set才能成功,这不正是锁的机制吗?set成功表示获取锁;
在RedisTemplate中,是用下面这个方法;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;@SpringBootTest
public class RedisTest {@Autowiredprivate RedisTemplate redisTemplate;@Testpublic void test() {Boolean a = redisTemplate.opsForValue().setIfAbsent("lock", "hezy");System.out.println(a);Boolean b = redisTemplate.opsForValue().setIfAbsent("lock", "hezy");System.out.println(b);}
}
执行结果;
可以将上面get()接口的代码改造一下,加上分布式锁,同时在finally代码块中释放分布式锁;
public String get() {// 访问redis缓存Student student = (Student) redisTemplate.opsForValue().get("student");// 如果缓存中有数据,直接返回if (student != null) {System.out.println("从缓存中获取数据");return student.toString();} else {// 获取分布式锁Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "hezy");if (lock) {try {student = (Student) redisTemplate.opsForValue().get("student");if (student != null) {System.out.println("从缓存中获取数据");return student.toString();}System.out.println("从数据库中获取数据");student = new Student("hezy", 18);this.set(student);return "success";} finally {// 释放锁redisTemplate.delete("lock");}}return "";}}
这就是Redis实现分布式锁的方式,但是仔细分析就会发现这种方式有问题:
如果在try代码块里面有BUG,程序中断,导致finally代码块没有执行,锁无法被释放怎么办?可以考虑给分布式锁设置一个有效时间,到了时间自动释放。
但是时间要设置多少?设置过短,程序没有执行完,有效期到了锁自动释放,其他线程获取锁,然后原来的线程执行完,不就把其他线程的分布式锁给释放了吗?凉凉;设置太长,又会导致程序中断,分布式锁需要较长的时间才能被释放,影响性能;
Redisson
为了解决原生Redis方法实现分布式锁的问题,可以考虑使用Redisson,如下:
(导入依赖)
<!--redisson依赖--><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.11.2</version></dependency>
(装配对象)
@Autowiredprivate RedissonClient redissonClient;
(改造代码)
@Overridepublic String get() {// 访问redis缓存Student student = (Student) redisTemplate.opsForValue().get("student");// 如果缓存中有数据,直接返回if (student != null) {System.out.println("从缓存中获取数据");return student.toString();} else {// 获取分布式锁RLock lock = redissonClient.getLock("lock");// 尝试获取锁lock.lock();try {student = (Student) redisTemplate.opsForValue().get("student");if (student != null) {System.out.println("从缓存中获取数据");return student.toString();}System.out.println("从数据库中获取数据");student = new Student("hezy", 18);this.set(student);return "success";} finally {// 释放锁lock.unlock();}}}
Redisson底层实现原理是,当线程获取到锁时,会执行lua脚本,将锁写入到Redis中,获取锁失败时,会调用while循环,不断尝试获取锁;而当线程在获取锁后,执行失败会自动释放锁,默认是30秒,而如果锁时间过期,程序还在执行,会启动一个看门狗(Watch Dog)后台线程,不断给锁续期,直至程序执行完成。
以上就是Redis分布式锁的实现,以及Redisson的使用;