3. 事务,锁和集成
3.1 事务
3.1.1 基本应用
redis事务的本质:一组命令的集合,一个事务中的所有命令都会被序列化,在执行事务的过程中,会按照顺序执行
redis事务的特点:
- redis单条命令能够保证原子性,但是redis的事务不保证原子性
- redis事务没用隔离级别的概念
- 所有的命令在事务中,并没有直接被执行,只有发起执行命令时候才会被执行
- redis事务执行一次后就会消失
redis事务的阶段:
- 开启事务(multi)
- 命令入队(…)
- 执行事务(exec)
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> get k1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) "v1"
3) OK
放弃事务:discard
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> get k1
QUEUED
127.0.0.1:6379(TX)> discard
OK
127.0.0.1:6379> get k1
(nil)
3.1.2 异常处理
- 编译型异常:代码语法错误,事务中的所有命令都不会执行
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> setget k2 v2
(error) ERR unknown command `setget`, with args beginning with: `k2`, `v2`,
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k1
(nil)
- 运行时异常:如果事务队列中存在与发行,那么执行命令的时候,其他命令是可以正常执行的,错误命令会抛出异常
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> incr k1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK
3.2 乐观锁
悲观锁:认为什么时候都会出问题,因此无论做什么都会加锁,会影响性能
乐观锁:认为什么时候都不会出问题,因此不会加锁,更新数据的时候会去判断一下,在此期间是否有人修改这个数据
redis乐观锁:也可理解为版本号比较机制,主要是说在读取数据逇时候同时读取其版本号,然后在写入的时候,进行版本号比较,如果一致,则表明此数据在监听期间未被改变,可以写入,如果不一致说明此数据被修改过,不能写入,否则会导致数据不一致的问题。
乐观锁(watch)的操作:
- 获取version
- 更新数据时比较version
操作一:正常操作
127.0.0.1:6379> set money 1000
OK
127.0.0.1:6379> set use 0
OK
127.0.0.1:6379> watch money # 执行事务前,使用watch监控money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> decrby money 100
QUEUED
127.0.0.1:6379(TX)> incrby use 100
QUEUED
127.0.0.1:6379(TX)> exec # 由于事务期间,money没有被修改过,因此执行成功
1) (integer) 900
2) (integer) 100
127.0.0.1:6379> get money
"900"
操作二:在执行事务时,另起现场修改数据
127.0.0.1:6379> watch money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> decrby money 200
QUEUED
127.0.0.1:6379(TX)> incrby use 200 # 加入命令后,不执行exec,另起一个线程修改money的值
QUEUED
127.0.0.1:6379(TX)> exec # 事务执行失败
(nil)
127.0.0.1:6379> get money
"1900"
另起的线程:
127.0.0.1:6379> incrby money 1000
(integer) 1900
如果事务执行失败,则需要先使用unwatch命令解锁,再使用watch获取新锁,再执行事务
乐观锁实现秒杀系统
我们知道大多数是基于数据版本(version)的记录机制实现的。即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表增加一个”version”字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加1。此时,将提交数据的版本号与数据库表对应记录的当前版本号进行比对,如果提交的数据版本号大于数据库当前版本号,则予以更新,否则认为是过期数据。redis中可以使用watch命令会监视给定的key,当exec时候如果监视的key从调用watch后发生过变化,则整个事务会失败。也可以调用watch多次监视多个key。这样就可以对指定的key加乐观锁了。注意watch的key是对整个连接有效的,事务也一样。如果连接断开,监视和事务都会被自动清除。当然了exec,discard,unwatch命令都会清除连接中的所有监视。
Redis中的事务(transaction)是一组命令的集合。事务同命令一样都是Redis最小的执行单位,一个事务中的命令要么都执行,要么都不执行。Redis事务的实现需要用到MULTI和EXEC两个命令,事务开始的时候先向Redis服务器发送MULTI命令,然后依次发送需要在本次事务中处理的命令,最后再发送EXEC命令表示事务命令结束。Redis的事务是下面4个命令来实现:
3.3 jedis
Jedis是Redis官方推荐的java连接开发工具!使用Java操作Redis 中间件!(就是一个jar包)
- 首先需要导入依赖包
<dependencies><!-- https://mvnrepository.com/artifact/redis/clients/jedis --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.2.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
</dependencies>
- 连接测试
//1.获取连接
Jedis jedis = new Jedis("localhost",6379);
//2.调用对应的方法操作
jedis.set("username","zhangsan");
String username = jedis.get("username");
//3.关闭连接
jedis.close();
使用new方法创建jedis对象,产生jedis对象后,对象的方法就是之前用到的redis命令
3.3.1 jedis api
- string类型
//1.获取连接
Jedis jedis = new Jedis("localhost",6379);
//2.调用对应的方法操作
jedis.set("username","zhangsan");
String username = jedis.get("username");
//可存储指定过期时间的数据
jedis.setex("activeCode",20,"valueString");
System.out.println(username);
//3.关闭连接
jedis.close();
- Hash类型
//1.获取连接
Jedis jedis = new Jedis("localhost",6379);
//2.调用对应的方法操作
//存储hash
jedis.hset("user","name","zhangsan");
jedis.hset("user","age","23");
jedis.hset("user","gender","male");
//获取单个hash数据
String name = jedis.hget("user", "name");
System.out.println(name);
//获取hash的所有map中的数据
Map<String,String> user = jedis.hgetAll("user");
for (String key : user.keySet()) {String value = user.get(key);System.out.println(key+":"+value);
}
//3.关闭连接
jedis.close();
- 列表类型
//1.获取连接
Jedis jedis = new Jedis("localhost",6379);
//2.调用对应的方法操作
//一次可以存多个值
jedis.lpush("mylist","a","b","c");//从左边存
jedis.rpush("mylist","a","b","c");//从右边存
//获取数据
List<String> mylist = jedis.lrange("mylist", 0, -1);
System.out.println(mylist);
String element1 = jedis.lpop("mylist");
System.out.println(element1);
//3.关闭连接
jedis.close();
set类型
//1.获取连接
Jedis jedis = new Jedis("localhost",6379);
//2.调用对应的方法操作
jedis.sadd("myset","java","php","cpp");
Set<String> myset = jedis.smembers("myset");
System.out.println(myset);
//3.关闭连接
jedis.close();
- 有序set类型
//1.获取连接
Jedis jedis = new Jedis("localhost",6379);
//2.调用对应的方法操作
jedis.zadd("mysortedset",3,"亚索");
jedis.zadd("mysortedset",5,"盖伦");
jedis.zadd("mysortedset",4,"猴子");
Set<String> mysortedset = jedis.zrange("mysortedset", 0, -1);
System.out.println(mysortedset);
//3.关闭连接
jedis.close();
3.3.2 jedis连接池
jedis的连接池叫JedisPool,在创建连接池后我们可以从连接池中获取连接,客户端连接Redis使用的是TCP协议,直连的方式每次需要建立TCP连接,而连接池的方式是可以预先初始化好Jedis连接,所以每次只需要从Jedis连接池借用即可,而借用和归还操作是在本地进行的,只有少量的并发同步开销,远远小于新建TCP连接的开销。
用连接池的一次普通的流程:
//创建配置对象
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(50);//最大的连接数50
config.setMaxIdle(10);//最大的空闲连接数//创建jedis连接池对象
JedisPool jedisPool = new JedisPool(config,"localhost",6379);
//获取连接
Jedis jedis = jedisPool.getResource();
//使用连接
jedis.set("username","hehe");
//关闭、归还连接到连接池中
jedis.close();
将来实际应用的难点在于参数的配置:
#最大活动对象数
redis.pool.maxTotal=1000
#最大能够保持idel状态的对象数
redis.pool.maxIdle=100
#最小能够保持idel状态的对象数
redis.pool.minIdle=50
#当池内没有返回对象时,最大等待时间
redis.pool.maxWaitMillis=10000
#当调用borrow Object方法时,是否进行有效性检查
redis.pool.testOnBorrow=true
#当调用return Object方法时,是否进行有效性检查
redis.pool.testOnReturn=true
#“空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1.
redis.pool.timeBetweenEvictionRunsMillis=30000
#向调用者输出“链接”对象时,是否检测它的空闲超时;
redis.pool.testWhileIdle=true
# 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为3.
redis.pool.numTestsPerEvictionRun=50
#redis服务器的IP
redis.ip=xxxxxx
#redis服务器的Port
redis1.port=6379
连接池工具类
如果把配置放在代码里的话耦合度会比较高,所以一般我们把配置放在配置文件中,这样要使用的时候加载配置即可。加载配置我们在工具类的静态代码块中执行:
package util;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;/*** Jedis连接池的工具类* 加载配置文件,配置连接池的参数* 提供获取连接的方法*/
public class JedisPoolUtils {private static JedisPool jedisPool;static{//获取输入流InputStream is = JedisPoolUtils.class.getClassLoader().getResourceAsStream("jedis.properties");//创建properties对象Properties properties = new Properties();try {properties.load(is);} catch (IOException e) {e.printStackTrace();}//获取数据、设置到JedisPoolConfig中JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxTotal(Integer.parseInt(properties.getProperty("maxTotal")));jedisPoolConfig.setMaxIdle(Integer.parseInt(properties.getProperty("maxIdle")));String host = properties.getProperty("host");Integer port = Integer.parseInt(properties.getProperty("port"));System.out.println(host+","+port);jedisPool = new JedisPool(jedisPoolConfig,host,port);}/*** 获取连接的方法*/public static Jedis getJedis(){return jedisPool.getResource();}
}
3.3 SpringBoot集成
SpringData是Spring的一个子项目,意在统一和简化对各类型持久化存储和访问,SpringData通过一套类似的API,对关系型数据库、非关系型数据库、搜索引擎等技术进行CRUD。
SpringData提供了很多的模块去支持各种数据库的操作。如SpringData JPA、SpringData JDBC、SpringData Redis、SpringData MongoDB、SpringData Elasticsearch、SpringData Solr等。
在springboot 2.x之后,原来使用jedis被替换为了lettuce
jedis: 采用的直连,多个线程操作的话,是不安全的,如果想要避免,使用jedis pool连接池
lettuce:采用netty,实例可以在多个县城中进行共享,不存在线程不安全的情况,可以减少线程数据
- 新建一个项目,导入依赖
- 配置连接
#配置redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
- 测试
编写测试类
@SpringBootTest
class Redis02SpringbootApplicationTests {@Autowiredprivate RedisTemplate redisTemplate;@Testvoid contextLoads() {
// redisTemplate 操作不同的数据类型,api和我们的指令是一样的
// opsForValue 操作字符串,类似String
// opsForList 操作list
// opsForSet 操作set
// opsForHash 操作hash
// opsForGeo
// 除了基本的操作,我们常用的方法都可以redisTemplate直接操作,比如事务,CRUD// 获取链接对象
// RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();= redisTemplate.getConnectionFactory().getConnection();redisTemplate.opsForValue().set("mykey","dzp");System.out.println(redisTemplate.opsForValue().get("mykey"));}}
看到成功输出
- 编写自己的编写自己的RedisTemplate
@Configuration
public class RedisConfig {
// 这是一个固定模板,在企业中可以直接使用
// 编写自己的RedisTemplate
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
// 我们为了自己开发方便,直接使用<String, Object>RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();template.setConnectionFactory(factory);
// 序列化配置Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用string的序列化方式template.setKeySerializer(stringRedisSerializer);
// hash的key也采用string序列化的方式template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jacksontemplate.setValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}
}
测试类中指明我们先前写的RedisTemplate
@Autowired@Qualifier("redisTemplate")private RedisTemplate redisTemplate;
然后测试就好,这样使用自己编写的redisTemplate就不会出现字符串前面乱码的情况(这种情况是因为默认使用的序列化方式是JdkSerializationRedisSerializer
)
- 编写工具类RedisUtil
@Component
public final class RedisUtil {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;// =============================common============================/*** 指定缓存失效时间* @param key 键* @param time 时间(秒)*/public boolean expire(String key, long time) {try {if (time > 0) {redisTemplate.expire(key, time, TimeUnit.SECONDS);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 根据key 获取过期时间* @param key 键 不能为null* @return 时间(秒) 返回0代表为永久有效*/public long getExpire(String key) {return redisTemplate.getExpire(key, TimeUnit.SECONDS);}/*** 判断key是否存在* @param key 键* @return true 存在 false不存在*/public boolean hasKey(String key) {try {return redisTemplate.hasKey(key);} catch (Exception e) {e.printStackTrace();return false;}}/*** 删除缓存* @param key 可以传一个值 或多个*/@SuppressWarnings("unchecked")public void del(String... key) {if (key != null && key.length > 0) {if (key.length == 1) {redisTemplate.delete(key[0]);} else {redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));}}}// ============================String=============================/*** 普通缓存获取* @param key 键* @return 值*/public Object get(String key) {return key == null ? null : redisTemplate.opsForValue().get(key);}/*** 普通缓存放入* @param key 键* @param value 值* @return true成功 false失败*/public boolean set(String key, Object value) {try {redisTemplate.opsForValue().set(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 普通缓存放入并设置时间* @param key 键* @param value 值* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期* @return true成功 false 失败*/public boolean set(String key, Object value, long time) {try {if (time > 0) {redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);} else {set(key, value);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 递增* @param key 键* @param delta 要增加几(大于0)*/public long incr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递增因子必须大于0");}return redisTemplate.opsForValue().increment(key, delta);}/*** 递减* @param key 键* @param delta 要减少几(小于0)*/public long decr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递减因子必须大于0");}return redisTemplate.opsForValue().increment(key, -delta);}// ================================Map=================================/*** HashGet* @param key 键 不能为null* @param item 项 不能为null*/public Object hget(String key, String item) {return redisTemplate.opsForHash().get(key, item);}/*** 获取hashKey对应的所有键值* @param key 键* @return 对应的多个键值*/public Map<Object, Object> hmget(String key) {return redisTemplate.opsForHash().entries(key);}/*** HashSet* @param key 键* @param map 对应多个键值*/public boolean hmset(String key, Map<String, Object> map) {try {redisTemplate.opsForHash().putAll(key, map);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** HashSet 并设置时间* @param key 键* @param map 对应多个键值* @param time 时间(秒)* @return true成功 false失败*/public boolean hmset(String key, Map<String, Object> map, long time) {try {redisTemplate.opsForHash().putAll(key, map);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 向一张hash表中放入数据,如果不存在将创建** @param key 键* @param item 项* @param value 值* @return true 成功 false失败*/public boolean hset(String key, String item, Object value) {try {redisTemplate.opsForHash().put(key, item, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 向一张hash表中放入数据,如果不存在将创建** @param key 键* @param item 项* @param value 值* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间* @return true 成功 false失败*/public boolean hset(String key, String item, Object value, long time) {try {redisTemplate.opsForHash().put(key, item, value);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 删除hash表中的值** @param key 键 不能为null* @param item 项 可以使多个 不能为null*/public void hdel(String key, Object... item) {redisTemplate.opsForHash().delete(key, item);}/*** 判断hash表中是否有该项的值** @param key 键 不能为null* @param item 项 不能为null* @return true 存在 false不存在*/public boolean hHasKey(String key, String item) {return redisTemplate.opsForHash().hasKey(key, item);}/*** hash递增 如果不存在,就会创建一个 并把新增后的值返回** @param key 键* @param item 项* @param by 要增加几(大于0)*/public double hincr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, by);}/*** hash递减** @param key 键* @param item 项* @param by 要减少记(小于0)*/public double hdecr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, -by);}// ============================set=============================/*** 根据key获取Set中的所有值* @param key 键*/public Set<Object> sGet(String key) {try {return redisTemplate.opsForSet().members(key);} catch (Exception e) {e.printStackTrace();return null;}}/*** 根据value从一个set中查询,是否存在** @param key 键* @param value 值* @return true 存在 false不存在*/public boolean sHasKey(String key, Object value) {try {return redisTemplate.opsForSet().isMember(key, value);} catch (Exception e) {e.printStackTrace();return false;}}/*** 将数据放入set缓存** @param key 键* @param values 值 可以是多个* @return 成功个数*/public long sSet(String key, Object... values) {try {return redisTemplate.opsForSet().add(key, values);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 将set数据放入缓存** @param key 键* @param time 时间(秒)* @param values 值 可以是多个* @return 成功个数*/public long sSetAndTime(String key, long time, Object... values) {try {Long count = redisTemplate.opsForSet().add(key, values);if (time > 0) {expire(key, time);}return count;} catch (Exception e) {e.printStackTrace();return 0;}}/*** 获取set缓存的长度** @param key 键*/public long sGetSetSize(String key) {try {return redisTemplate.opsForSet().size(key);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 移除值为value的** @param key 键* @param values 值 可以是多个* @return 移除的个数*/public long setRemove(String key, Object... values) {try {Long count = redisTemplate.opsForSet().remove(key, values);return count;} catch (Exception e) {e.printStackTrace();return 0;}}// ===============================list=================================/*** 获取list缓存的内容** @param key 键* @param start 开始* @param end 结束 0 到 -1代表所有值*/public List<Object> lGet(String key, long start, long end) {try {return redisTemplate.opsForList().range(key, start, end);} catch (Exception e) {e.printStackTrace();return null;}}/*** 获取list缓存的长度** @param key 键*/public long lGetListSize(String key) {try {return redisTemplate.opsForList().size(key);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 通过索引 获取list中的值** @param key 键* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推*/public Object lGetIndex(String key, long index) {try {return redisTemplate.opsForList().index(key, index);} catch (Exception e) {e.printStackTrace();return null;}}/*** 将list放入缓存** @param key 键* @param value 值*/public boolean lSet(String key, Object value) {try {redisTemplate.opsForList().rightPush(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存* @param key 键* @param value 值* @param time 时间(秒)*/public boolean lSet(String key, Object value, long time) {try {redisTemplate.opsForList().rightPush(key, value);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存** @param key 键* @param value 值* @return*/public boolean lSet(String key, List<Object> value) {try {redisTemplate.opsForList().rightPushAll(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存** @param key 键* @param value 值* @param time 时间(秒)* @return*/public boolean lSet(String key, List<Object> value, long time) {try {redisTemplate.opsForList().rightPushAll(key, value);if (time > 0)expire(key, time);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 根据索引修改list中的某条数据** @param key 键* @param index 索引* @param value 值* @return*/public boolean lUpdateIndex(String key, long index, Object value) {try {redisTemplate.opsForList().set(key, index, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 移除N个值为value** @param key 键* @param count 移除多少个* @param value 值* @return 移除的个数*/public long lRemove(String key, long count, Object value) {try {Long remove = redisTemplate.opsForList().remove(key, count, value);return remove;} catch (Exception e) {e.printStackTrace();return 0;}}
}