ZooKeeper API Operations

    Technology  2022-07-10  127

    Java API Operations

    6.1 Native API

    1) Maven Dependency

    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>

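    2) Common Operations

    (1) Obtaining the Client Object

    // Requires org.apache.zookeeper.ZooKeeper and org.junit.Before
    private ZooKeeper zooKeeper;

    @Before
    public void getClient() throws Exception {
        // Connection string, session timeout in milliseconds, default watcher (none here)
        zooKeeper = new ZooKeeper("192.168.134.99:2181", 2000, null);
    }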

    (2) Getting a Node's Value

    @Test
    public void getData() throws Exception {
        // Read the node's value; no watcher and no Stat output are requested
        byte[] data = zooKeeper.getData("/ba/123", null, null);
        System.out.println(new String(data));
    }

    (3) Getting a Node's Children

    @Test
    public void getChild() throws Exception {
        // List the children of the given node
        List<String> children = zooKeeper.getChildren("/ba/gjf", null);
        children.forEach(System.out::println);
    }

    (4) Creating a Node

    @Test
    public void createNode() throws Exception {
        // Create a persistent node; the parent /ba must already exist
        byte[] bytes = "12312312".getBytes();
        String s = zooKeeper.create("/ba/123", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(s);
    }

    (5) Updating a Node's Value

    @Test
    public void setData() throws Exception {
        // Version -1 matches any version of the node
        Stat stat = zooKeeper.setData("/ba/123", "123123".getBytes(), -1);
        System.out.println(stat);
    }
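The native API stores node data as raw bytes, and the examples above call `getBytes()` and `new String(bytes)` without naming a charset, so they depend on the JVM's default encoding. Below is a minimal sketch of the same round trip with an explicit charset. `NodeDataCodec`, `encode`, and `decode` are hypothetical helper names chosen for illustration; they are not part of the ZooKeeper API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of ZooKeeper): converts node payloads
// between String and byte[] using an explicit charset.
class NodeDataCodec {

    // Encode a value the same way on every JVM, whatever its default charset is.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode bytes that were written with encode().
    static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "caf\u00e9" contains one non-ASCII character to make the encoding matter
        byte[] bytes = encode("caf\u00e9-123");
        System.out.println(bytes.length);
        System.out.println(decode(bytes));
    }
}
```

With such helpers, a call like `zooKeeper.setData(path, NodeDataCodec.encode(value), -1)` would write the same bytes regardless of the platform the test runs on.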

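    (6) Checking Whether a Node Exists

    @Test
    public void exist() throws Exception {
        // exists() returns null when the node is absent
        Stat stat = zooKeeper.exists("/ba/13", false);
        if (stat == null) {
            System.out.println("The node does not exist!");
        } else {
            System.out.println("The node exists!");
        }
    }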

    6.2 ZkClient API

    1) Maven Dependency

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.8</version>
    </dependency>

    2) Core API

    ZkClient client = new ZkClient("ip:port");

    3) Common Operations

    package test;

    import org.I0Itec.zkclient.ZkClient;

    import java.util.List;

    /**
     * Create by GuoJF on 2019/3/12
     */
    public class Main {
        public static void main(String[] args) {
            ZkClient zkClient = new ZkClient("192.168.134.5:2181");
            // List every node directly under the root
            List<String> children = zkClient.getChildren("/");
            children.forEach(System.out::println);
        }
    }

    (1) Creating a Node

    @Test
    public void createNode() {
        String result = zkClient.create("/ba/001", "001", CreateMode.PERSISTENT);
        System.out.println(result);
    }

    (2) Getting a Node's Value

    @Test
    public void getData() {
        Object readData = zkClient.readData("/ba/001");
        System.out.println(readData.toString());
    }

    (3) Updating a Node's Value

    @Test
    public void setData() {
        zkClient.writeData("/ba/001", "123123");
    }

    (4) Deleting a Node

    @Test
    public void delNode() {
        boolean delete = zkClient.delete("/ba/001");
        if (delete) {
            System.out.println("Deleted successfully!");
        } else {
            System.out.println("Deletion failed or the node does not exist!");
        }
    }

    (5) Checking Whether a Node Exists

    @Test
    public void exist() {
        boolean exists = zkClient.exists("/ba/001");
        if (exists) {
            System.out.println("The node already exists!");
        } else {
            System.out.println("The node does not exist!");
        }
    }

    (6) Getting Child Nodes

    @Test
    public void getChild() {
        List<String> children = zkClient.getChildren("/ba");
        for (String child : children) {
            System.out.println(child);
        }
    }

    6.3 Curator API

    1) Maven Dependency

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.7.1</version>
    </dependency>

    2) Common Operations

    (1) Obtaining the Client Object

    private CuratorFramework curatorFramework;

    @Before
    public void getClient() {
        /*
         * Retry policy, four implementations:
         * ExponentialBackoffRetry, RetryNTimes, RetryOneTime, RetryUntilElapsed
         */
        // Base sleep time of 1000 ms between retries, at most 3 retries
        ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 3);
        curatorFramework = CuratorFrameworkFactory.newClient("192.168.134.99:2181", backoffRetry);
        curatorFramework.start();
    }
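The client setup above passes a base sleep time and a maximum retry count to `ExponentialBackoffRetry`. To give a feel for what those two arguments control, here is a simplified stand-alone model of an exponential backoff schedule. It is a sketch under stated assumptions, not Curator's code: it assumes the wait before retry n is the base sleep time multiplied by a random factor whose range doubles with each retry, and that the retry count is capped at 29. `BackoffSketch` and its method and constant names are hypothetical.

```java
import java.util.Random;

// Simplified model of an exponential backoff retry schedule.
// Assumption: the wait before retry n (0-based) is baseSleepMs times a random
// factor in [1, 2^(n+1) - 1], and the retry count is capped (29 here).
class BackoffSketch {
    static final int MAX_RETRIES_LIMIT = 29; // assumed cap

    // Upper bound of the wait before the given retry (0-based).
    static long maxSleepMs(long baseSleepMs, int retryCount) {
        return baseSleepMs * ((1L << (retryCount + 1)) - 1);
    }

    // Randomized wait before the given retry (0-based).
    static long sleepMs(long baseSleepMs, int retryCount, Random random) {
        int factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return baseSleepMs * factor;
    }

    // Effective number of retries after applying the cap.
    static int effectiveRetries(int requested) {
        return Math.min(requested, MAX_RETRIES_LIMIT);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < effectiveRetries(3); retry++) {
            System.out.println("retry " + retry + ": wait " + sleepMs(1000, retry, random)
                    + " ms (at most " + maxSleepMs(1000, retry) + " ms)");
        }
    }
}
```

Under these assumptions the first retry always waits exactly the base sleep time, later retries wait a random multiple of it, and a very large retry count is capped rather than honored as written.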

    (2) Creating a Node

    @Test
    public void createNode() throws Exception {
        String s = curatorFramework.create()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/ba/002", "123".getBytes());
        System.out.println(s);
    }

    (3) Getting a Node's Value

    @Test
    public void getData() throws Exception {
        byte[] bytes = curatorFramework.getData().forPath("/ba/002");
        System.out.println(new String(bytes));
    }

    (4) Updating a Node's Value

    @Test
    public void setData() {
        try {
            curatorFramework.setData().forPath("/ba/001", "123123".getBytes());
            System.out.println("Update succeeded!");
        } catch (KeeperException.NoNodeException e) {
            System.out.println("Update failed: the node does not exist!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    (5) Getting Child Nodes

    @Test
    public void getChild() throws Exception {
        curatorFramework.getChildren().forPath("/ba").forEach(System.out::println);
    }

    (6) Deleting a Node

    @Test
    public void delNode() throws Exception {
        curatorFramework.delete().forPath("/ba/002");
    }

    6.4 The Watcher Interface

    1) Implementing the Watcher Interface with the Native API

    This section uses the native Apache ZooKeeper Java API.

    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>

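    ZooKeeper watches let a client receive notifications from the ZooKeeper server and handle those events as they occur. The ZooKeeper Java API provides a public interface named Watcher, which a client's event-handler class must implement in order to receive event notifications from the ZooKeeper server. Programmatically, an application using such a client handles these events by registering a callback object with the client.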

    // DataUpdater.java
    package test;

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    import java.io.IOException;
    import java.util.UUID;

    /**
     * Create by GuoJF on 2019/3/12
     */
    public class DataUpdater {
        private static String hostPort = "192.168.134.5:2181";
        private static String zooDataPath = "/ba/gjf";
        ZooKeeper zk;

        public DataUpdater() throws IOException {
            zk = new ZooKeeper(hostPort, 2000, null);
        }

        public void run() throws InterruptedException, KeeperException {
            while (true) {
                // Write a fresh random value so that watchers on the node get notified
                String uuid = UUID.randomUUID().toString();
                byte[] zooData = uuid.getBytes();
                zk.setData(zooDataPath, zooData, -1);
                Thread.sleep(5000); // Sleep for 5 secs
            }
        }

        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            DataUpdater dataUpdater = new DataUpdater();
            dataUpdater.run();
        }
    }

    // DataWatcher.java
    package test;

    import org.apache.zookeeper.*;

    /**
     * Create by GuoJF on 2019/3/13
     */
    public class DataWatcher implements Watcher, Runnable {
        ZooKeeper zooKeeper;

        public DataWatcher() {
            try {
                zooKeeper = new ZooKeeper("192.168.134.5:2181", 2000, this);
                // exists() leaves a watch on the path whether or not the node is there
                if (zooKeeper.exists("/ba/gjf", this) == null) {
                    zooKeeper.create("/ba/gjf", "get".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    // The create above consumed that watch, so register a new one
                    zooKeeper.exists("/ba/gjf", this);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            DataWatcher dataWatcher = new DataWatcher();
            dataWatcher.run();
        }

        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDataChanged) {
                System.out.println("The data has changed");
                try {
                    // Passing this again re-registers the one-shot watch
                    byte[] data = zooKeeper.getData("/ba/gjf", this, null);
                    System.out.println(new String(data));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void run() {
            try {
                synchronized (this) {
                    // Block the calling thread; events arrive on the client's event thread
                    while (true) {
                        wait();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

    2) Implementing the Watcher Interface with ZkClient

    @Test
    public void watcherInterface() throws Exception {
        zkClient.subscribeDataChanges("/ba/gjf", new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("Node path: " + dataPath);
                System.out.println("Node data: " + data.toString());
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
            }
        });
        // Keep the test alive so that change notifications can arrive
        Thread.sleep(Integer.MAX_VALUE);
    }

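    3) Implementing the Watcher Interface with the Curator API

    The native ZooKeeper API supports event listening through registered Watchers, but a Watcher notification is one-shot: once it fires, the watch is gone, so the application has to register the Watcher again and again, which is tedious. Curator introduces Cache to listen for ZooKeeper server-side events. Cache wraps ZooKeeper event listening and handles the repeated registration automatically, which simplifies the otherwise tedious development against the native ZooKeeper API.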

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.7.1</version>
    </dependency>
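The difference between a one-shot watch and a listener that renews itself can be illustrated without a server. The sketch below is a toy in-memory model, not the ZooKeeper or Curator API: `ToyNode`, its `getData` and `setData` methods, and `OneShotWatchDemo` are hypothetical stand-ins that only mimic the one-shot rule described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy in-memory stand-in for a znode with one-shot data watches.
// It mimics the rule "a watch fires once and is then removed".
class ToyNode {
    private String data = "";
    private List<Runnable> watches = new ArrayList<>();

    // Read the data and, optionally, leave a one-shot watch behind.
    String getData(Runnable watch) {
        if (watch != null) {
            watches.add(watch);
        }
        return data;
    }

    // Change the data, then fire and discard every pending watch.
    void setData(String newData) {
        data = newData;
        List<Runnable> fired = watches;
        watches = new ArrayList<>(); // pending watches are consumed
        fired.forEach(Runnable::run);
    }
}

class OneShotWatchDemo {
    // Registers a watch once; it is notified only for the first change.
    static int countWithoutReRegistering(int updates) {
        ToyNode node = new ToyNode();
        AtomicInteger notifications = new AtomicInteger();
        node.getData(notifications::incrementAndGet);
        for (int i = 0; i < updates; i++) {
            node.setData("v" + i);
        }
        return notifications.get();
    }

    // Re-registers inside the callback, as a cache-style wrapper would.
    static int countWithReRegistering(int updates) {
        ToyNode node = new ToyNode();
        AtomicInteger notifications = new AtomicInteger();
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            notifications.incrementAndGet();
            node.getData(self[0]); // renew the watch for the next change
        };
        node.getData(self[0]);
        for (int i = 0; i < updates; i++) {
            node.setData("v" + i);
        }
        return notifications.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithoutReRegistering(3));
        System.out.println(countWithReRegistering(3));
    }
}
```

In this model, three updates produce a single notification when the watch is registered once, and three notifications when the callback renews the watch each time. That renewal is the chore a cache-style wrapper takes off the application's hands.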

    (1) NodeCache

    Watches for data changes on a specified node.

    // NodeCache
    // When a pool is passed while registering the listener, events are handled on the pool's threads
    ExecutorService pool = Executors.newFixedThreadPool(2);
    NodeCache nodeCache = new NodeCache(curatorFramework, "/ba/gjf", false);
    nodeCache.start(true);
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            // getCurrentData() returns null once the node has been deleted
            ChildData current = nodeCache.getCurrentData();
            if (current != null) {
                System.out.println(new String(current.getData()));
            }
        }
    }, pool);

    (2) PathChildrenCache

    Watches for changes to all child nodes under a specified node.

    /**
     * Watch for changes to the child nodes
     */
    PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, "/ba", true);
    // Register the listener before start() so that no initial events are missed;
    // pool is the ExecutorService created in the NodeCache example
    childrenCache.getListenable().addListener(
        new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED: " + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        },
        pool
    );
    childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);