001 redis高并发减库存

文章目录

    • 释放锁加lua脚本
    • String lockValue(唯一标识符作为锁的值)lua脚本
      • 无String lockValue(唯一标识符作为锁的值)
      • 无Lua脚本
      • 加锁的过期时间防死锁
      • 无lockValue代码
    • lockValue
    • 加了lockValue无lua脚本代码
    • 加了lockValue加了lua脚本代码
    • 问题
  • 1DecrProductStockController.java
  • 2DecrProductStockController.java
    • RedisConfig.java
    • pom.xml

synchronized 只能适用于单体应用,不能适用于分布式

分布式锁来解决高并发

若执行业务出现异常,锁释放不走,产生死锁,解决方案给锁加上过期时间,即便业务出现异常,锁还是会释放

释放锁加lua脚本

在分布式系统中,使用 Lua 脚本在 Redis 中释放锁是为了确保释放锁的操作是原子的。这很重要,因为在高并发的场景下,可能会有多个客户端尝试同时获取和释放同一个锁。如果不使用原子操作来释放锁,就可能会出现竞态条件(race condition),导致锁被错误地释放或者根本无法释放。

考虑以下场景:

检查锁的值和删除锁不是原子的:如果没有使用 Lua 脚本,释放锁通常涉及两个步骤:首先检查锁的值是否与我们设置的值相匹配,如果匹配,则删除锁。但是,如果这两个操作不是原子的,就有可能出现以下问题:

在我们检查锁的值之后,但在删除锁之前,另一个客户端可能已经获取了锁(即改变了锁的值)。在这种情况下,如果我们仍然删除锁,就会错误地释放了其他客户端持有的锁。
并发删除导致的问题:如果有多个客户端同时尝试释放同一个锁(可能是因为它们都认为自己是锁的持有者),没有原子性保证的删除操作可能会导致不可预知的行为。

使用 Lua 脚本可以确保这两个步骤(检查和删除)在一个原子操作中完成。Redis 保证 Lua 脚本在执行期间不会被其他 Redis 命令打断,这被称为脚本的原子性。因此,当 Lua 脚本在 Redis 中执行时,它会锁定 Redis,直到脚本执行完成。这确保了在脚本执行期间,不会有其他客户端的命令插入进来改变锁的状态。

在代码中,使用 Lua 脚本来释放锁可以确保只有当锁的值与预期的值(即当前客户端设置的值)匹配时,锁才会被删除。这避免了上述的竞态条件,并保证了锁的安全性。

String lockValue(唯一标识符作为锁的值)lua脚本

使用Redis的INCR、DECR和EXPIRE命令来操作库存,并通过SETNX和DEL命令来管理锁。

这段代码是一个Spring Boot控制器中的方法,用于减少特定产品的库存,并使用Redis进行库存管理和并发控制。现在,我们从无String lockValue(即没有使用唯一标识符作为锁的值)和无Lua脚本两方面来分析这段代码可能存在的问题。

无String lockValue(唯一标识符作为锁的值)

锁的碰撞问题:
在这段代码中,锁的值被硬编码为字符串"locked"。这意味着,如果两个不同的进程或线程尝试同时锁定同一个产品ID,它们都会检查同一个锁键是否存在。虽然这在一定程度上可以防止并发问题(a线程执行decrStock方法并上锁,未解锁就过期,b线程来执行decrStock方法并上锁,a已经执行完判断锁的逻辑,所以会在b上锁期间,释放b的锁),但如果有其他不相关的操作也使用了相同的锁值(“locked”),就可能导致不必要的锁碰撞,从而影响系统的并发性能。
锁的安全性:
使用固定的锁值可能会降低系统的安全性。如果攻击者知道这个固定的锁值,他们可能会尝试破坏锁机制,例如通过删除或篡改锁键来干扰正常的库存扣减流程。
锁的调试和监控:
当使用固定的锁值时,很难追踪和调试与锁相关的问题。如果使用唯一标识符(如UUID)作为锁的值,那么每个锁的实例都会有一个独特的标识符,这有助于在日志或监控系统中跟踪和识别特定的锁操作。

无Lua脚本

无Lua脚本
原子性缺失:
当前代码通过多个命令来实现库存扣减的逻辑,这不是原子操作。如果在get和decrement之间Redis状态发生改变,就可能导致数据不一致。在两个操作之间,库存值可能被其他并发请求修改,从而导致数据不一致或超卖的情况。使用Lua脚本可以将多个操作合并成一个原子操作,确保数据的一致性。
性能优化:
由于库存检查和扣减是两个独立的操作,它们需要两次与Redis服务器的网络交互。这增加了网络延迟和I/O开销,降低了系统的性能。
使用Lua脚本可以减少网络往返时间(RTT),因为所有的操作都在服务器端执行,不需要像当前代码那样在客户端和服务器之间多次通信。使用Lua脚本可以将这两个操作合并成一个原子操作,并通过一次网络交互完成,从而提高性能
错误处理和重试:
Lua脚本的执行是原子的,要么全部成功,要么全部失败,这简化了错误处理和重试的逻辑。在当前代码中,如果decrement操作失败,可能需要复杂的逻辑来决定是否重试以及如何重试。
锁的粒度:
使用Lua脚本还可以更精细地控制锁的粒度。例如,可以在脚本内部实现更复杂的逻辑,而无需长时间占用锁,从而减少锁的持有时间,提高系统的并发性能。
复杂性增加:
没有使用Lua脚本意味着开发者需要处理更多的并发控制和错误处理逻辑。例如,在库存不足时需要手动处理重试或返回错误信息。而使用Lua脚本可以简化这部分逻辑,因为脚本在服务器端执行,可以确保操作的原子性。
潜在的竞争条件:
由于检查和扣减操作不是原子的,因此在高并发场景下更容易出现竞争条件。这可能导致库存扣减的不准确或超卖现象。Lua脚本可以避免这种情况,因为它在服务器端以单个原子操作执行。

