伴随着业务体量的上升,QPS 升高带来的并发问题越来越明显,这时候就需要用到能让代码在特定情况下串行执行的工具:锁。
1.业务场景代码
/**
 * Saves the given user when no row exists yet for its batch; otherwise
 * overwrites the username and update time of every row in that batch.
 *
 * <p>The existence check and the write are two separate statements with no
 * mutual exclusion between them.
 *
 * @param user the user carrying the batch and the username to persist
 * @return the same {@code user} instance that was passed in
 */
@Override
@Transactional(rollbackFor = Exception.class)
public Object testBatch(User user) {
    List<User> sameBatchUsers = list(Wrappers.<User>lambdaQuery().eq(User::getBatch, user.getBatch()));

    // Rows already exist for this batch: update them in place and finish.
    if (!CollUtil.isEmpty(sameBatchUsers)) {
        lambdaUpdate()
                .eq(User::getBatch, user.getBatch())
                .set(User::getUsername, user.getUsername())
                .set(User::getUpdateTime, LocalDateTime.now())
                .update();
        return user;
    }

    // Nothing stored for this batch yet: insert the incoming user.
    save(user);
    return user;
}
备注:上述代码在串行执行时没有任何问题。但假如同时有两个线程进来:两个线程同时查询到当前 batch 对应的 user 列表为空,那么这两个线程都会执行 insert 语句,导致该 batch 本应只有 1 条 user 记录,数据库中却出现了 2 条。这就是并发问题。
2.解决方案
此刻我能想到的解决方案有以下三种,此处只讲redis锁
2.1 代码同步执行
2.1.1 redis分布式锁
private int maxCostSeconds = 5;@Override@Transactional(rollbackFor = Exception.class)public Object testBatch(User user) {String key = "userBatch::" + user.getBatch();boolean lock = redisUtil.setNxEx(key, key, maxCostSeconds);LocalDateTime startNow = LocalDateTime.now();LocalDateTime endNow = LocalDateTime.now();// 自选等待获取锁,超过5s就放弃int count = 0;while (!lock) {lock = redisUtil.setNxEx(key, key, maxCostSeconds);if (lock) {break;}endNow = LocalDateTime.now();int costSeconds = endNow.getSecond() - startNow.getSecond();if (costSeconds >= maxCostSeconds) {break;}Thread.sleep(500);System.out.println("获取次数:" + count++);}System.out.println("当前线程获取到了 redis锁,线程名" + Thread.currentThread().getName());if (!lock) {throw new RunTimeException("系统繁忙,请稍后重试");}LambdaQueryWrapper<User> eq = Wrappers.<User>lambdaQuery().eq(User::getBatch, user.getBatch());List<User> userList = list(eq);if (CollUtil.isEmpty(userList)) {save(user);} else {this.lambdaUpdate().eq(User::getBatch, user.getBatch()).set(User::getUsername, user.getUsername()).set(User::getUpdateTime, LocalDateTime.now()).update();}redisUtil.delete(key);return user;}
redis工具类
package com.lzq.learn.utils;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

@Component
/**
 * Thin helper around {@link RedisTemplate} offering key deletion and two ways
 * of taking a lock: a Lua-script pair ({@link #acquireLock}/{@link #releaseLock})
 * and a raw {@code SET key value NX EX seconds} command ({@link #setNxEx}).
 */
public class RedisUtil {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /** Sets KEYS[1] to ARGV[1] with a PX expiry of ARGV[2] if absent; returns ARGV[1] on success, nil otherwise. */
    private final RedisScript<String> lockScript = new DefaultRedisScript<>(
            "if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then return ARGV[1] else return nil end",
            String.class);

    /** Deletes KEYS[1] only when its current value equals ARGV[1]; returns the DEL result, or 0 on mismatch. */
    private final RedisScript<Long> unlockScript = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
            Long.class);

    /**
     * Tries to take the lock by running the lock script.
     *
     * @param lockKey    key that represents the lock
     * @param lockValue  value stored under the key; pass the same value to {@link #releaseLock}
     * @param expireTime expiry handed to the script's {@code PX} option, i.e. milliseconds
     * @return {@code true} if the script set the key, {@code false} if it was already present
     */
    public boolean acquireLock(String lockKey, String lockValue, long expireTime) {
        String result = redisTemplate.execute(lockScript, Collections.singletonList(lockKey), lockValue, expireTime);
        // The script returns ARGV[1] on success and nil on failure. It never
        // returns the literal "OK", so success is any non-null result.
        return result != null;
    }

    /**
     * Releases the lock by running the unlock script, which deletes the key
     * only if it still holds {@code lockValue}.
     *
     * @param lockKey   key that represents the lock
     * @param lockValue value that was supplied when the lock was acquired
     */
    public void releaseLock(String lockKey, String lockValue) {
        redisTemplate.execute(unlockScript, Collections.singletonList(lockKey), lockValue);
    }

    /**
     * Deletes a single key.
     *
     * @param key key to delete
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }

    /**
     * Deletes a batch of keys.
     *
     * @param keys keys to delete
     */
    public void delete(Collection<String> keys) {
        redisTemplate.delete(keys);
    }

    /**
     * Issues {@code SET key value NX EX seconds} on the underlying connection.
     * The key is encoded with the template's key serializer; the value is
     * written as raw UTF-8 bytes.
     *
     * @param key     key
     * @param value   value
     * @param seconds expiry in seconds
     * @return {@code true} if the server answered {@code OK}; {@code false} if
     *         the key already existed or the command failed for any reason
     */
    public boolean setNxEx(String key, String value, int seconds) {
        try {
            @SuppressWarnings("unchecked") // the template is keyed by String, so its key serializer accepts a String
            RedisSerializer<Object> keySerializer = (RedisSerializer<Object>) redisTemplate.getKeySerializer();
            byte[] rawKey = keySerializer.serialize(key);
            byte[] rawValue = value.getBytes(StandardCharsets.UTF_8);
            byte[] rawSeconds = String.valueOf(seconds).getBytes(StandardCharsets.UTF_8);

            Boolean acquired = redisTemplate.execute((RedisCallback<Boolean>) connection -> {
                Object reply = connection.execute(
                        "set",
                        rawKey,
                        rawValue,
                        "NX".getBytes(StandardCharsets.UTF_8),
                        "EX".getBytes(StandardCharsets.UTF_8),
                        rawSeconds);
                return "OK".equals(decodeReply(reply));
            });
            // execute may return null; treat that as "not acquired" rather than
            // failing on auto-unboxing.
            return Boolean.TRUE.equals(acquired);
        } catch (Exception e) {
            // Best-effort by design: any failure is reported as "lock not taken".
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Renders a raw command reply as text. A {@code byte[]} reply is decoded
     * as UTF-8 because {@code String.valueOf} on an array would yield its
     * identity string instead of its content.
     */
    private static String decodeReply(Object reply) {
        if (reply instanceof byte[]) {
            return new String((byte[]) reply, StandardCharsets.UTF_8);
        }
        return String.valueOf(reply);
    }
}
2.1.2 java锁(synchronized、lock)
2.2 数据库唯一索引校验
2.3 数据库锁(select for update行锁,version乐观锁)