Lettuce是一个可伸缩的线程安全的Redis客户端,提供了同步,异步和响应式使用方式。 如果多线程避免阻塞和事务操作(如BLPOP和MULTI / EXEC),则多个线程可共享一个连接。 Lettuce使用通信使用netty。 支持先进的Redis功能,如Sentinel,群集,管道传输,自动重新连接和Redis数据模型。
下面分享来自网易后端工程师的Lettuce的使用心得~
自己整理的Java架构学习视频和大厂项目底层知识点,需要的同学欢迎私信我【资料】发给你~一起学习进步!
Lettuce在Spring boot中的配置
@Bean(name="clusterRedisURI") RedisURI clusterRedisURI(){ return RedisURI.builder().withHost("xxx").withPort(6954).build(); } @Bean ClusterClientOptions clusterClientOptions(){ return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1024).build(); } @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } @Bean(destroyMethod = "close") StatefulRedisClusterConnection statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }
基本的使用方式
@Bean(name="clusterRedisURI") RedisURI clusterRedisURI(){ return RedisURI.builder().withHost("xxx").withPort(6954).build(); } @Bean ClusterClientOptions clusterClientOptions(){ return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1024).build(); } @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } @Bean(destroyMethod = "close") StatefulRedisClusterConnection statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }
集群模式
@Bean(name="clusterRedisURI") RedisURI clusterRedisURI(){ return RedisURI.builder().withHost("xxx").withPort(6954).build(); } @Bean ClusterClientOptions clusterClientOptions(){ return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1024).build(); } @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } @Bean(destroyMethod = "close") StatefulRedisClusterConnection statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }
客户端订阅事件
客户端使用事件总线传输运行期间产生的事件;EventBus可以从客户端资源进行配置和获取,并用于客户端和自定义事件。
如下事件可以被客户端发送:
- 连接事件
- 测量事件 (Lettuce命令延迟测量(CommandLatency))
- 集群拓扑事件
订阅所有事件,并将事件输出到控制台
client.getResources().eventBus().get().subscribe(e -> { System.out.println("client 订阅事件: " + e); });
输出到内容有:
client 订阅事件: ConnectionActivatedEvent [/xx:49910 -> /xx:6008]client 订阅事件: ConnectionActivatedEvent [/xx:49911 -> /xx:6018]client 订阅事件: ConnectedEvent [/xx:49912 -> /xx:6018]
发布事件
用户除了可以通过事件总线订阅事件外还可以通过事件总线发布自定义事件
eventBus.publish(new Event() { @Override public String toString() { return "自定义事件"; } });
订阅到到内容如下:
client 订阅事件: 自定义事件
读写分离
lettuce master/slave模式支持读写分离,下面看看具体使用方式,只需要指定ReadFrom就可以了
@Bean(destroyMethod = "close") StatefulRedisMasterSlaveConnection statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) { StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI); connection.setReadFrom(ReadFrom.NEAREST); return connection; }}
ReadFrom可选参数以及含义:
参数含义
MASTER
从master节点读取
SLAVE
从slave节点读取
MASTER_PREFERRED
从master节点读取,如果master节点不可以则从slave节点读取
SLAVE_PREFERRED
从slave节点读取,如果slave节点不可用则倒退到master节点读取
NEAREST
从最近到节点读取
下面看看源码是如何实现读写分离的,
//根据意图获取连接 public StatefulRedisConnection getConnection(Intent intent) { if (debugEnabled) { logger.debug("getConnection(" + intent + ")"); } //如果readFrom不为null且是READ if (readFrom != null && intent == Intent.READ) { //根据readFrom配置从已知节点中选择可用节点描述 List selection = readFrom.select(new ReadFrom.Nodes() { @Override public List getNodes() { return knownNodes; } @Override public Iterator iterator() { return knownNodes.iterator(); } }); //如果可选择节点集合为空则抛出异常 if (selection.isEmpty()) { throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s", knownNodes, readFrom)); } try { //遍历所有可用节点 for (RedisNodeDescription redisNodeDescription : selection) { //获取节点连接 StatefulRedisConnection readerCandidate = getConnection(redisNodeDescription); //如果节点连接不是打开到连接则继续查找下一个连接 if (!readerCandidate.isOpen()) { continue; } //返回可用连接 return readerCandidate; } //如果没有找到可用连接,默认返回第一个 return getConnection(selection.get(0)); } catch (RuntimeException e) { throw new RedisException(e); } } //如果没有配置readFrom或者不是READ 则返回master连接 return getConnection(getMaster()); }
自定义负载均衡
通过上文的读写分离实现代码可以发现,只需要readFrom select方法每次返回的list都是随机无序的就可以实现随机的负载均衡
public class Sharded< C extends StatefulRedisConnection,V> { private TreeMap nodes; private final Hashing algo = Hashing.MURMUR_HASH; private final Map resources = new LinkedHashMap<>(); private RedisClient redisClient; private String password; private Set sentinels; private RedisCodec codec; public Sharded(List masters, RedisClient redisClient, String password, Set sentinels, RedisCodec codec) { this.redisClient = redisClient; this.password = password; this.sentinels = sentinels; this.codec = codec; initialize(masters); } private void initialize(List masters) { nodes = new TreeMap<>(); for (int i = 0; i != masters.size(); ++i) { final String master = masters.get(i); for (int n = 0; n < 160; n++) { nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), master); } RedisURI.Builder builder = RedisURI.builder(); for (HostAndPort hostAndPort : sentinels) { builder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort()); } RedisURI redisURI = builder.withPassword(password).withSentinelMasterId(master).build(); resources.put(master, MasterSlave.connect(redisClient, codec, redisURI)); } } public StatefulRedisConnection getConnectionBy(String key) { return resources.get(getShardInfo(SafeEncoder.encode(key))); } public Collection getAllConnection(){ return Collections.unmodifiableCollection(resources.values()); } public String getShardInfo(byte[] key) { SortedMap tail = nodes.tailMap(algo.hash(key)); if (tail.isEmpty()) { return nodes.get(nodes.firstKey()); } return tail.get(tail.firstKey()); } public void close(){ for(StatefulRedisConnection connection: getAllConnection()){ connection.close(); } } private static class SafeEncoder { static byte[] encode(final String str) { try { if (str == null) { throw new IllegalArgumentException("value sent to redis cannot be null"); } return str.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } } } private interface Hashing { Hashing MURMUR_HASH = new MurmurHash(); long hash(String key); long hash(byte[] key); } private static class MurmurHash implements Hashing { static long hash64A(byte[] data, int seed) { return hash64A(ByteBuffer.wrap(data), seed); } static long hash64A(ByteBuffer buf, int seed) { ByteOrder byteOrder = buf.order(); buf.order(ByteOrder.LITTLE_ENDIAN); long m = 0xc6a4a7935bd1e995L; int r = 47; long h = seed ^ (buf.remaining() * m); long k; while (buf.remaining() >= 8) { k = buf.getLong(); k *= m; k ^= k >>> r; k *= m; h ^= k; h *= m; } if (buf.remaining() > 0) { ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN); // for big-endian version, do this first: // finish.position(8-buf.remaining()); finish.put(buf).rewind(); h ^= finish.getLong(); h *= m; } h ^= h >>> r; h *= m; h ^= h >>> r; buf.order(byteOrder); return h; } public long hash(byte[] key) { return hash64A(key, 0x1234ABCD); } public long hash(String key) { return hash(SafeEncoder.encode(key)); } } }
来源:网易工程师--张伟
有任何问题欢迎留言交流~
整理总结不易,如果觉得这篇文章有意思的话,欢迎转发、收藏,给我一些鼓励~
有想看的内容或者建议,敬请留言!
最近利用空余时间整理了一些精选Java架构学习视频和大厂项目底层知识点,需要的同学欢迎私信我发给你~一起学习进步!有任何问题也欢迎交流~
Java日记本,每日存档超实用的技术干货学习笔记,每天陪你前进一点点~