加锁的过期时间防死锁

代码中设置了锁的过期时间(在这个例子中是10秒),这是为了避免死锁而采取的一种预防措施。如果设置了过期时间,即使系统出现异常,锁也会在指定的时间后自动释放,这样可以防止因为异常而导致的永久死锁。
然而,依赖锁的自动过期来解决死锁问题并不是完美的解决方案,因为它引入了新的问题:
锁的续期问题:如果操作需要的时间超过锁的过期时间,那么在操作完成之前锁可能已经过期,这会导致其他线程或进程能够获取到锁并执行操作,从而可能引发并发问题。
业务逻辑的中断:如果在锁过期后业务逻辑还没有执行完,那么其他线程获得锁后可能会打断当前线程的业务逻辑,导致数据状态不一致。
性能问题:如果设置的过期时间太长,虽然可以减少锁过期的风险,但在系统出现异常时,会导致更长时间的资源锁定,影响系统的并发性能。
因此,虽然设置锁的过期时间可以在一定程度上防止死锁,但在设计并发系统时还需要考虑更多因素,比如锁的粒度、锁的续期策略、异常处理机制等,以确保系统的稳定性和性能。
另外,对于重要的业务逻辑,通常建议采用更加健壮的锁机制,比如基于Redis的RedLock算法,或者使用数据库的事务机制来确保数据的一致性。同时,监控和告警系统也应该被建立起来,以便及时发现和处理潜在的锁问题。

无lockValue代码

