在操作数据库时,为了加快程序的执行速度,在新增或更新数据时,可以通过批量提交的方式来减少应用和数据库间的传输次数;在redis中也有这样的技术实现批量处理,也就是管道——Pipeline。它也是通过批量提交数据的方式来实现的,将要执行的redis命令提交到pipeline中,pipeline一次性的将数据发送给服务器,服务器再逐条执行命令。在执行命令过程中不是原子性的,可以插入其他命令执行。
下面演示在jedis中使用管道:
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.3.0</version>
</dependency>
先通过一个测试示例代码看一下运行时间差异:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;import java.time.Duration;public class JedisUtil {/*** 连接地址*/private String host;/*** 连接端口号*/private int port;/*** 密码*/private String password;/*** 连接池*/private JedisPool jedisPool;/*** 连接初始化* @param host* @param port* @param password*/public JedisUtil(String host, int port, String password) {this.host = host;this.port = port;this.password = password;JedisPoolConfig config = new JedisPoolConfig();config.setMaxTotal(256);config.setMaxIdle(256);config.setMinIdle(1);config.setMaxWait(Duration.ofMillis(300));if(password != null && !"".equals(password)) {jedisPool = new JedisPool(config, host, port, 500, password);} else {jedisPool = new JedisPool(config, this.host, this.port, 500);}}/*** 关闭连接池*/public void close() {if(jedisPool != null && !jedisPool.isClosed()) {jedisPool.clear();jedisPool.close();}}/*** 获取连接* @return*/public Jedis getJedis() {if(jedisPool != null && !jedisPool.isClosed()) {return jedisPool.getResource();}return null;}/*** 归还jedis对象* @param jedis*/public void returnJedis(Jedis jedis) {if(jedis != null) {jedis.close();}}public static void main(String[] args) {// 获取jedis连接JedisUtil util = new JedisUtil("192.168.56.101", 6379, "");Jedis jedis = util.getJedis();// 设置键的数量:100万int KEY_COUNT = 1_000_000;// 普通方式setlong start1 = System.currentTimeMillis();for(int i = 0; i < KEY_COUNT; i++) {jedis.set("key1_" + i, "value1_" + i);}System.out.println("use time : " + (System.currentTimeMillis() - start1) + "ms");// 清理数据库的key,线上系统不要使用jedis.flushDB();// 使用管道setlong start2 = System.currentTimeMillis();Pipeline pipeline = jedis.pipelined();int num = 0;for(int i = 0; i < KEY_COUNT; i++) {pipeline.set("key2_" + i, "value2_" + i);if(num++ >= 200) {
// pipeline.syncAndReturnAll();pipeline.sync();pipeline.close();pipeline = jedis.pipelined();num = 0;}}if(num != 0) {pipeline.syncAndReturnAll();pipeline.close();}System.out.println("pipeline : " + (System.currentTimeMillis() - start2) + "ms");// 清理数据库的key,线上系统不要使用jedis.flushDB();}
}
上面的代码运行两次,调整先后顺序分别运行,得到的运行时间:
use time : 79297ms
pipeline : 2036mspipeline : 1747ms
use time : 85078ms
可以看到两次运行的时间差异还是非常明显的,基本上差距40~50倍,再实际运行时可以多次测试并调整每次pipeline提交命令的条数,找到每次提交数据时性能最好的数据条数。pipeline每次提交数据量不宜过多,太多的命令一次提交会导致客户端等待结果时间比较长,也会让连接的缓冲区数据量过大。
pipeline本身没有过多内容需要讲解,下面介绍一下如何在redisTemplate中使用pipeline,redisTemplate中已经提供了对应方法executePipelined()可以直接调用,它支持两个类型的参数:RedisCallback更接近redis原生命令,但是需要自己将键和值都转换为字节码传递过去;SessionCallback对操作进行了封装,可以根据操作不同的数据类型进行转换,方便api使用。
List<Object> datas = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {connection.set("key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8));connection.set("key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8));connection.set("key3".getBytes(StandardCharsets.UTF_8), "value3".getBytes(StandardCharsets.UTF_8));connection.set("key4".getBytes(StandardCharsets.UTF_8), "value4".getBytes(StandardCharsets.UTF_8));connection.set("key5".getBytes(StandardCharsets.UTF_8), "value5".getBytes(StandardCharsets.UTF_8));connection.set("key6".getBytes(StandardCharsets.UTF_8), "value6".getBytes(StandardCharsets.UTF_8));connection.get("key1".getBytes(StandardCharsets.UTF_8));// 这里必须返回null,在 connection.closePipeline() 时覆盖原来的返回值,所以返回值没有必要设置,设置会报错return null;}
});
List<Object> datas = redisTemplate.executePipelined(new SessionCallback<Object>() {@Overridepublic <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {ValueOperations<String, String> op1 = (ValueOperations<String, String>) operations.opsForValue();op1.set("key7", "value7");op1.set("key8", "value8");op1.get("key2");SetOperations<String, String> op2 = (SetOperations<String, String>) operations.opsForSet();op2.add("set_demo", "value1", "value2", "value3");op2.randomMember("set_demo");return null;}
});
pipeline非常显著的提升系统性能,对于redis这种内存数据库,每天的请求量会非常高,对于系统优化来说,管道技术的使用应该成为代码的一个优化点。