Zookeeper是架构模式是主从结构,用来存储一些数据,也可以做分布式协调服务,它的主节点叫leader,从节点叫follower。zookeeper允许一半的节点宕机。它的集群模式一般是奇数台服务器。 zookeeper的功能: 1、存储数据:配置信息,元信息 2、选举功能 3、数据同步(ZAB协议) 4、分布式锁 5、监听机制,监听节点的数据变化 单节点的安装 zookeeper的安装其实特别简单。 第一步减压 tar -zxvf zookeeper-3.4.10.tar.gz -C ~/training/ 第二步 配置环境变量
vim ~/.bash_profile ZOOKEEPER_HOME=/root/training/zookeeper-3.4.10 export ZOOKEEPER_HOME PATH=$ZOOKEEPER_HOME/bin:$PATH export PATH 使用户的环境变量配置文件生效 source ~/.bash_profile第三步配置配置文件 它的配置文件需要把conf 目录下的示例配置文件拷贝一份 cp zoo_sample.cfg zoo.cfg 因为启动zookeeper的时候默认使用的conf目录下的zoo.cfg配置文件 配置 dataDir=/root/training/zookeeper-3.4.10/tmp 数据存储的路径 server.1=bigdata111:2888:3888 2888是通信端口,3888是选举端口 数据存储路径要事先创建好 在/root/training/zookeeper-3.4.10/tmp创建一个文件:myid 输入 1 启动:zkServer.sh start zkServer.sh status 这样一个单机版zookeeper就安装成功了 然后同过命令行客户端测试一下: zkClli.sh ZooKeeper节点的类型:两个维度(永久还是临时、有序还是无序) (1)持久化节点 (2)持久化排序节点
临时节点:当客户端会话如果退出,该临时节点就自动删除。 (3)临时节点 (4)临时的有序节点使用代码操作Zookeeper: apache提供了一套api接口专门操作zookeeper叫curator 搭建一个简单maven工程把它引入进去 pom依赖
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>16.0.1</version> </dependency> </dependencies>创永久无序节点
package com.kanxyz.demo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class CreateZKNode { public static void main(String[] args) throws Exception { //如果Zookeeper断掉,设置连接策略 每隔1秒连接一次 最多连接十次 RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000, 10); CuratorFramework client=CuratorFrameworkFactory.builder() .connectString("192.168.112.111:2181") .retryPolicy(retryPolicy) .build(); client.start(); //创建永久无序节点 client.create().forPath("/mypath"); client.close(); } }代码创建节点可以没有数据,但是命令行创建节点是创建不上的 创建永久有序节点
package com.kanxyz.demo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.schema.SchemaSet; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class CreateZKNode { public static void main(String[] args) throws Exception { //如果Zookeeper断掉,设置连接策略 每隔1秒连接一次 最多连接十次 RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000, 10); CuratorFramework client=CuratorFrameworkFactory.builder() .connectString("192.168.112.111:2181") .retryPolicy(retryPolicy) .build(); client.start(); //创建永久无序节点 // client.create().forPath("/mypath"); //创建永久有序节点 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node-"); client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node-"); client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node-"); client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node-"); //创建临时无序节点 //创建临时有序节点 client.close(); } }创建临时无序节点和临时有序节点
package com.kanxyz.demo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class CreateZKNode { public static void main(String[] args) throws Exception { //如果Zookeeper断掉,设置连接策略 每隔1秒连接一次 最多连接十次 RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000, 10); CuratorFramework client=CuratorFrameworkFactory.builder() .connectString("192.168.112.111:2181") .retryPolicy(retryPolicy) .build(); client.start(); //创建永久无序节点 // client.create().forPath("/mypath"); //创建永久有序节点 /*client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node-"); client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node-"); client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node-"); client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/node-");*/ //创建临时无序节点 /*client.create().withMode(CreateMode.EPHEMERAL).forPath("/mytempnode"); Thread.sleep(2000);*/ //创建临时有序节点 client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tmpnode-"); client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tmpnode-"); client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tmpnode-"); client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tmpnode-"); client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tmpnode-"); Thread.sleep(5000); client.close(); } }节点监听
package com.kanxyz.demo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; public class ListenerZKDemo { public static void main(String[] args) throws Exception{ RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000, 10); CuratorFramework client=CuratorFrameworkFactory.builder() .connectString("192.168.112.111:2181") .retryPolicy(retryPolicy) .build(); client.start(); //监听节点mypath操作 PathChildrenCache children = new PathChildrenCache(client, "/mypath", true); children.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("监控节点的操作:"+event.getType()); // public static final Type CHILD_ADDED; 子节点添加的类型 // public static final Type CHILD_UPDATED; 子节点更新的类型 // public static final Type CHILD_REMOVED; 子节点移除的类型 switch (event.getType()) { case CHILD_ADDED: System.out.println("节点添加"); break; case CHILD_UPDATED: System.out.println("节点更新"); break; case CHILD_REMOVED: System.out.println("节点移除"); break; default : System.out.println("执行了其他的操作"); break; } } }); children.start(); Thread.sleep(100000); client.close(); System.out.println("监听结束--------------"); } }分布式锁功能
package com.kanxyz.demo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; public class ZKDistributeLock { //秒杀总分数 private static int NUMBER=10; //定义秒杀逻辑 private static void getNumber(){ System.out.println("秒杀开始"); System.out.println("我抢第"+NUMBER+"商品"); NUMBER--; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("秒杀结束"); } public static void main(String[] args) { //创建链接策略 RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000, 10); //创建客户端 CuratorFramework client=CuratorFrameworkFactory.builder() .connectString("192.168.112.111:2181") .retryPolicy(retryPolicy) .build(); client.start(); //创建这把锁 final InterProcessMutex lock = new InterProcessMutex(client, "/aaaa"); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { try { //请求得到锁 lock.acquire(); getNumber(); } catch (Exception e) { e.printStackTrace(); }finally { try { //完成本次操作要释放锁 lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } //因为是上边是线程不能提前关闭链接客户端 //client.close(); } }