Apache Curator学习笔记
Apache Curator(音标[kjʊ(ə)'reɪtə])Framework是ZooKeeper的Keeper(动物园管理员的管理员)。它是一个Java库,提供了比ZooKeeper更加高层的API,更加易用、可靠。Curator的推荐的ZooKeeper版本是 3.5+,但它也和3.4兼容。
Curator实现了自动的连接管理,当会话过期后,你需要重新创建ZooKeeper客户端,并重新设置Watcher。Curator可以透明的重新创建、重试连接。
Curator以组标识 org.apache.curator发布在Maven中心仓库,包含以下构件:
构件 | 说明 |
curator-recipes | 对于大部分用户来说,只需要依赖此构件。包含所有recipes |
curator-async | 异步DSL |
curator-framework | Curator框架的高层API |
curator-client | 客户端,代理ZooKeeper类 |
curator-x-discovery | 基于 Curator框架的服务发现实现 |
curator-x-discovery-server | 用于Curator发现的RESTful服务器 |
当前版本是4.0.0,通常你需要引用下面这个构件。此构件对curator-framework、curator-client有依赖:
1 2 3 4 5 |
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> |
Curator内置了重连逻辑,因此你不再需要手工Watch和管理了:
1 2 3 4 5 6 7 |
String zookeeperConnectionString = "172.21.0.1:2181,172.21.0.2:2181,172.21.0.3:2181"; // 重试策略,如果连接不上ZooKeeper集群如何重连 RetryPolicy retryPolicy = new ExponentialBackoffRetry( 1000, 3 ); CuratorFramework client = CuratorFrameworkFactory.newClient( zookeeperConnectionString, retryPolicy ); client.start(); // 创建znode client.create().forPath( "/tmp", EMPTY ); |
方法 | 说明 | ||
create | 启动一个znode创建操作,可以调用额外方法来设置节点类型、添加Watcher,使用forPath完成操作:
|
||
delete | 启动一个删除操作,使用forPath完成操作:
|
||
checkExists | 启动一个检查znode存在性的操作,使用forPath完成操作 | ||
getData | 启动一个获取znode关联数据的操作,使用forPath完成操作:
|
||
setData | 启动一个设置znode关联数据的操作,使用forPath完成操作 | ||
getChildren | 启动获取znode子节点集合的操作,使用forPath完成操作 | ||
transactionOp | 调用以生成供transaction()使用的操作条目 | ||
transaction | 原子的提交一系列的操作条目 | ||
getACL | 启动一个获取znode访问控制列表的操作,使用forPath完成操作 | ||
setACL | 启动一个设置znode访问控制列表的操作,使用forPath完成操作 |
Recipes封装了很多高层语义,例如互斥锁、Leader选举,你不再需要手工实现了:
1 2 3 4 5 6 7 8 9 10 |
// 分布式互斥锁用法: InterProcessMutex lock = new InterProcessMutex(client, lockPath); if ( lock.acquire(maxWait, waitUnit) ) { try{ // 临界区 } finally{ lock.release(); } } |
1 2 3 4 5 6 7 8 9 10 11 |
// 在多个进程之间选择Leader LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() { public void takeLeadership(CuratorFramework client) throws Exception { // 如果当前进程被选择为Leader则此回调被调用 // 除非当前进程想放弃Leader地位,不要退出此方法 } } LeaderSelector selector = new LeaderSelector(client, path, listener); selector.autoRequeue(); selector.start(); |
Curator提供了此接口,用于处理连接中断:
1 2 3 |
public interface ConnectionStateListener { public void stateChanged(CuratorFramework client, ConnectionState newState); } |
使用某些Recipe时,你应该通过Listenable.addListener注册此接口的实现。
状态 | 说明 |
CONNECTED | 第一次成功连接到ZooKeeper后,进入此状态。对于每个CuratorFramework对象,此状态仅出现一次 |
READONLY | 连接进入只读模式,调用CuratorFrameworkFactory.Builder.canBeReadOnly(true)后导致此状态 |
SUSPENDED | 到ZooKeeper的连接丢失 |
RECONNECTED | 丢失的连接被重新建立 |
LOST |
当Curator认为ZooKeeper会话已经过期,则进入此状态。可能的原因包括:
|
Recipes是Curator提供的高层封装。大部分Recipes都会自动创建所需要的父znode。
在分布式计算领域,多个同类型服务器(节点)通常需要指定其中一员作为Leader,例如主从复制场景下需要指定主节点(Leader)。
Leader通常通过选举产生,在选举前,任何节点均不知道Leader是谁,选举完毕后,则所有节点对Leader是谁达成共识。
Curator提供了两个Recipes来支持选举。其中LeaderSelectorListener已经在起步一章示例过。
用法示例:
1 2 3 4 5 6 7 8 9 10 11 |
String id = "websvr0"; LeaderLatch ll = new LeaderLatch( client, "/pems/websvr", id ); // 需要先启动 // 一旦启动,LeaderLatch会自动联系其它选举参与者,并随机选择一个作为Leader ll.start(); // 任何时候,你可以调用下面的接口来判断当前进程是否为Leader ll.hasLeadership(); // 一直等待,知道变为Leader ll.await(); // 把当前进程从被选举人列表中移除,如果当前是Leader则放弃此权利,其它进程会被选举为Leader ll.close(); |
LeaderLatch会添加一个ConnectionStateListener来监控连接问题:
- 当连接变为SUSPENDED/LOST状态后,当前Leader的hasLeadership()调用返回false(丢失Leader权限)
- 当断开的连接变为RECONNECTED,LeaderLatch会删除之前创建的znode,并创建新的
这是一个分布式的可重入(锁的持有者可以反复获得锁)的共享锁,保证任何时刻只能有一个客户端占有锁。
用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
InterProcessMutex mutex = new InterProcessMutex( client, "/mutex/resource" ); // 阻塞,直到获得锁。返回true mutex.acquire(); // 阻塞一定时间,尝试获得锁,返回true/false mutex.acquire( 1000, TimeUnit.SECONDS ); // 释放锁,仅仅当调用线程是当初获得锁的线程时,才可以释放 mutex.release(); // 使得锁可被撤销,当其它进程/线程attemptRevoke此锁时,你会得到通知 mutex.makeRevocable( forLock -> { // 得到通知后,你可以选择释放锁 forLock.release(); } ); // 尝试撤销锁 Revoker.attemptRevoke(client,"/mutex/resource" ); |
你应当设置一个ConnectionStateListener来监听SUSPENDED/LOST状态变更:
- 当连接变为SUSPENDED状态后,你无法确定当前是否仍然持有锁,除非恢复到RECONNECTED状态
- 当连接变为LOST状态后,你肯定已经丢失了锁
API类似于InterProcessMutex,对应类InterProcessSemaphoreMutex。此锁不可以重入。
这是一个可重入的读写锁,所有JVM中的所有线程共享一个分布式的临界区域。此外,该锁是“公平的” —— 每个请求者按照其请求的顺序(ZooKeeper角度)来获得锁。
读写锁维护一对相关联的锁,一个用于读,一个用于写。其中读锁可以被多个进程共同持有,而写锁则是独占的(也不允许读锁被持有)。
持有写锁者,可以继续请求读锁,反之则不行。通过获取写锁 —— 获取读锁 —— 释放写锁,可以实现锁降级。
代码示例:
1 2 3 4 |
InterProcessReadWriteLock lock = new InterProcessReadWriteLock( client, "/resource" ); // 获取两个相关的锁,然后调用InterProcessMutex的API即可 InterProcessMutex readLock = lock.readLock(); InterProcessMutex writeLock = lock.writeLock(); |
你应当设置一个ConnectionStateListener来监听SUSPENDED/LOST状态变更。
这是一个分布式的信号量。所谓信号量,是一个有限数量的资源集,进程可以租借/归还信号量。此信号量实现是“公平的” —— 每个请求者按照其请求的顺序(ZooKeeper角度)来获得信号量。
确定信号量资源数的方式有两种:
- 由SharedCountReader类提供
- 静态声明信号量数量,这要求所有参与者声明相同的数值
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// 声明一个具有10个资源的信号量 InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2( client, "/semaphore", 10 ); // 通过SharedCount指定资源数量 SharedCountReader sc = new SharedCount( client, "/sharedcount", 10 ); semaphore = new InterProcessSemaphoreV2( client, "/semaphore", sc ); // 租借一个资源,可选参数等待时间 Lease lease = semaphore.acquire(); // 租借N个资源,可选参数等待时间 Collection<Lease> leases = semaphore.acquire( 2 ); // 归还资源 lease.close(); // 或者: semaphore.returnLease( lease ); semaphore.returnAll( leases ); |
你应当设置一个ConnectionStateListener来监听SUSPENDED/LOST状态变更。
持有一系列InterProcessLock的引用,客户端要么获取所有锁,要么失败。当释放时,所有锁一起被释放。
所谓屏障,是一个同步点。每一个进程到达此点都要等待,直到所有进程都到达,则继续。
代码示例:
1 2 3 4 5 6 |
DistributedBarrier barrier = new DistributedBarrier( client, "/barrier" ); // 设置屏障,每个客户端设置一次 barrier.setBarrier(); // 等待所有客户端都到达,如果连接丢失,此方法会抛出异常 barrier.waitOnBarrier(); |
双重屏障,在协作开始之前同步,当足够数量的进程加入到屏障后,开始协作,当所有进程完毕后离开屏障。
代码示例:
1 2 3 4 5 6 7 |
// 建立一个10个资源的屏障 DistributedDoubleBarrier barrier = new DistributedDoubleBarrier( client, "/barrier", 10 ); // 进入屏障,当有10个客户端进入屏障后,阻塞解除 barrier.enter(); // 这里是协作逻辑 ... // 离开屏障,当所有客户端都尝试离开时,阻塞解除 barrier.leave(); |
如果连接丢失,则enter/leave会抛出异常。
一个共享的整数,所有客户端均看到一致性的、最新的数值。
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
// 第三个参数为初始值,假设路径不存在,则自动设置为此值 SharedCount counter = new SharedCount( client, "/counter", 0 ); // 此计数器需要启动 counter.start(); // 设置计数值 counter.setCount( 10 ); // 获取计数值 counter.getCount(); counter.addListener( new SharedCountListener() { @Override public void countHasChanged( SharedCountReader sharedCount, int newCount ) throws Exception { // 计数值变化后异步回调 } @Override public void stateChanged( CuratorFramework curatorFramework, ConnectionState connectionState ) { // 连接数量变化后的异步回调 // 如果连接变为SUSPENDED状态,你必须假设计数器的值不再精确 // 如果连接变为LOST状态,则计数器永久性失效 } } ); |
Recipe | 说明 |
DistributedAtomicLong | 分布式的原子的整数,支持自增、自减、加、减等操作 |
NodeCache | 拥有监控一个znode。每当数据变更或者此节点被删除,NodeCache都会更新自己的状态,反映当前数据(如果节点被删除数据为null) |
PathChildrenCache | 用于监控一个znode。每当添加、更新、删除子节点时,PathChildrenCache都会更新自己的状态,反映最新的子节点集合、子节点数据、子节点状态 |
TreeCache | 监控一个znode的整个子树 |
PersistentNode | 尝试驻留ZooKeeper的节点,即使在连接/会话中断的情况下 |
PersistentTtlNode | 如果你想创建TTL节点,但是又不愿意手工周期性设置其数据,可以使用 |
GroupMember | 管理并缓存一组成员,用于构建集群成员列表 |
Curator提供了一个基于Java 8 Completion Stage的纯异步客户端实现。要使用此实现,添加依赖:
1 2 3 4 5 |
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-async</artifactId> <version>4.0.0</version> </dependency> |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
String zookeeperConnectionString = "172.21.0.1:2181,172.21.0.2:2181,172.21.0.3:2181"; RetryPolicy retryPolicy = new ExponentialBackoffRetry( 1000, 3 ); CuratorFramework client = CuratorFrameworkFactory.newClient( zookeeperConnectionString, retryPolicy ); client.start(); // 此包装器提供异步API AsyncCuratorFramework async = AsyncCuratorFramework.wrap( client ); // 检查znode /user是否存在 如果存在,打印状态结构 async.checkExists().forPath( "/user" ).thenAccept( stat -> LOGGER.debug( stat.toString() ) ); // 获取数据 async.getData().forPath( "/user" ).thenAccept( data -> LOGGER.debug( new String( data ) ) ); // 在最前面添加watched()调用,可以增加Watcher // 使用AsyncStage(CompletionStage子类型)的event()方法,设置Watcher的回调 async.with( WatchMode.successOnly) // 不关心连接丢失类的事件 .watched().getData().forPath( "/user" ).event().thenAccept( ev -> LOGGER.debug( ev.toString() ) ); // 同步化 async.create().forPath("/user").toCompletableFuture().get(); |
通过串行化机制,Apache Curator支持在ZooKeeper的znode中存储类型化的数据。这一功能由Modeled Curator组件负责。
MC不使用原始的字符串形式的路径,而是使用ZPath来抽象ZooKeeper路径。ZPath可以是简单的字符串,也可以包含路径变量,这些变量可以根据需要替换为动态的取值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// 静态路径 ZPath staticPath = ZPath.parse( "/static/path" ); // 动态路径,使用 {} 包围路径变量,路径变量又叫ID,其中可以包含任何东西,例如{ any thing}但是没有什么意义 // 路径变量(ID)总是从左向右依次解析 ZPath path = ZPath.parseWithIds( "/static/{}/{}" ); // 路径变量可以被替换,如果用于替换的对象是NodeName,则调用其nodeName()方法,否则调用toString()方法 ZPath resolvedPath = path.resolved( "path", new NodeName() { public String nodeName() { return "name"; } } ); // 输出内容:/static/path/name System.out.println( resolvedPath ); // 路径可以被部分的解析: System.out.println( path.resolved( "path" ) ); |
此类型包含对ZooKeeper路径进行操作(存取强类型对象)所需的全部元数据:
- 一个ZPath
- 用于串行化对象的串行化器
- 关于如何创建znode的选项(顺序、压缩、TTL等)
- 针对znode的ACL
- 如何删除znode(是否删除子节点)
要写入一个数据模型到znode的关联数据,参考如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public static class ServerInfo implements NodeName { private String name; private String type; @Override public String nodeName() { return getName(); } } ModelSpec<ServerInfo> spec = ModelSpec.builder( ZPath.parseWithIds("/servers/{}"), JacksonModelSerializer.build(ServerInfo.class) ).build(); // 依赖异步客户端 ModeledFramework<ServerInfo> modeledClient = ModeledFramework.wrap(async, spec); ServerInfo info = new ServerInfo(); info.setType("webserver"); info.setName("tk.gmem.cc"); // 输出:/servers/hk.gmem.cc,这个路径是把模型传入ZPath而解析得到的,因为模型实现了NodeName,因此调用其nodeName()替换路径变量 modeledClient.set(info).thenAccept(path -> LOGGER.debug(path)).toCompletableFuture().get(); |
本例使用JacksonJSON作为串行化机制的实现,需要引入依赖:
1 2 3 4 5 |
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>LATEST</version> </dependency> |
1 2 3 4 5 6 7 8 9 10 11 12 |
ModelSpec<ServerInfo> spec = ModelSpec.builder( ZPath.parseWithIds("/servers/hk.gmem.cc"), JacksonModelSerializer.build(ServerInfo.class) ).build(); ModeledFramework<ServerInfo> modeledClient = ModeledFramework.wrap(async, spec); modeledClient.read().whenComplete((info, e) -> { if (e != null) { } else { LOGGER.debug(info.getType()); } }).toCompletableFuture().get(); |
这一功能允许你在一个事务中,执行一系列的ZooKeeper操作。你可以使用迁移来保证某个ZooKeeper子树的数据一致性。示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
// 一系列操作组成一个迁移 CuratorOp op1 = async.transactionOp().create().forPath("/parent"); CuratorOp op2 = async.transactionOp().create().forPath("/parent/one"); CuratorOp op3 = async.transactionOp().create().forPath("/parent/two"); CuratorOp op4 = async.transactionOp().create().forPath("/parent/three"); // Migration是一个函数式接口,本质上就是一系列操作 Migration migration = () -> Arrays.asList(op1, op2, op3, op4); // 迁移集可以包含若干个迁移,迁移集必须有个ID MigrationSet set = MigrationSet.build("main", Collections.singletonList(migration)); // 迁移管理器,负责执行迁移 // 迁移管理器会监控迁移集的执行状态,它只会应用那些没有应用的迁移 MigrationManager manager = new MigrationManager(client, lockPath, // 迁移管理器会锁定此路径 metaDataPath, // 存储元数据的路径 executor, // 异步执行器 lockMax // 锁定最长时间 ); manager.migrate(set).exceptionally(e -> { if (e instanceof MigrationException) { // migration checksum failed, etc. } else { // some other kind of error } return null; }); |
在SOA/分布式系统中,服务需要能够相互发现。例如,一个Web服务需要发现缓存服务。可以使用服务发现系统(Service Discovery system)来避免服务位置的硬编码。服务发现系统的功能包括:
- 允许服务注册自己,便于其它服务调用之
- 定位某种服务的但个实例
- 当服务的实例发生变化,可以发出通知
Curator服务发现功能由单独的子项目实现:
1 2 3 4 5 |
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>4.0.0</version> </dependency> |
这个类代表一个服务的实例,具有名称(同一类型服务共享)、标识符、地址、端口、可选的附加详细信息。服务实例被串行化在znode中,路径为/basePath/serviceName/id。
基于提供策略(provider strategy)对某个特定的命名服务的发现服务进行封装,所谓提供策略,决定了如何从多个服务实例中选择一个实例。内置的策略包括循环选择(Round Robin)、随机、粘性(Sticky,总选择一个实例)。
ServiceProviderBuilder作为ServiceProvider的工厂,由ServiceDiscovery提供。ServiceProviderBuilder允许你设置服务的名称以及其它可选的属性。
ServiceProvider必须在 start()之后才能正常工作,当你不再需要它以后,应该调用 close(),要获取服务实例,调用 getInstance()。当无法连接到某个实例时,你应该调用 noteError(),这样ServiceProvider会根据DownInstancePolicy决定何时将其标注为宕机。
由ServiceDiscoveryBuilder创建,此类负责提供ServiceProviderBuilder。
ServiceProvider必须在 start()之后才能正常工作,当你不再需要它以后,应该调用 close()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
// 当前JVM提供的服务实例 ServiceInstance<InstanceDetail> thisService = ServiceInstance.<InstanceDetail>builder() .name( "cacheService" ) .payload( new InstanceDetail() ) .address( "192.168.0.89" ) .port( 8088 ) .uriSpec( new UriSpec( "http://{address}:{port}" ) ) .build(); // 服务发现 JsonInstanceSerializer<InstanceDetail> serializer = new JsonInstanceSerializer<InstanceDetail>( InstanceDetail.class ); ServiceDiscovery<InstanceDetail> discovery = ServiceDiscoveryBuilder .builder( InstanceDetail.class ) .client( client ) .basePath( "/sd" ) .serializer( serializer ) .thisInstance( thisService ) .build(); // 需要启动 discovery.start(); // 服务提供者 ServiceProvider<InstanceDetail> sp = discovery .serviceProviderBuilder() .serviceName( "cacheService" ) // 随机选取服务实例 .providerStrategy( new RandomStrategy<>() ) // 如果在一分钟内,有10此noteError则认为目标实例宕机 .downInstancePolicy( new DownInstancePolicy( 60, TimeUnit.SECONDS, 10 ) ) .build(); // 需要启动 sp.start(); LOGGER.debug( sp.getInstance().buildUriSpec() ); |
为了让非JVM应用使用服务发现功能,Curator提供了RESTful的WebService,你可以通过HTTP协议来注册、移除、查询服务。
Curator提供了JAX-RS组件,你可以在任何Web容器中,配合JAX-RS提供者(例如Jersey)使用该组件。
可能原因是ZooKeeper服务器的版本与工程声明的ZooKeeper客户端版本不兼容导致。
Leave a Reply