文章目录
- 主从架构介绍
- zookeeper
- 利用ZK实现主从架构
主从架构介绍
主从服务架构是一种常见的分布式系统设计模式,常用于提高系统的性能、可用性和扩展性。在这种架构中,系统中的节点被分为两类:主节点(Master)和从节点(Slave)。
zookeeper
zookeeper有以下几个特点:
1.集群部署:一般是3~5台机器组成一个集群,每台机器都在内存保存了zk的全部数据,机器之间互相通信同步数据,客户端连接任何一台机器都可以。
2.顺序一致性:所有的写请求都是有序的;集群中只有leader机器可以写,所有机器都可以读,所有写请求都会分配一个zk集群全局的唯一递增编号:zxid,用来保证各种客户端发起的写请求都是有顺序的。
3.原子性:要么全部机器成功,要么全部机器都不成功。
4.数据一致性:无论客户端连接到哪台节点,读取到的数据都是一致的;leader收到了写请求之后都会同步给其他机器,保证数据的强一致,你连接到任何一台zk机器看到的数据都是一致的。
5.高可用:如果某台机器宕机,会保证数据不丢失。集群中挂掉不超过一半的机器,都能保证集群可用。比如3台机器可以挂1台,5台机器可以挂2台。
6.实时性:一旦数据发生变更,其他节点会实时感知到。
7.高性能:每台zk机器都在内存维护数据,所以zk集群绝对是高并发高性能的,如果将zk部署在高配置物理机上,一个3台机器的zk集群抗下每秒几万请求是没有问题的。
8.高并发:高性能决定的,主要是基于纯内存数据结构来处理,并发能力是很高的,只有一台机器进行写,但是高配置的物理机,比如16核32G,可以支撑几万的写入QPS。所有机器都可以读,选用3台高配机器的话,可以支撑十万+的QPS。
利用ZK实现主从架构
导入依赖
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><zookeeper.version>3.4.14</zookeeper.version><curator-framework.version>2.12.0</curator-framework.version><curator-recipes.version>2.12.0</curator-recipes.version><ssdb.version>9.4</ssdb.version><jodatime.version>2.10</jodatime.version><binlog.version>0.21.0</binlog.version><disruptor.version>3.4.2</disruptor.version></properties><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><!-- 去除旧log依赖 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j2 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><!-- log4j2异步日志需要加载disruptor-3.0.0.jar或者更高的版本 --><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version></dependency><!-- zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>${zookeeper.version}</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>${curator-framework.version}</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>${curator-recipes.version}</version></dependency>
zk客户端:
public class ZookeeperClient {/*** 客户端*/private CuratorFramework client;/*** Leader选举*/private LeaderLatch leader;public ZookeeperClient(LeaderLatch leader,CuratorFramework client){this.client = client;this.leader = leader;}/*** 启动客户端* @throws Exception*/public void startZKClient() throws Exception {client.start();leader.start();}/*** 关闭客户端* @throws Exception*/public void closeZKClient() throws Exception {leader.close();client.close();}/*** 判断是否变为领导者* @return*/public boolean hasLeadership(){return leader.hasLeadership();}public CuratorFramework getClient() {return client;}public void setClient(CuratorFramework client) {this.client = client;}public LeaderLatch getLeader() {return leader;}public void setLeader(LeaderLatch leader) {this.leader = leader;}
public class ZookeeperClientInfo {/*** 是否是leader 默认为false*/public static boolean isLeader = false;/*** 客户端ID*/private String id;/*** 连接信息字符串*/private String connectString;/*** 节点路径*/private String path;/*** 连接超时时间*/private Integer connectTimeOut;/*** 最大连接次数*/private Integer maxRetries;/*** 重连休眠时间*/private Integer retrySleepTime;public static boolean isLeader() {return isLeader;}public static void setIsLeader(boolean isLeader) {ZookeeperClientInfo.isLeader = isLeader;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getConnectString() {return connectString == null ? null : connectString.replaceAll("//s+", "");}public void setConnectString(String connectString) {this.connectString = connectString;}public String getPath() {return path;}public void setPath(String path) {this.path = path;}public Integer getConnectTimeOut() {return connectTimeOut;}public void setConnectTimeOut(Integer connectTimeOut) {this.connectTimeOut = connectTimeOut;}public Integer getMaxRetries() {return maxRetries;}public void setMaxRetries(Integer maxRetries) {this.maxRetries = maxRetries;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append("ZookeeperClientInfo{ ").append("id=").append(id).append(",isLeader=").append(isLeader).append(", connectString=").append(connectString).append(", path=").append(path).append(",connectTimeOut=").append(connectTimeOut).append(", maxRetries=").append(maxRetries).append(", retrySleepTime=").append(retrySleepTime).append(" }");return sb.toString();}
}
监听器,来监听主节点变更,实现主要是继承LeaderLatchListener类
@Component
public class ZookeeperClientListener implements LeaderLatchListener {private final static Logger log = LoggerFactory.getLogger(ZookeeperClientListener.class);@Autowiredprivate ChangeLeaderService changeLeaderService;/*** 将本服务达成jar包,部署到2台服务器上。启动两个服务。* 1、第一台服务(机器1)启动后抢到leader,会进入到该方法中。另外一台服务(机器2)会进入到notLeader()中。* 2、当机器1宕机后,连接断开后zookeeper会删除临时节点。机器2根据选举会成为leader,成为leader后会进入到isLeader()中* 然后在changeLeaderService.taskExecut() 再次将定时任务做补偿处理。*/@Overridepublic void isLeader() {log.error("{},当前服务已变为leader,将从事业务消费======>>>>", JodaDateUtil.date2String(new Date()));ZookeeperClientInfo.isLeader = true;// 切换机器后,继续执行上一个机器未完成的定时任务。changeLeaderService.taskExecut();}@Overridepublic void notLeader() {log.error("{},当前服务已退出leader,不再从事消费业务=====>>>", JodaDateUtil.date2String(new Date()));ZookeeperClientInfo.isLeader = false;}
}
当服务主从切换时的补偿措施:
public interface ChangeLeaderService {/*** 主从服务切换时,手动触发分配到leader机器上的定时任务中的业务逻辑*/void taskExecut();
}
@Service
public class ChangeLeaderServiceImpl implements ChangeLeaderService {private final static Logger log = LoggerFactory.getLogger(ChangeLeaderServiceImpl.class);
;@Autowiredprivate SSDB ssdb;@Overridepublic void taskExecut() {//TODO 从ssdb 中查询出定时任务的标识,是否正常执行完成,未完成的话,在这里再触发执行。log.info("===ChangeLeaderServiceImpl===taskExecut()===");/*Response response = ssdb.get(Constant.TEST_STATE);if (response.ok() && response.datas.size() > 0) {int tenantState = byteArrayToInt(response.datas.get(0));if (Constant.TASK_EXECUTING == tenantState) { //当前任务未完成,接着完成// service.do(); 伪代码log.info("service.do();");ssdb.set(Constant.TEST_STATE, Constant.TASK_END);}}*/}/*** byte数组转int* @param b* @return*/private int byteArrayToInt(byte[] b){String str = byteArrayToString(b);return StringToInt(str);}/*** byte数组转string* @param b* @return*/private String byteArrayToString(byte[] b) {if (null == b || b.length == 0) {return "";}return new String(b);}/*** string转int* @param str* @return*/private int StringToInt(String str){if (StringUtils.isEmpty(str)){return 0;}return Integer.parseInt(str);}
}
zk的配置信息
@Component
public class ZookeeperConfig {/*** zk 地址*/@Value("${spring.slaveof.zk.addr}")private String addr;/*** 重试策略----最大重试次数*/@Value("${spring.slaveof.zk.max}")private int max;/*** 重试策略-----sleepTime*/@Value("${spring.slaveof.zk.sleep}")private int sleepTime;/*** 连接超时时间*/@Value("${spring.slaveof.zk.connection}")private int connectionTime;/*** 会话超时时间*/@Value("${spring.slaveof.zk.session}")private int sessionTime;public String getAddr() {return addr;}public int getMax() {return max;}public int getSleepTime() {return sleepTime;}public int getConnectionTime() {return connectionTime;}public int getSessionTime() {return sessionTime;}
}
服务启动限制性的类,了解ApplicationRunner,SpringBoot项目启动时,若想在启动之后直接执行某一段代码,就可以用 ApplicationRunner这个接口,并实现接口里面的run(ApplicationArguments args)方法,方法中写上自己的代码逻辑。也就是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。
@Component
public class ZkDemoApplicationRunner implements ApplicationRunner {private final static Logger log = LoggerFactory.getLogger(ZkDemoApplicationRunner.class);@Autowiredprivate ZookeeperClientListener zkClientListener;@Autowiredprivate ZookeeperConfig zookeeperConfig;@Overridepublic void run(ApplicationArguments applicationArguments) throws Exception {log.info("====================>>>>>>>>启动执行zk>>>>>>>>==================");log.error("===>>>>>>>>zookeeper: addr:{}, sleepTime:{}, max:{}, connectionTime:{}=====", zookeeperConfig.getAddr(), zookeeperConfig.getSleepTime(), zookeeperConfig.getMax(), zookeeperConfig.getConnectionTime());CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zookeeperConfig.getAddr()).retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getSleepTime(), zookeeperConfig.getMax())).connectionTimeoutMs(zookeeperConfig.getConnectionTime()).build();LeaderLatch leaderLatch = new LeaderLatch(client, "/diaoliwei", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);if (zkClientListener == null) {log.error("==================>>>>>>>>>>>>>>>>zkClientListener is null=====>>>>>>>>>>>");}leaderLatch.addListener(zkClientListener);ZookeeperClient zkClient = new ZookeeperClient(leaderLatch, client);try {zkClient.startZKClient();} catch (Exception e) {log.error("======>>>>>>zk客户端连接失败<<<<<=====error:{}===", e);return;}CuratorFrameworkState state = client.getState();if (CuratorFrameworkState.STOPPED == state) {log.error("zk客户端已关闭");return;}/*while (true) { // 测试日志用try {if(!zkClient.hasLeadership()){log.info("2当前服务不是leader");Thread.sleep(2000);log.error("error:::::::Test02 do it...>>>>>>> ");continue;} else {log.info("2当前服务是leader");}log.info("Test02 do it... ");log.error("Test02 do it...>>>>>>> ");} catch (Exception e) {log.error("Exception=====>>>>>>>>>>>>eeee:", e);}}*///log.info("======>>>>>zk客户端连接成功<<<<<<=======");}
}