锁存在的意义:
原因其实很简单:因为我们想让同一时刻只有一个线程在执行某段代码。
因为如果同时出现多个线程去执行,可能会带来我们不想要的结果,可能是数据错误,也可能是服务宕机等等。
例如:某平台做活动“秒杀茅台”,假如活动只秒杀1瓶,但是同时有10万人在同一时刻去抢,如果底层不做控制,有10000个人抢到了,额外的9999瓶平台就要自己想办法解决了。此时,我们可以在底层通过加锁或者隐式加锁的方式来解决这个问题。
又比如,以淘宝双11为例,在0点这一刻,如果有几十万甚至上百万的人同时去查看某个商品的详情,这时候会触发商品的查询,如果我们不做控制,全部走到数据库去,那是有可能直接将数据库打垮的。
这个时候一个比较常用的做法就是进行加锁,只让1个线程去查询,其他线程等待这个线程的查询结果后,直接拿结果。在这个例子中,锁用于控制访问数据库的流量,最终起到了保护系统的作用。
分布式锁的概念:
分布式锁其实可以理解为:控制分布式系统有序的去对共享资源进行操作,通过互斥来保持一致性。
举个例子:假设共享的资源就是一个藏有宝藏的房子,里面有各种珠宝,分布式系统就是要进房子寻宝的人,分布式锁就是保证这个房子只有一个门并且一次只有一个人可以进,而且门只有一把钥匙。然后许多人想要进去寻宝,可以,排队,第一个人拿着钥匙把门打开进屋看书并且把门锁上,然后第二个人没有钥匙,那就等着,等第一个出来,然后你在拿着钥匙进去,然后就是以此类推。
锁的分类:
1.线程锁
主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。
2.进程锁
为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。
3.分布式锁
当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。
实现分布式锁的方式:
实现分布式锁的方式其实很多,只要能保证对于抢夺“锁”的系统来说,这个东西是唯一的,那么就能用于实现分布式锁。
举个简单的例子,有一个 MySQL 数据库 Order,Order 库里有个 Lock 表只有一条记录,该记录有个状态字段 lock_status,默认为0,表示空闲状态,可以修改为1,表示成功获取锁。
我们的订单系统部署在100台服务器上,这100台服务器可以在“同一时刻”对上述的这1条记录执行修改,修改内容都是从0修改为1,但是 MysQL 会保证最终只会有1个线程修改成功。因此,这条记录其实就可以用于做分布式锁。
常见实现分布式锁的方式有:数据库、Redis、Zookeeper。
分布式锁的实现方式:
1、基于数据库表的方式实现
在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。
建表语句如下:
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',`desc` varchar(255) NOT NULL COMMENT '备注信息',`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
执行方法
想要执行某个方法,就调用这个方法向数据表method_lock中插入数据:
INSERT INTO method_lock (method_name, desc)
VALUES ('methodName', '测试的methodName');
锁释放
成功插入则表示获取到锁,插入失败则表示获取锁失败;插入成功后,就好继续方法体的内容,执行完成后删除对应的行数据释放锁:
delete from method_lock where method_name ='methodName';
缺点:
这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
这把锁是非公平锁,所有等待锁的线程凭运气去争夺锁。
调优:
因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;
不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;
不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。
在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。
2.基于redis的实现方案
实现方式:
(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;import java.util.Collections;public class RedisTool2 {private static Jedis jedis = new Jedis("127.0.0.1",6379);private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";private static final Long RELEASE_SUCCESS = 1L;/*** EX seconds : 将键的过期时间设置为 seconds 秒。 执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value 。** PX milliseconds : 将键的过期时间设置为 milliseconds 毫秒。 执行 SET key value PX milliseconds 的效果等同于执行 PSETEX key milliseconds value 。** NX : 只在键不存在时, 才对键进行设置操作。 执行 SET key value NX 的效果等同于执行 SETNX key value 。** XX : 只在键已经存在时, 才对键进行设置操作。*//*** 尝试获取分布式锁* @param lockKey 锁* @param requestId 请求标识* @param expireTime 超期时间(过期时间) 需要根据实际的业务场景确定* @return 是否获取成功*/public static boolean tryGetDistributedLock(String lockKey, String requestId, int expireTime) {SetParams params = new SetParams();String result = jedis.set(lockKey, requestId, params.nx().ex(expireTime));if (LOCK_SUCCESS.equals(result)) {return true;}return false;}/*** 尝试获取分布式锁* @param lockKey 锁* @param requestId 请求标识* @param expireTime 超期时间(过期时间)需要根据实际的业务场景确定* @return 是否获取成功*/public static boolean tryGetDistributedLock1(String lockKey, String requestId, int expireTime){//只在键 key 不存在的情况下, 将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。设置成功返回1,失败返回0long code = jedis.setnx(lockKey, requestId); //保证加锁的原子操作//通过timeOut设置过期时间保证不会出现死锁【避免死锁】jedis.expire(lockKey, expireTime); //设置键的过期时间if(code == 1){return true;}return false;}/*** 解锁操作* @param key 锁标识* @param value 客户端标识* @return*/public static Boolean unLock(String key,String value){//luaScript 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end";//jedis.eval(String,list,list);这个命令就是去执行lua脚本,KEYS的集合就是第二个参数,ARGV的集合就是第三参数【保证解锁的原子操作】Object var2 = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value));if (RELEASE_SUCCESS == var2) {return true;}return false;}/*** 解锁操作* @param key 锁标识* @param value 客户端标识* @return*/public static Boolean unLock1(String key, String value){//key就是redis的key值作为锁的标识,value在这里作为客户端的标识,只有key-value都比配才有删除锁的权利【保证安全性】String oldValue = jedis.get(key);long delCount = 0; //被删除的key的数量if(oldValue.equals(value)){delCount = jedis.del(key);}if(delCount > 0){ //被删除的key的数量大于0,表示删除成功return true;}return false;}/*** 重试机制:* 如果在业务中去拿锁如果没有拿到是应该阻塞着一直等待还是直接返回,这个问题其实可以写一个重试机制,* 根据重试次数和重试时间做一个循环去拿锁,当然这个重试的次数和时间设多少合适,是需要根据自身业务去衡量的* @param key 锁标识* @param value 客户端标识* @param timeOut 过期时间* @param retry 重试次数* @param sleepTime 重试间隔时间* @return*/public Boolean lockRetry(String key,String value,int timeOut,Integer retry,Long sleepTime){Boolean flag = false;try {for (int i=0;i<retry;i++){flag = tryGetDistributedLock(key,value,timeOut);if(flag){break;}Thread.sleep(sleepTime);}}catch (Exception e){e.printStackTrace();}return flag;}}package com.yibin.blnp.redis;import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;/*** 用途:自旋锁** @version v1.01* @Author liaoyibin 2045165565@qq.com* @createDate 2023/3/16 21:12* @modifyRecord <pre>* 版本 修改人 修改时间 修改内容描述* ----------------------------------------------* 1.00 liaoyibin 2023/3/16 21:12 新建* ----------------------------------------------* </pre>*/
public class SpinLock {/*** 成功锁标志**/private static final String LOCK_SUCCESS = "OK";/*** 失败锁标识**/private static final long UNLOCK_SUCCESS = 1L;/*** @author liaoyibin* 描述: 尝试获取分布式锁* @Date 21:19 2023/3/16* @param jedis Redis客户端* @param lockKey 锁键值* @param value 锁的值* @param expireTime 超期时间* @return boolean 是否获取成功**/public static boolean tryLock(Jedis jedis, String lockKey, String value, int expireTime) {// 自旋锁while (true) {// set key value ex seconds nx(只有键不存在的时候才会设置key)String result = jedis.set(lockKey, value,SetParams.setParams().ex(expireTime).nx());if (LOCK_SUCCESS.equals(result)) {return true;}}}/*** @author liaoyibin* 描述: 释放分布式锁* @Date 21:26 2023/3/16* @param jedis Redis客户端* @param lockKey 锁* @return boolean 是否释放成功**/public static boolean unlock(Jedis jedis, String lockKey) {Long result = jedis.del(lockKey);if (UNLOCK_SUCCESS == result) {return true;}return false;}
}package com.yibin.blnp.redis;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;import java.util.UUID;/*** 用途:分布式自旋锁测试** @version v1.01* @Author liaoyibin 2045165565@qq.com* @createDate 2023/3/16 21:40* @modifyRecord <pre>* 版本 修改人 修改时间 修改内容描述* ----------------------------------------------* 1.00 liaoyibin 2023/3/16 21:40 新建* ----------------------------------------------* </pre>*/
public class SpinLockTest {/*** 初始次数**/private int count = 0;/*** 加锁 key**/private String lockKey = "lock";/*** @author liaoyibin* 描述: 加锁* @Date 21:59 2023/3/16* @param jedis * @return void**/private void addLock(Jedis jedis) {// 加锁boolean locked = SpinLock.tryLock(jedis, lockKey, UUID.randomUUID().toString(), 60);try {if (locked) {for (int i = 0; i < 500; i++) {count++;}}} catch (Exception e) {e.printStackTrace();} finally {SpinLock.unlock(jedis, lockKey);}}public static void main(String[] args) throws Exception {SpinLockTest redisLockTest = new SpinLockTest();JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMinIdle(1);jedisPoolConfig.setMaxTotal(5);JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.56.111", 6379, 1000, "123456");Thread t1 = new Thread(() -> redisLockTest.addLock(jedisPool.getResource()));Thread t2 = new Thread(() -> redisLockTest.addLock(jedisPool.getResource()));t1.start();t2.start();t1.join();t2.join();System.out.println(redisLockTest.count);}
}
基于redis的,优惠券秒杀业务的实现:
package com.hmdp.utils;
public interface ILock {/*** 尝试获取锁* @param timeoutSec 锁的持有时间,过期自动释放* @return true代表获取锁成功,false代表获取锁失败。*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
}
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
/*** @Version 1.0*/
public class SimpleRedisLock implements ILock {//Redisprivate StringRedisTemplate stringRedisTemplate;//业务名称,也就是锁的名称private String name;public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}//key的前缀private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {//获取线程id,当作set的valuelong threadId = Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId+"", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}//释放锁@Overridepublic void unlock() {//删除keystringRedisTemplate.delete(KEY_PREFIX+name);}
}
package com.hmdp.service.impl;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.SimpleRedisLock;
import com.hmdp.utils.UserHolder;
import org.springframework.aop.framework.AopContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/*** <p>* 服务实现类* </p>**/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService iSeckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result seckillVoucher(Long voucherId) {//1.获取优惠券信息SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);//2.判断是否已经开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())){Result.fail("秒杀尚未开始!");}//3.判断是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){Result.fail("秒杀已经结束了!");}//4.判断库存是否充足if (voucher.getStock() < 1) {Result.fail("库存不充足!");}//5.扣减库存boolean success = iSeckillVoucherService.update().setSql("stock = stock-1").eq("voucher_id",voucherId).gt("stock",0).update();if (!success){Result.fail("库存不充足!");}Long userId = UserHolder.getUser().getId();//1.创建锁对象SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);//2.尝试获取锁boolean isLock = lock.tryLock(1200);if (!isLock){//获取锁失败return Result.fail("一个用户只能下一单!");}try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();//6.根据优惠券id和用户id判断订单是否已经存在//如果存在,则返回错误信息int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("用户已经购买!");}//7. 创建订单VoucherOrder voucherOrder = new VoucherOrder();//7.1添加订单idLong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//7.2添加用户idvoucherOrder.setUserId(userId);//7.3添加优惠券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(orderId);}
}
点点关注,点点赞呀,持续更新有用的知识............