package com.example.controller;  import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.core.ValueOperations;  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  import java.util.concurrent.TimeUnit;  @RestController  
@RequestMapping("test")  
public class DecrProductStockController {  @Autowired  private RedisTemplate<String, Integer> redisTemplate;  @GetMapping("decrStock/{proId}")  public String decrStock(@PathVariable("proId") Integer proId) {  String lockKey = "lock_pro_" + proId;  String proKey = "pro_stock_" + proId;  // 尝试获取锁  Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 10, TimeUnit.SECONDS);  if (Boolean.TRUE.equals(isLocked)) {  try {  ValueOperations<String, Integer> ops = redisTemplate.opsForValue();  Integer stock = ops.get(proKey);  if (stock != null && stock > 0) {  ops.decrement(proKey); // 减库存  System.out.println(Thread.currentThread().getName() + " 用户已减库存----");  } else {  System.out.println("库存不足了============");  }  } finally {  // 释放锁  redisTemplate.delete(lockKey);  System.out.println(Thread.currentThread().getName() + " 用户已释放锁");  }  } else {  // 未能获取锁,可以重试或返回失败信息  try {  Thread.sleep(100); // 简单的重试延迟  } catch (InterruptedException e) {  Thread.currentThread().interrupt(); // 恢复中断状态  }  return decrStock(proId); // 重试减库存操作  }  return "success";  }  
}

lockValue

lockValue的使用是为了确保只有加锁的线程(或进程)能够解锁。每个尝试获取锁的线程都会生成一个独特的lockValue,通常是UUID。这个值被设置为锁的值,因此在解锁时,只有知道正确lockValue的线程才能成功执行解锁操作。
为什么要设置lockValue:
避免误解锁:在分布式系统中,可能有多个线程或进程尝试同时访问和修改同一资源。如果没有 lockValue,而只是简单地检查锁是否存在,然后删除它,那么一个线程可能会误删其他线程设置的锁,这会导致数据不一致和其他线程能够错误地获取到它们本不应该获取的资源。

提高安全性:通过为每个锁分配一个独特的值,我们可以确保只有设置该锁的线程才能删除它。这是通过比较当前锁的值与尝试解锁的线程提供的 lockValue 来实现的。

防止锁重入:如果一个线程已经持有一个锁,并且没有 lockValue 的检查,它可能会错误地重新获取同一个锁(如果它试图这样做的话)。虽然在这个特定的例子中可能不是主要问题,但在更复杂的系统中,这是一个重要的考虑因素。

处理锁的过期:锁通常会设置过期时间,以防线程在持有锁时崩溃,从而防止死锁。如果有 lockValue,即使锁由于超时而过期,其他线程也不能简单地删除它,除非它们知道正确的 lockValue。这增加了系统的健壮性。

在这个代码中,redisTemplate.execute(deleteLockLua, Arrays.asList(lockKey), lockValue); 这一行确保只有知道正确 lockValue 的线程才能执行解锁操作。这是通过使用 Lua 脚本来原子性地检查锁的值是否匹配提供的 lockValue,如果是,则删除锁。这保证了即使有多个线程尝试解锁,也只有设置锁的线程能够成功解锁。

所以,虽然一个线程在执行 decrStock() 方法并上锁时,其他线程不能直接执行解锁操作,但 lockValue 提供了一个额外的安全层,确保解锁操作的正确性和安全性。

==========
在Redis中,实现分布式锁的一种常见做法是使用SETNX(SET if Not eXists)命令或者Redis的Lua脚本来尝试获取锁,并设置一个过期时间,以防锁无限期地被占用。如果锁设置了过期时间,那么即使持有锁的客户端崩溃或由于某种原因无法释放锁,锁也会在过期时间到达后自动被Redis删除。

你提到的lockValue可以是一个随机生成的字符串,用作锁的值,以确保只有知道这个值的客户端才能释放锁。这是一种安全措施,用来防止一个客户端误删其他客户端设置的锁。

当锁由于超时而过期时,Redis会自动删除这个键(key),因此锁在Redis中将不再存在。这意味着,一旦锁的过期时间到达,任何客户端都可以尝试再次获取这个锁,因为旧的锁已经从Redis中删除了。

为了增加系统的健壮性,客户端在尝试释放锁时应该检查lockValue是否与它设置的值相匹配。如果不匹配,那么客户端就不应该删除这个锁,因为这可能是一个由其他客户端设置的锁。这可以通过使用Redis的Lua脚本来原子性地完成检查和删除操作来实现。

总的来说,当锁在Redis中由于超时而过期时,它将不再存在于Redis中,任何知道如何正确设置lockValue的客户端都可以尝试获取这个锁。

加了lockValue无lua脚本代码

为了增强锁的安全性,避免潜在的锁碰撞,并确保每个锁请求都是唯一的,可以在尝试获取锁时使用一个随机生成的唯一标识符作为锁的值。
redisTemplate的类型从RedisTemplate<String, Integer>更改为RedisTemplate<String, String>,因为现在要在Redis中存储UUID字符串作为锁的值。

在尝试获取锁时,使用lockValue作为锁的值,而不是硬编码的字符串。

在释放锁之前,检查当前锁的值是否与之前设置的lockValue相匹配。这确保了只有锁的原始设置者才能释放锁,从而避免了意外删除其他线程或进程设置的锁。

由于现在在Redis中存储的是字符串值,因此在处理库存数量时需要适当地转换数据类型。

这些更改提高了锁的安全性和可靠性,降低了锁碰撞和误删除锁的风险。

package com.example.controller;  import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.core.ValueOperations;  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  import java.util.UUID;  
import java.util.concurrent.TimeUnit;  @RestController  
@RequestMapping("test")  
public class DecrProductStockController {  @Autowired  private RedisTemplate<String, String> redisTemplate; // 注意这里改为String, String类型  @GetMapping("decrStock/{proId}")  public String decrStock(@PathVariable("proId") Integer proId) {  String lockKey = "lock_pro_" + proId;  String proKey = "pro_stock_" + proId;  String lockValue = UUID.randomUUID().toString().replace("-", ""); // 生成唯一锁值  // 尝试获取锁  Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);  if (Boolean.TRUE.equals(isLocked)) {  try {  ValueOperations<String, Integer> ops = redisTemplate.opsForValue();  Integer stock = ops.get(proKey);  if (stock != null && stock > 0) {  ops.decrement(proKey); // 减库存  System.out.println(Thread.currentThread().getName() + " 用户已减库存----");  } else {  System.out.println("库存不足了============");  }  } finally {  // 检查并释放锁,确保只有锁的持有者才能释放  if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {  redisTemplate.delete(lockKey);  }  System.out.println(Thread.currentThread().getName() + " 用户已释放锁");  }  } else {  // 未能获取锁,可以重试或返回失败信息  try {  Thread.sleep(100); // 简单的重试延迟  } catch (InterruptedException e) {  Thread.currentThread().interrupt(); // 恢复中断状态  }  return decrStock(proId); // 重试减库存操作  }  return "success";  }  
}

加了lockValue加了lua脚本代码

为了使用 Lua 脚本来确保操作的原子性,需要对原代码进行一些修改,并将 Lua 脚本集成到 decrStock 方法中。以下是一个修改后的版本,包括 Lua 脚本的加载和执行
在这个修改后的版本中,添加了两个 Lua 脚本:一个用于减少库存,另一个用于释放锁。这两个脚本都通过 DefaultRedisScript 类加载,并在需要时通过 RedisTemplate 执行。这样可以确保减库存和释放锁的操作是原子的,从而避免了并发问题。注意,Lua 脚本中的 KEYS[1] 和 ARGV[1] 分别对应于执行脚本时传递的键和参数列表。在这个例子中,键是库存或锁的键,参数是锁的值。

package com.example.controller;  import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.core.script.DefaultRedisScript;  
import org.springframework.scripting.support.StaticScriptSource;  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RestController;  
import org.springframework.web.bind.annotation.RequestMapping;  import java.util.Collections;  
import java.util.UUID;  
import java.util.concurrent.TimeUnit;  @RestController  
@RequestMapping("test")  
public class DecrProductStockController {  @Autowired  private RedisTemplate<String, String> redisTemplate;  // Lua 脚本,用于减库存和释放锁  private static final String DECR_STOCK_LUA_SCRIPT =  "if redis.call('get', KEYS[1]) ~= false and tonumber(redis.call('get', KEYS[1])) > 0 then " +  "    redis.call('decrby', KEYS[1], 1) " +  "    return 1 " +  "else " +  "    return -1 " +  "end";  private static final String RELEASE_LOCK_LUA_SCRIPT =  "if redis.call('get', KEYS[1]) == ARGV[1] then " +  "    return redis.call('del', KEYS[1]) " +  "else " +  "    return 0 " +  "end";  private DefaultRedisScript<Long> decrStockScript;  private DefaultRedisScript<Long> releaseLockScript;  @Autowired  public DecrProductStockController(RedisTemplate<String, String> redisTemplate) {  this.redisTemplate = redisTemplate;  // 初始化 Lua 脚本  decrStockScript = new DefaultRedisScript<>();  decrStockScript.setScriptText(DECR_STOCK_LUA_SCRIPT);  decrStockScript.setResultType(Long.class);  releaseLockScript = new DefaultRedisScript<>();  releaseLockScript.setScriptText(RELEASE_LOCK_LUA_SCRIPT);  releaseLockScript.setResultType(Long.class);  }  @GetMapping("decrStock/{proId}")  public String decrStock(@PathVariable("proId") Integer proId) {  String lockKey = "lock_pro_" + proId;  String proKey = "pro_stock_" + proId;  String lockValue = UUID.randomUUID().toString().replace("-", "");  // 尝试获取锁  Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);  if (Boolean.TRUE.equals(isLocked)) {  try {  // 执行 Lua 脚本来减库存  Long result = redisTemplate.execute(decrStockScript, Collections.singletonList(proKey));  if (result == 1) {  System.out.println(Thread.currentThread().getName() + " 用户已减库存----");  } else if (result == -1) {  System.out.println("库存不足了============");  }  } finally {  // 执行 Lua 脚本来释放锁  redisTemplate.execute(releaseLockScript, Collections.singletonList(lockKey), lockValue);  System.out.println(Thread.currentThread().getName() + " 用户已释放锁");  }  } else {  // 重试逻辑  try {  Thread.sleep(100);  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  }  return decrStock(proId);  }  return "success";  }  
}

问题

锁的释放可能存在问题:
在releaseLockScript中,比较了KEYS[1]对应的值和ARGV[1]。但是,在Redis中,键对应的值可能是以字符串形式存储的,而UUID生成的lockValue在比较时应该确保类型一致。为了确保比较的正确性,应该在Lua脚本中将get得到的值转换为字符串后再进行比较。

redis.call(‘get’, KEYS[1])返回的是一个字符串(如果存在的话),不需要转换ARGV[1],因为它是作为Lua脚本的参数传入的,本身就是字符串。但为了保险起见,可以在Lua脚本中显式地将ARGV[1]转换为字符串,虽然这通常是不必要的。

另外,在decrStockScript中,当库存为0时,脚本直接返回-1。这可能会导致一些竞态条件,因为如果两个请求几乎同时到达,并且当前库存为1,那么两个请求都可能看到库存大于0,并都尝试减少库存。为了避免这种情况,我们可以使用Redis的事务功能,或者更简单地,在Lua脚本中添加一个额外的检查来确保减库存的操作是原子的。

// Lua 脚本,用于释放锁  
private static final String RELEASE_LOCK_LUA_SCRIPT =  "if redis.call('get', KEYS[1]) == tostring(ARGV[1]) then " +  "    return redis.call('del', KEYS[1]) " +  "else " +  "    return 0 " +  "end";  // ... 其他代码保持不变 ...  // 在 decrStock 方法中,不需要做任何额外的修改,因为 Lua 脚本已经处理了字符串转换。

关于decrStockScript,为了确保减库存的原子性,可以在减少库存之前再次检查库存是否大于0。这样,即使有多个请求几乎同时到达,也只有一个请求能够成功减少库存。


// Lua 脚本,用于减库存  
private static final String DECR_STOCK_LUA_SCRIPT =  "local stock = tonumber(redis.call('get', KEYS[1])) " +  "if stock and stock > 0 then " +  "    redis.call('decrby', KEYS[1], 1) " +  "    return 1 " +  "else " +  "    return -1 " +  "end";  // ... 其他代码保持不变 ...

锁的过期时间可能过短:
锁设置了10秒的过期时间,这在某些情况下可能过短,尤其是在高并发或有网络延迟的环境中。建议根据实际情况调整锁的过期时间。

重试逻辑可能导致大量递归调用:
如果获取锁失败,代码会等待100毫秒后递归调用decrStock方法。这种重试机制没有最大重试次数限制,可能导致无限递归和栈溢出错误。应该加入一个重试次数的限制。

异常处理不完善:
在执行Lua脚本时,如果Redis服务器出现问题或者网络问题导致脚本执行失败,应该有相应的异常处理逻辑。

代码冗余和可读性:
部分代码可以简化和重构以提高可读性。


// ...(其他部分保持不变)  @RestController  
@RequestMapping("test")  
public class DecrProductStockController {  // ...(其他部分保持不变)  // 修改后的减库存方法  @GetMapping("decrStock/{proId}")  public String decrStock(@PathVariable("proId") Integer proId) {  String lockKey = "lock_pro_" + proId;  String proKey = "pro_stock_" + proId;  String lockValue = UUID.randomUUID().toString().replace("-", "");  int maxRetries = 5; // 最大重试次数  int retries = 0; // 当前重试次数  while (retries < maxRetries) {  // 尝试获取锁  Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS); // 锁的过期时间调整为30秒  if (Boolean.TRUE.equals(isLocked)) {  try {  // 执行Lua脚本来减库存  Long result = redisTemplate.execute(decrStockScript, Collections.singletonList(proKey));  if (result == 1) {  System.out.println(Thread.currentThread().getName() + " 用户已减库存----");  return "success"; // 减库存成功,直接返回成功信息  } else if (result == -1) {  System.out.println("库存不足了============");  return "stock_not_enough"; // 库存不足,返回错误信息  }  } finally {  // 执行Lua脚本来释放锁,修改Lua脚本以确保类型一致  redisTemplate.execute(releaseLockScript, Collections.singletonList(lockKey), lockValue);  System.out.println(Thread.currentThread().getName() + " 用户已释放锁");  }  } else {  retries++; // 增加重试次数  try {  // 等待一段时间后重试,这里使用200毫秒作为等待时间,可根据实际情况调整  Thread.sleep(200);  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  // 处理中断异常,根据实际情况决定是否需要抛出或记录日志等操作  }  }  }  // 重试次数达到上限仍未成功获取锁,返回错误信息或进行其他处理  return "failed_to_acquire_lock"; // 无法获取锁,返回错误信息  }  
}  
// ...(其他部分保持不变)

1DecrProductStockController.java

decrProductStock.lua

if redis.call('get', KEYS[1]) ~='0'thenreturn redis.call('decrby',KEYS[1],ARGV[1])
elsereturn -1;
end

deleteLock.lua


if redis.call('get', KEYS[1]) == ARGV[1]thenreturn redis.call('del', KEYS[1])elsereturn 0
end

package com.example.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** 测试高并发减库存*/
@RestController
@RequestMapping("test")
public class DecrProductStockController {@Autowiredprivate RedisTemplate redisTemplate;private  DefaultRedisScript<Long> decrStockLua;private  DefaultRedisScript<Long> deleteLockLua;//在本类di注入之后,在初始化的时候,就要加载lua脚本@PostConstructpublic void loadLuaScript(){System.out.println("加载lua脚本");decrStockLua = new DefaultRedisScript<>();decrStockLua.setResultType(Long.class);decrStockLua.setScriptSource(new ResourceScriptSource(new ClassPathResource("decrProductStock.lua")));deleteLockLua = new DefaultRedisScript<>();deleteLockLua.setResultType(Long.class);deleteLockLua.setScriptSource(new ResourceScriptSource(new ClassPathResource("deleteLock.lua")));}/*** 1.redis:(1)商品 (2)锁key-(lock_pro_101) value(UUID)* 2.能否获得锁 setnx key value ex 10* 3.减库存:(1)获得库存get number != null >0   (2)decr number ===>原子性lus脚本* 4释放锁(1) if get key == uuid (2) del key ===>原子性lua脚本** @param proId* @return*/@GetMapping("decrStock/{proId}")public String decrStock(@PathVariable("proId") Integer proId){System.out.println(Thread.currentThread().getName() + "用户开始抢购商品" + proId);//锁的key-valueString lockKey = "lock_pro_"+proId;String lockValue = UUID.randomUUID().toString().replace("-","");boolean isGetLock = redisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,10, TimeUnit.SECONDS);//商品的key:pro_::101 -product//商品库存key: pro_stock::101 -5String proKey = "pro_stock_"+proId;//获得到锁if(isGetLock){System.out.println(Thread.currentThread().getName() + "用户获得到了商品的锁"+lockKey);//            System.out.println(Thread.currentThread().getName()+",获得到了商品的锁"+lockKey);
//           Integer stock = (Integer)redisTemplate.opsForValue().get(proKey);
//            if(stock != null && stock >0){
//                redisTemplate.opsForValue().decrement(proKey);
//            }//减库存----LUALong result = (long) redisTemplate.execute(decrStockLua,Arrays.asList(proKey),1);if (result == -1){System.out.println("库存不足了============");}else{System.out.println(Thread.currentThread().getName() + "用户已减库存----");}//释放锁
//            if(redisTemplate.opsForValue().get(lockKey)==lockValue){
//                redisTemplate.delete(lockKey)
//            }
//            System.out.println(Thread.currentThread().getName()+"已释放锁");
//            System.out.println(Thread.currentThread().getName()+"目前的库存>>>");//释放锁 ====LUAredisTemplate.execute(deleteLockLua,Arrays.asList(lockKey),lockValue);System.out.println(Thread.currentThread().getName()+"用户已释放锁");System.out.println("****************");}else {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}decrStock(proId);}return "success";}}

2DecrProductStockController.java


package com.example.controller;import com.example.entity.Product;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** 测试高并发减库存*/
@RestController
@RequestMapping("test")
public class DecrProductStockController {@Autowiredprivate RedisTemplate redisTemplate;private DefaultRedisScript<Long> decrStockLua;private  DefaultRedisScript<Long> deleteLockLua;// 在本类DI注入之后,在初始化的时候,就要加载lua脚本@PostConstructpublic void loadLuaScript(){System.out.println("加载lua脚本。。。。。。。。。。。。。。");decrStockLua = new DefaultRedisScript<>();decrStockLua.setResultType(Long.class);decrStockLua.setScriptSource(new ResourceScriptSource(new ClassPathResource("decrProductStock.lua")));deleteLockLua = new DefaultRedisScript<>();deleteLockLua.setResultType(Long.class);deleteLockLua.setScriptSource(new ResourceScriptSource(new ClassPathResource("deleteLock.lua")));}/*** 1. redis : (1)商品 (2)锁 key- (lock_pro_101) value (UUID)* 2. 能否获得锁 : setnx key value ex 10* 3. 减库存: (1)获得库存get number  !=null >0 (2)decr number   ====> 原子性 LUA 脚本* 4. 释放锁   (1)if get key == uuid  (2) del key ====> 原子性 LUA 脚本*/@GetMapping("decrStock/{proId}")public String decrStock(@PathVariable("proId") Integer proId){System.out.println(Thread.currentThread().getName() + "用户开始抢购商品"+ proId);// 锁的key-valueString lockKey = "lock_pro_"+proId;String lockValue = UUID.randomUUID().toString().replace("-","");boolean isGetLock = redisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,10, TimeUnit.SECONDS);//商品的key : pro_::101 - product//商品库存的key : pro_stock::101 - 5String proKey = "pro_stock_"+proId;//获得到锁if(isGetLock){System.out.println(Thread.currentThread().getName() + "用户获得到了商品的锁"+ lockKey);// 减库存====LUALong result = (Long) redisTemplate.execute(decrStockLua,Arrays.asList(proKey),1);if(result == -1){System.out.println("库存不足了==================================");}else{System.out.println(Thread.currentThread().getName() + "用户已减库存-------");}// 释放锁 ====LUAredisTemplate.execute(deleteLockLua, Arrays.asList(lockKey),lockValue);System.out.println(Thread.currentThread().getName() + "用户已释放锁 ");System.out.println("********************************************************");//  System.out.println(Thread.currentThread().getName()  +"目前的库存是>>>>>>>>>>>>>>>" + redisTemplate.opsForValue().get(proKey));}else{try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}decrStock(proId);}return "success";}
}

RedisConfig.java


package com.example.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.*;import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** 配置redistemplate序列化*/
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {//过期时间-1天private Duration timeToLive = Duration.ofDays(-1);/*** RedisTemplate 先关配置** @param factory* @return*/@Bean@SuppressWarnings("all")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();template.setConnectionFactory(factory);Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);//LocalDatetime序列化JavaTimeModule timeModule = new JavaTimeModule();timeModule.addDeserializer(LocalDate.class,new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));timeModule.addDeserializer(LocalDateTime.class,new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));timeModule.addSerializer(LocalDate.class,new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));timeModule.addSerializer(LocalDateTime.class,new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));om.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);om.registerModule(timeModule);jackson2JsonRedisSerializer.setObjectMapper(om);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();// key采用String的序列化方式template.setKeySerializer(stringRedisSerializer);// hash的key也采用String的序列化方式template.setHashKeySerializer(stringRedisSerializer);// value序列化方式采用jacksontemplate.setValueSerializer(jackson2JsonRedisSerializer);// hash的value序列化方式采用jacksontemplate.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Beanpublic RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {//默认1RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().entryTtl(timeToLive).serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(keySerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(valueSerializer())).disableCachingNullValues();RedisCacheManager redisCacheManager = RedisCacheManager.builder(connectionFactory).cacheDefaults(config).transactionAware().build();return redisCacheManager;}@BeanRedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();listenerContainer.setConnectionFactory(connectionFactory);return listenerContainer;}/*** key 类型* @return*/private RedisSerializer<String> keySerializer() {return  new StringRedisSerializer();}/*** 值采用JSON序列化* @return*/private RedisSerializer<Object> valueSerializer() {return new GenericJackson2JsonRedisSerializer();}}

pom.xml

<artifactId>springboot_redis_demo</artifactId><version>0.0.1-SNAPSHOT</version><packaging>war</packaging><name>springboot_redis_demo</name><description>springboot_redis_demo</description>

在这个Java类中,Lua脚本被用于两个关键的操作:减少商品库存和删除锁。这种做法主要是为了确保在高并发环境下,这两个操作能够以原子性的方式执行,防止数据不一致。

减少商品库存 (decrStockLua):

Lua脚本被用于减少库存是为了确保操作的原子性。在高并发场景下,如果没有使用Lua脚本来确保原子性,那么在检查库存和减少库存之间可能会出现竞态条件(race condition)。比如,两个请求几乎同时检查库存,发现都还有库存,然后都进行减少操作,这可能导致超卖。

使用Lua脚本,可以将检查和减少库存这两个操作合并成一个原子操作,确保在检查库存后立即进行减少操作,而不会被其他请求打断。如果库存不足,Lua脚本可以返回一个特定的值(在这个例子中是-1),Java代码可以根据这个返回值来判断库存是否足够。

删除锁 (deleteLockLua):

同样地,删除锁的操作也使用了Lua脚本来确保原子性。在分布式系统中,为了避免多个进程或线程同时修改同一资源,通常会使用锁来确保同一时间只有一个进程/线程可以操作资源。在这个例子中,锁是通过Redis的键来实现的,每个商品都有一个对应的锁。

当某个线程获得了锁,并完成了库存减少的操作后,它需要释放这个锁,以便其他线程可以获得锁并继续操作。使用Lua脚本来删除锁可以确保在检查锁的值和删除锁这两个操作之间是原子性的。这是非常重要的,因为在高并发环境下,如果不使用原子操作,可能会出现一个线程正在释放锁的同时,另一个线程错误地获得了这个锁。

总的来说,Lua脚本在这里的作用是确保关键操作的原子性,从而防止在高并发环境下出现数据不一致的情况。

检查与删除之间的竞态条件:如果没有原子性保证,一个线程可能在检查锁之后、删除锁之前被其他线程抢占,导致错误的线程释放了锁。

误删其他线程的锁:如果两个线程几乎同时尝试释放同一个锁,没有原子性操作可能会导致一个线程的删除操作覆盖了另一个线程的删除操作,从而造成数据的不一致。

为了避免这些问题,可以使用一个Lua脚本来确保这两个步骤(验证和删除)在一个原子操作中完成。Lua脚本在Redis中执行时,会阻塞其他命令的执行,直到脚本执行完成,从而保证了脚本内的命令序列是原子的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/2763.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java | 多线程】LockSupport 的使用和注意事项

了解一下 LockSupport LockSupport是一个类&#xff0c;位于java.util.concurrent.locks包中&#xff0c;提供了基本的线程同步机制。 LockSupport的主要作用是挂起和唤醒线程。它提供了两个主要的静态方法&#xff1a;park()和unpark()。 park()&#xff1a;用于挂起当前线…

Harmony专栏 TypeScript教程

TypeScript教程 TypeScript简介TypeScript安装使用vscode开发TypeScript应用TypeScript编译器TypeScript基本语法TypeScript变量TypeScript let const var区别TypeScript 常量TypeScript数据类型TypeScript数字类型numberTypeScript字符串类型stringTypeScript 字符串常用函数…

thsi指针用法总结

1 c类对象中的变量和函数是分开存储的 2 所以对象共用一份成员函数&#xff0c;类的大小是指非静态的成员变量&#xff1b; this 完成链式操作 const 修饰成员函数

图像空间(IS),高级特征空间(FS)和两阶段(TS)重采样

what is the 3 types :image space (IS), high-level feature space (FS) and two-stage (TS) resampling? what is the difference? pros and cons? 这三种类型&#xff1a;图像空间&#xff08;IS&#xff09;&#xff0c;高级特征空间&#xff08;FS&#xff09;和两阶段…

Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

Kafka 3.x.x 入门到精通&#xff08;02&#xff09;——对标尚硅谷Kafka教程 2. Kafka基础2.1 集群部署2.1.1 解压文件2.1.2 安装ZooKeeper2.1.3 安装Kafka2.1.4 封装启动脚本 2.2 集群启动2.2.1 相关概念2.2.1.1 代理&#xff1a;Broker2.2.1.2 控制器&#xff1a;Controller …

【Linux 开发第一篇】如何在安装中完成自定义配置分区

安装配置自定义配置分区 在安装Centos的过程中&#xff0c;我们可以在安装位置部分手动配置分区 选择我要配置分区&#xff0c;点击完成&#xff1a; 我们自动分区分为三个分区&#xff1a;boot分区&#xff08;引导分区&#xff09;&#xff0c;swap&#xff08;交换分区&…

云备份项目--项目介绍

&#x1f4df;作者主页&#xff1a;慢热的陕西人 &#x1f334;专栏链接&#xff1a;C云备份项目 &#x1f4e3;欢迎各位大佬&#x1f44d;点赞&#x1f525;关注&#x1f693;收藏&#xff0c;&#x1f349;留言 主要内容项目一些详细信息的介绍 文章目录 云备份项目1.项目介绍…

ElasticSearch(3)

目录 126.ES聚合中的Metric聚合有哪些?如何解释? 127.ES聚合中的管道聚合有哪些?如何理解? 128.如何理解ES的结构和底层实现? 129.ES内部读取文档是怎样的?如何实现的? 130.ES内部索引文档是怎样的?如何实现的?</

WPS表格,怎样保留每个人的最近日期的那一行数据?

方法很多&#xff0c;这里演示使用排序删除重复项 来完成。具体操作如下&#xff1a; 1. 选中数据区域中任意一个单元格&#xff0c;注意要么全选数据区域&#xff0c;要么只选一个单元格 2. 点击数据选项卡&#xff0c;排序&#xff0c;自定义排序&#xff0c; 在弹出对话框…

告别互信息:跨模态人员重新识别的变分蒸馏

Farewell to Mutual Information: Variational Distillation for Cross-Modal Person Re-Identification 摘要&#xff1a; 信息瓶颈 (IB) 通过在最小化冗余的同时保留与预测标签相关的所有信息&#xff0c;为表示学习提供了信息论原理。尽管 IB 原理已应用于广泛的应用&…

简述MASM宏汇编

Hello , 我是小恒不会java。今天写写x86相关底层的东西 寄存器 8086由BIU和EU组成 8088/8086寄存器有14个。8通用&#xff0c;4段&#xff0c;1指针&#xff0c;1标志 8个通用寄存器&#xff1a;这些寄存器可以用来存储任意类型的数据&#xff0c;包括整数、地址等。8086有8个…

【Java--数据结构】提升数据处理速度!深入理解Java中的顺序表机制

欢迎关注个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗~ 如有错误&#xff0c;欢迎指出~ 目录 两种创建顺序表的方法及区别 认识ArrayList的构造方法 不带参数的构造方法 带参数的构造方法 利用Collection 构造方法 举例 ArrayList 常用方法演示 add addAll remo…

Linux进程详解三:进程状态

文章目录 进程状态Linux下的进程状态运行态-R阻塞态浅度休眠-S深度睡眠-D暂停状态-T暂停状态-t 终止态僵尸-Z死亡-X 孤儿进程 进程状态 进程的状态&#xff0c;本质上就是一个整型变量&#xff0c;在task_struct中的一个整型变量。 状态的存在决定了你的后续行为动作。 Linu…

直接用表征还是润色改写?LLM用于文生图prompt语义增强的两种范式

直接用表征还是润色改写&#xff1f;LLM用于文生图prompt语义增强的两种范式 导语 目前的文生图模型大多数都是使用 CLIP text encoder 作为 prompt 文本编码器。众所周知&#xff0c;由于训练数据是从网络上爬取的简单图文对&#xff0c;CLIP 只能理解简单语义&#xff0c;而…

拿捏 顺序表(1)

目录 1. 顺序表的分类2. 顺序表实现3. 顺序表实现完整代码4. 总结 前言: 一天xxx想存储一组数据, 并且能够轻松的实现删除和增加, 此时数组大胆站出, 但是每次都需要遍历一遍数组, 来确定已经存储的元素个数, 太麻烦了, 于是迎来了顺序表不屑的调侃: 数组你不行啊… 顺序表是一…

C++面向对象——类与对象

文章目录 类与对象构造函数、析构函数get/set方法函数&#xff1a;类内声明、类外定义staticthis指针友元名字空间 类与对象 #include<iostream> #include<string> using namespace std; /* 类与对象 */ class Person{public:string name;// 固有属性&#xff0c…

第二期书生浦语大模型训练营第四次笔记

大模型微调技术 大模型微调是一种通过在预训练模型的基础上&#xff0c;有针对性地微调部分参数以适应特定任务需求的方法。 微调预训练模型的方法 微调所有层&#xff1a;将预训练模型的所有层都参与微调&#xff0c;以适应新的任务。 微调顶层&#xff1a;只微调预训练模型…

oracle 12c+ max_string_size参数

一个客户的数据库版本是19.3,在做数据库复制的时候,目标端报错了,查看了一下问题发现表的字段长度有不对,在12c以前我们都知道varchar的长度最大是4000,但是客户这里居然有32767: 把客户的建表语句弄出来,放到我的一个19c的测试环境进行测试: 发现报错了: 这里报错很明显了,是M…

开通一个幻兽帕鲁专用服务器多少钱?阿里云挺便宜

阿里云开通一个幻兽帕鲁专用服务器多少钱&#xff1f;26元1个月。目前阿里云幻兽帕鲁专用服务器4核16G配置26.52元1个月、149元半年&#xff0c;8核32G服务器90.60元一个月、271.80元3个月&#xff0c;幻兽帕鲁服务器活动页面 aliyunfuwuqi.com/go/palworld 阿里云服务器网整理…

$nextTick 原理及作用

Vue 的 nextTick 其本质是对 JavaScript 执行原理 EventLoop 的一种应用。 nextTick 的核心是利用了如 Promise 、MutationObserver、setImmediate、setTimeout的原生 JavaScript 方法来模拟对应的微/宏任务的实现, 本质是为了利用 JavaScript 的这些异步回调任务队列来实现…