Zookeeper简介
Zookeeper的数据模型
- 层次化的目录结构,命名符合常规文件系统规范
- 每个节点在zookeeper中叫做znode,并且有一个唯一的路径标识
- 节点znode可以包含数据和子节点,但是EPHEMERAL类型的节点不能有子节点
- Znode中的互刷可以有多个版本,比如某一个路径下存在多个数据版本,那么查询这个路径下的数据就需要带上版本号
- 客户端应用可以在节点上设置监视器
- 节点不支持部分读写,而是一次性完整读写
Zookeeper的节点
- Znode有两种类型,短暂的(ephemeral)和持久节点(persistent)
- Znode的类型在创建时确定并且不能修改
- 短暂的Znode的客户端会话结束,zookeeper会将该短暂znode删除,短暂znode不可以有子节点
- 持久Znode不依赖客户端会话,只有当客户端明确要删除改Znode时候才会被删掉
- Znode有四种形式的目录节点:
- persistent 是永久节点
- persistent_sequential 是永久有序节点
- ephemeral 是临时节点
- ephemeral_sequential 是临时有序节点
Zookeeper的角色
- 领导者(leader),负责进行投票和发起决议,更新系统状态
- 学习者(learner),包括跟随者(follower)和观察者(observer),follower用于接收客户端请求并向客户端返回结果,在选主过程中参与投票
- 观察者(Observer),可以接受客户端连接,将写请求转发给leader,但observer不参加投票过程,只同步leader的状态,observer的目的是为了扩展系统,提高读取速度。
- 客户端(client),请求发起方:
Zookeeper的顺序号
- 创建Znode时设置顺序标识,znode名称后面会附加一个值
- 顺序号是一个单递增的计数器,由父节点维护
- 在分布式系统中,顺序号可以被用于所有事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
Zookeeper的读写机制
- zookeeper是一个由多个server组成的集群
- 一个leader,多个follower
- 每个server保存一份数据副本
- 全局数据一致
- 分布式读写
- 更新请求转发由leader实施
Zookeeper的保证
- 更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
- 数据更新原子性,依次数据更新要么成功,要么失败
- 全局唯一数据视图,client无论链接到哪个server,数据视图都是一致的
- 实时性,在一定事件范围内,client能督导的最新数据
Zookeeper的api接口
- String create(String path,byte[] data, List acl, CreateMode createMode):
- 创建一个给定目录节点的path,并给他设置数据,CreateMode标识有四种形式目录,分别是:
- PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;
- PERSISTEN_SEQUENTIAL:顺序自动编号的目录节点,这种节点会根据当前已经存在的节点数自动加1,然后返回给客户端已经成功创建的目录节点名
- EPHEMERAL:临时目录节点,一旦创建这个节点的客户端与服务器断开,也就是session超时,这种节点会被自动删除
- EPHEMERAL_SEQUENTIAL:临时自动编号节点
- Stat exists(String path, boolean watch):
- 判断某个path是否存在,并设置是否监控这个目录节点,这里的watcher是在创建zookeeper实例时指定的watcher,exists方法还有一个重载方法,可以执行特定的watcher
- Stat exists(String path, Watcher watcher)
- 重载方法,这里给某个目录节点设置特定的watcher,watcher在zookeeper是一个核心功能,watcher可以监控目录加点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的watcher,从而每个客户端都很快知道他所关注的节点的状态发生变化,而做出相应的反应
…
- 重载方法,这里给某个目录节点设置特定的watcher,watcher在zookeeper是一个核心功能,watcher可以监控目录加点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的watcher,从而每个客户端都很快知道他所关注的节点的状态发生变化,而做出相应的反应
观察(watcher)
- watcher在Zookeeper中是一个核心功能,watcher可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,从而做出相应反应(一个节点可以设置多个watcher,也就是每个客户端都可以在一个节点上设置观察)
- 可以设置观察的操作:exists,getChildren, getData
- 可以触发观察操作:create, delete,setData
写操作与zookeeper内部事件的对应关系
写操作与watcher的对应关系
每个znode被创建时候都会带有一个ACL列表,用于决定谁可以对他执行何种操作
ACL访问控制列表(Access Control List, ACL)
- 身份验证模式有三种:
- digest:用户名密码
- host:通过客户端的主机名来识别客户端
- ip:通过客户端的ip来识别客户端
- new ACL(Perms.READ, new Id(“host”, “example.com”)); 这个ACL对应身份验证模式是host,符合改模式的身份是exmple.com,权限组合是:READ
Znode的节点状态
Zookeeper工作原理
- zookeeper的核心是原子广播,这个机制保证了各个server之间的同步。实现这个机制的协议叫Zab协议。Zab协议有两种模式,他们分别是恢复模式和广播模式。当服务启动或者在领导者崩溃后,Zab就进入恢复模式,当领导者被选举出来,且大多数server的完成了和leader的状态同步后,恢复模式就结束了。状态同步保证了leader和server具有相同的系统状态。
- 一旦leader已经和多数follower进行了状态同步后,他就可以开始广播消息,即进入广播状态。这时候,当一个server加入zookeeper服务中,他会在恢复模式下启动,发现leader,并和leader进行状态同步。待到同步结束,他也参与消息广播。Zookeeper服务一直维持在broadcast状态,直到崩溃或者leader失去了大部分followers支持。
- 广播模式需要保证proposal被按顺序处理,因此ZK采用了递增的事务id号码(zxid)来保证,所有的提议(proposal)都被踢出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,他都会有一个新的epoch,低32位是个递增计数
- 当leader崩溃或者leader失去大多数follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有server都恢复到一个正确的状态。
Leader选举
- 每个server启动以后都询问其他Server他要投票给谁
- 对于其他server的询问,server每次根据自己的状态都回复自己推荐的leader的id和上一次处理事务的zxid(系统启动时每个server都会推荐自己)
- 收到所有Server回复后,就计算出zxid最大的那个Server,并将这个Server相关信息设置成下一次要投票的Server
- 计算这过程中获得票数最大的server为胜者,如果获胜者的票数超过半数,则该server被选为leader。否则,继续这个过程,知道leader被选出来
- leader就会开始邓艾server链接
- Follower链接leader将最大的zxid发送给leader
- Leader根据fillower的zxid确定同步点
- 完成同步后通知follow,已经成为uptodate状态
- Follower收到uptodate消息后,又可以重新接受client的请求进行服务了
应用场景1-统一命名服务
- 分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住,通常情况下用树形结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,即对人友好又不会重复。
- Name Service是Zookeeper内置的功能,只要调用ZookeeperApi就能实现,如调用create接口就可以很容易创建一个目录节点从而得到一个全局唯一的path,这个path就可以作为一个名称。
- 阿里的分布式服务框架Dubbo中使用Zookeeper来作其命名服务,维护全局的服务地址列表:
- 服务提供者在启动时候,想ZK上指定节点/dubbo/${serviceName}/providers 目录下写入自己的URL地址,这个操作就完成了服务的发布。
- 服务消费者启动的时候,订阅/dubbo/serviceName/providers目录下的提供者URL地址,并向/dubbo/{serviceName}/consumer目录下写入自己的URL地址
- 注意:所有想ZK注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化
- 另外Dubbo还有针对服务力度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息
应用场景2-配置管理
- 配置管理在分布式应用环境中很常见,例如同一个应用系统需要多台PCServer运行,但是他们运行的应用系统的某些配置项是相同的,如果需要修改这些相同的配置,那么必须同时修改每台云溪这个应用系统的PCServer,这样就非常麻烦而且容易出错
- 将配置信息保存在Zookeeper的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息变化,每台机器就会收到Zookeeper的通知,然后从Zookeeper获取新的配置信息应用到系统中。
应用场景3-集群管理
- Zookeeper能够很容易的实现集群管理的功能,如有多台Server组成一个服务器集群,那么必须要有一个总管知道当前集群中每一台服务器的服务状态,一旦有集群不能提供服务,集群中其他集群必须知道,从而做出调整重新分配服务策略。同当增加集群的服务能力时,就会增加一台或者多台Server,同样也需要让总管知道
- Zookeeper不仅能维护当前集群中集群的服务状态,而且能够选出一个总管,让这个总管来管理集群,这就是Zookeeper的另一个功能Leader Election。
- 他们的实现方式都是在Zookeeper上创建一个ephemeral类型的目录节点,然后每个Server在他们创建目录节点的父目录节点上调用getChildren(String Path,boolean watch)方法并设置watch为true,由于是Ephemeral目录节点,当创建他的Server死去,这个目录节点也随之被删除,所以Children将会变化,这时候getChildren上的watch将会被调用,所有其他Server就知道已经有某一台Server死去,新增加Server也是同样的道理。
- Zookeeper如何实现Leader Election,也就是选出一个Master Server,和签名的一样每台Server 创建一个Ephemeral目录节点,不同的是他还是一个SEQUENTIAL目录节点,是一个有序的临时节点,我们可以选出当前最小编号的Server是Master,加入这个最小编号Server死去,由于是Ephemeral节点,死去的Server对应的节点也会被删掉,所以当前的节点列表中会出现另外一个最小编号节点,我们就选这个为当前Master,这就实现动态选择Master,避免了传统意义上单Master容易出现单节点故障问题。
zk.create("/testRootPath/testChildPath1","1".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);zk.create(“/testRootPath/testChildPath2”,“2”.getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);zk.create("/testRootPath/testChildPath3","3".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);zk.create("/testRootPath/testChildPath4","4".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(zk.getChildren("/testRootPath", false));
//输出[testChildPath10000000000, testChildPath20000000001, testChildPath40000000003, testChildPath30000000002]
- 规定最小编号的为Master,所以我们水Servers节点做监控的时候,得到服务器列表,只要所有集群机器逻辑认为最小编号节点为master,那么master就被选出
- 这个master宕机znode就删除,新列表推送新的列表,在选出最小编号master,这样就动态master选举
- LeaderElection关键代码
void findLeader() throws InterruptedException { byte[] leader = null; try { leader = zk.getData(root + "/leader", true, null); } catch (Exception e) { logger.error(e); } if (leader != null) { following(); } else { String newLeader = null; try { byte[] localhost = InetAddress.getLocalHost().getAddress(); newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e) { logger.error(e); } if (newLeader != null) { leading(); } else { mutex.wait(); } } }
应用场景4-共享锁
- 共享锁在同一个进程中很容易实现,但是在跨JVM,不同Server之间就不好实现。Zookeeper却很容易实现这个功能,实现方式也是需要获得锁的Server创建一个EPHEMERAL_SEQUENTIAL目录节点(有序临时节点),然后调用getChildren方法获取当前目录节点列表最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么他就获得这个锁,如果不是那么它就调用exists(String path,boolean watch)方法并且监控Zookeeper生目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除签名它自己所创建的目录节点就行了
- 同步锁关键代码如下:
void getLock() throws KeeperException, InterruptedException{ List<String> list = zk.getChildren(root, false); String[] nodes = list.toArray(new String[list.size()]); Arrays.sort(nodes); if(myZnode.equals(root+"/"+nodes[0])){ doAction(); } else{ waitForLock(nodes[0]); } } void waitForLock(String lower) throws InterruptedException, KeeperException {Stat stat = zk.exists(root + "/" + lower,true); if(stat != null){ mutex.wait(); } else{ getLock(); } }
应用场景5-队列管理
- Zookeeper可以处理两种类型的队列:
- 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到大,这种是同步队列
- 队列按照FIFO方式进行入队和出队操作,例如,实现生产者消费者模型。
- 创建一个父目录/synchronizing,每个成员都监控目录/synchronizing/start是否存在,然后每个成员都加入这个队列(创建/synchronizing/member_i的临时目录节点),然后每个成员获取/synchronizing目录的所有目录节点,判断i的值是否已经是成员的个数,如果小于成员的个数等待/synchronizing/start的出现,如果相等就创建/synchronizing/start
- 同步队列代码:
void addQueue() throws KeeperException, InterruptedException{ zk.exists(root + "/start",true); zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); synchronized (mutex) { List<String> list = zk.getChildren(root, false); if (list.size() < size) { mutex.wait(); } else { zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } }
}
- 当队列没有满进入wait,然后一直等待watch的通知,watch的代码如下
public void process(WatchedEvent event) { if(event.getPath().equals(root + "/start") &&event.getType() == Event.EventType.NodeCreated){ System.out.println("得到通知"); super.process(event); doAction(); } }
- FIFO队列用Zookeeper实现思路:
- 在特定目录下创建SEQUENTIAL类型(有序类型)的目录/queue_i,这样就能保证所有成员入队列都有编号,出队了时通过getChildren方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就保证FIFO
- 生产者代码:
boolean produce(int i) throws KeeperException, InterruptedException{ ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true;
}
- 消费者代码
int consume() throws KeeperException, InterruptedException{ int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() == 0) { mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for(String s : list){ Integer tempValue = new Integer(s.substring(7)); if(tempValue < min) min = tempValue; } byte[] b = zk.getData(root + "/element" + min,false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } }
}
应用场景6-负载均衡
- 这里的负载均衡是指的软负载均衡。分布式系统中,都是多服务集群部署。服务消费者需要在集群中选择一个来执行相关业务,就去比较经典的生产者消费者问题。
- 消息中间件中发布者和订阅者的负载均衡,linkedin开源的kafkaMQ和阿里开源的metaq都是zookeeper来做负载均衡,这里以metaq为案例如下:
- 生产者负载均衡:metaq发送消息时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中会吧broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表后,会按照brokerid和partition的顺序排列组织成一个有序的分区列表,发送的时候,按照从头到位循环往复的方式选择一个分区来发送消息。
- 消费负载均衡:在消费过程中,一个消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ的消费策略是:
- 每个分区针对同一个group只挂载一个消费者
- 如果同一个group的消费者数目大于分区数目,则有部分消费者需要额外承担消费任务
- 如果同一个group的消费者数目小于分区数目,则又部分消费者需要额外承担消费任务。
- 在某个消费者故障或者重启等情况下,其他消费者会感知到这一个变化(通过Zookeeper watch 消费者列表),然后重新进行负载均衡,保证所有分区都有消费者进行消费。
总结
- Zookeeper作为Hadoop项目中子项目,是Hadoop集群管理一个必不可少的模块,主要用来控制集群中数量,如他管理Hadoop集群中NameNode,还有Hbase中Master Election,Server之间状态同步等。
- Zookeeper提供了一套很好的分布式集群管理机制,就是他这种基于层次型的目录树的数据结构,并对树中节点进行有效管理,从而可以设计出多种多样的分布式数据管理模型。
下一篇Zookeeper理解—ZAB协议