阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

ZooKeeper学习总结

275次阅读
没有评论

共计 23502 个字符,预计需要花费 59 分钟才能阅读完成。

1. 概述

Zookeeper 是 Hadoop 的一个子项目,它是分布式系统中的协调系统,可提供的服务主要有:配置服务、名字服务、分布式同步、组服务等。

它有如下的一些特点:

  • 简单

Zookeeper 的核心是一个精简的文件系统,它支持一些简单的操作和一些抽象操作,例如,排序和通知。

  • 丰富

        Zookeeper 的原语操作是很丰富的,可实现一些协调数据结构和协议。例如,分布式队列、分布式锁和一组同级别节点中的“领导者选举”。

  • 高可靠

Zookeeper 支持集群模式,可以很容易的解决单点故障问题。

  • 松耦合交互

不同进程间的交互不需要了解彼此,甚至可以不必同时存在,某进程在 zookeeper 中留下消息后,该进程结束后其它进程还可以读这条消息。

  • 资源库

        Zookeeper 实现了一个关于通用协调模式的开源共享存储库,能使开发者免于编写这类通用协议。

 

2. ZooKeeper 的安装

  • 独立模式安装

Zookeeper 的运行环境是需要 Java 的,建议安装 Oracle 的 java6.

可去官网下载一个稳定的版本,然后进行安装:http://zookeeper.apache.org/

解压后在 zookeeper 的 conf 目录下创建配置文件 zoo.cfg,里面的配置信息可参考统计目录下的 zoo_sample.cfg 文件,我们这里配置为:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-data/
clientPort=2181

tickTime指定了 ZooKeeper 的基本时间单位(以毫秒为单位);

initLimit指定了启动 zookeeper 时,zookeeper 实例中的随从实例同步到领导实例的初始化连接时间限制,超出时间限制则连接失败(以 tickTime 为时间单位);

syncLimit指定了 zookeeper 正常运行时,主从节点之间同步数据的时间限制,若超过这个时间限制,那么随从实例将会被丢弃;

dataDirzookeeper 存放数据的目录;

clientPort用于连接客户端的端口。

  • 启动一个本地的 ZooKeeper实例
% zkServer.sh start

检查 ZooKeeper 是否正在运行

echo ruok | nc localhost 2181

若是正常运行的话会打印“imok”。

3. ZooKeeper 监控

  • 远程 JMX配置

默认情况下,zookeeper 是支持本地的 jmx 监控的。若需要远程监控 zookeeper,则需要进行进行如下配置。

默认的配置有这么一行:

ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"

咱们在 $JMXLOCALONLY 后边添加 jmx 的相关参数配置:

ZOOMAIN="-Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY
                -Djava.rmi.server.hostname=192.168.1.8
                -Dcom.sun.management.jmxremote.port=1911
                -Dcom.sun.management.jmxremote.ssl=false
                -Dcom.sun.management.jmxremote.authenticate=false
                 org.apache.zookeeper.server.quorum.QuorumPeerMain"

这样就可以远程监控了,可以用 jconsole.exe 或 jvisualvm.exe 等工具对其进行监控。

  • 身份验证

这里没有配置验证信息,如果需要请参见我的博文 jvisualvm 远程监控 tomcat:http://www.cnblogs.com/leocook/p/jvisualvmandtomcat.html

4. Zookeeper 的存储模型

Zookeeper 的数据存储采用的是结构化存储,结构化存储是没有文件和目录的概念,里边的目录和文件被抽象成了节点(node),zookeeper 里可以称为 znode。Znode 的层次结构如下图:

ZooKeeper 学习总结

最上边的是根目录,下边分别是不同级别的子目录。

5. Zookeeper 客户端的使用

  • zkCli.sh

可使用 ./zkCli.sh -server localhost 来连接到 Zookeeper 服务上。

使用 ls / 可查看根节点下有哪些子节点,可以双击 Tab 键查看更多命令。

  • Java客户端

可创建 org.apache.zookeeper.ZooKeeper 对象来作为 zk 的客户端,注意,java api 里创建 zk 客户端是异步的,为防止在客户端还未完成创建就被使用的情况,这里可以使用同步计时器,确保 zk 对象创建完成再被使用。

  • C客户端

可以使用 zhandle_t 指针来表示 zk 客户端,可用 zookeeper_init 方法来创建。可在 ZK_HOME\src\c\src\ cli.c 查看部分示例代码。

Ubuntu 14.04 安装分布式存储 Sheepdog+ZooKeeper  http://www.linuxidc.com/Linux/2014-12/110352.htm

CentOS 6 安装 sheepdog 虚拟机分布式储存  http://www.linuxidc.com/Linux/2013-08/89109.htm

ZooKeeper 集群配置 http://www.linuxidc.com/Linux/2013-06/86348.htm

使用 ZooKeeper 实现分布式共享锁 http://www.linuxidc.com/Linux/2013-06/85550.htm

分布式服务框架 ZooKeeper — 管理分布式环境中的数据 http://www.linuxidc.com/Linux/2013-06/85549.htm

ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

ZooKeeper 服务器集群环境配置实测 http://www.linuxidc.com/Linux/2013-04/83559.htm

ZooKeeper 集群安装 http://www.linuxidc.com/Linux/2012-10/72906.htm

Zookeeper3.4.6 的安装 http://www.linuxidc.com/Linux/2015-05/117697.htm

6. Zookeeper 创建 Znode

Znode 有两种类型:短暂的和持久的。短暂的 znode 在创建的客户端与服务器端断开(无论是明确的断开还是故障断开)连接时,该 znode 都会被删除;相反,持久的 znode 则不会。

public class CreateGroup implements Watcher{private static final int SESSION_TIMEOUT = 1000;//会话延时

    private ZooKeeper zk = null;
    private CountDownLatch countDownLatch = new CountDownLatch(1);//同步计数器

    public void process(WatchedEvent event) {if(event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();//计数器减一
        }
    }

    /**
     * 创建 zk 对象
     * 当客户端连接上 zookeeper 时会执行 process(event)里的 countDownLatch.countDown(),计数器的值变为 0,则 countDownLatch.await()方法返回。* @param hosts
     * @throws IOException
     * @throws InterruptedException
     */
    public void connect(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();//阻塞程序继续执行
    }
    
    /**
     * 创建 group
     * 
     * @param groupName 组名
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void create(String groupName) throws KeeperException, InterruptedException {String path = "/" + groupName;
        String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE/*允许任何客户端对该 znode 进行读写 */, CreateMode.PERSISTENT/* 持久化的 znode*/);
        System.out.println("Created" + createPath);
    }
    
    /**
     * 关闭 zk
     * @throws InterruptedException
     */
    public void close() throws InterruptedException {if(zk != null){try {zk.close();
            } catch (InterruptedException e) {throw e;
            }finally{zk = null;
                System.gc();}
        }
    }
}

这里我们使用了同步计数器 CountDownLatch,在 connect 方法中创建执行了 zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); 之后,下边接着调用了 CountDownLatch 对象的 await 方法阻塞,因为这是 zk 客户端不一定已经完成了与服务端的连接,在客户端连接到服务端时会触发观察者调用 process()方法,我们在方法里边判断一下触发事件的类型,完成连接后计数器减一,connect 方法中解除阻塞。

还有两个地方需要注意:这里创建的 znode 的访问权限是 open 的,且该 znode 是持久化存储的。

测试类如下:

public class CreateGroupTest {private static String hosts = "192.168.1.8";
    private static String groupName = "zoo";
    
    private CreateGroup createGroup = null;
    
    /**
     * init
     * @throws InterruptedException 
     * @throws KeeperException 
     * @throws IOException 
     */
    @Before
    public void init() throws KeeperException, InterruptedException, IOException {createGroup = new CreateGroup();
        createGroup.connect(hosts);
    }
    
    @Test
    public void testCreateGroup() throws KeeperException, InterruptedException {createGroup.create(groupName);
    }
    
    /**
     * 销毁资源
     */
    @After
    public void destroy() {try {createGroup.close();
            createGroup = null;
            System.gc();} catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

由于 zk 对象的创建和销毁代码是可以复用的,所以这里我们把它分装成了接口:

/**
 * 连接的观察者,封装了 zk 的创建等
 * @author leo
 *
 */
public class ConnectionWatcher implements Watcher {private static final int SESSION_TIMEOUT = 5000;

    protected ZooKeeper zk = null;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void process(WatchedEvent event) {KeeperState state = event.getState();
        
        if(state == KeeperState.SyncConnected){countDownLatch.countDown();
        }
    }
    
    /**
     * 连接资源
     * @param hosts
     * @throws IOException
     * @throws InterruptedException
     */
    public void connection(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();}
    
    /**
     * 释放资源
     * @throws InterruptedException
     */
    public void close() throws InterruptedException {if (null != zk) {try {zk.close();
            } catch (InterruptedException e) {throw e;
            }finally{zk = null;
                System.gc();}
        }
    }
}

7. Zookeeper 删除 Znode

/**
 * 删除分组
 * @author leo
 *
 */
public class DeleteGroup extends ConnectionWatcher {public void delete(String groupName) {String path = "/" + groupName;
        
        try {List<String> children = zk.getChildren(path, false);
            
            for(String child : children){zk.delete(path + "/" + child, -1);
            }
            zk.delete(path, -1);//版本号为 -1,
        } catch (KeeperException e) {e.printStackTrace();
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

zk.delete(path,version)方法的第二个参数是 znode 版本号,如果提供的版本号和 znode 版本号一致才会删除这个 znode,这样可以检测出对 znode 的修改冲突。通过将版本号设置为 -1,可以绕过这个版本检测机制,无论 znode 的版本号是什么,都会直接将其删除。

测试类:

 
public class DeleteGroupTest {private static final String HOSTS = "192.168.1.137";
    private static final String groupName = "zoo";
    
    private DeleteGroup deleteGroup = null;
    
    @Before
    public void init() throws IOException, InterruptedException {deleteGroup = new DeleteGroup();
        deleteGroup.connection(HOSTS);
    }
    
    @Test
    public void testDelete() throws IOException, InterruptedException, KeeperException {deleteGroup.delete(groupName);
    }
    
    @After
    public void destroy() throws InterruptedException {if(null != deleteGroup){try {deleteGroup.close();
            } catch (InterruptedException e) {throw e;
            }finally{deleteGroup = null;
                System.gc();}
        }
    }
}

8. Zookeeper 的相关操作

ZooKeeper 中共有 9 中操作:

create:创建一个 znode

delete:删除一个 znode

exists:测试一个 znode

getACL,setACL:获取 / 设置一个 znode 的 ACL(权限控制)

getChildren:获取一个 znode 的子节点

getData,setData:获取 / 设置一个 znode 所保存的数据

sync:将客户端的 znode 视图与 ZooKeeper 同步

这里更新数据是必须要提供 znode 的版本号(也可以使用 - 1 强制更新,这里可以执行前通过 exists 方法拿到 znode 的元数据 Stat 对象,然后从 Stat 对象中拿到对应的版本号信息),如果版本号不匹配,则更新会失败。因此一个更新失败的客户端可以尝试是否重试或执行其它操作。

9. ZooKeeper 的 API

ZooKeeper 的 api 支持多种语言,在操作时可以选择使用同步 api 还是异步 api。同步 api 一般是直接返回结果,异步 api 一般是通过回调来传送执行结果的,一般方法中有某参数是类 AsyncCallback 的内部接口,那么该方法应该就是异步调用,回调方法名为 processResult。

10. 观察触发器

可以对客户端和服务器端之间的连接设置观察触发器(后边称之为 zookeeper 的状态观察触发器),也可以对 znode 设置观察触发器。

  • 状态观察器

zk 的整个生命周期如下:

ZooKeeper 学习总结

可在创建 zk 对象时传入一个观察器,在完成 CONNECTING 状态到 CONNECTED 状态时,观察器会触发一个事件,该触发的事件类型为 NONE,通过 event.getState()方法拿到事件状态为 SyncConnected。有一点需要注意的就是,在 zk 调用 close 方法时不会触发任何事件,因为这类的显示调用是开发者主动执行的,属于可控的,不用使用事件通知来告知程序。这一块在下篇博文还会详细解说。

  • 设置 znode的观察器

可以在读操作 exists、getChildren 和 getData 上设置观察,在执行写操作 create、delete 和 setData 将会触发观察事件,当然,在执行写的操作时,也可以选择是否触发 znode 上设置的观察器,具体可查看相关的 api。

当观察的 znode被创建、删除或其数据被更新时,设置在 exists 上的观察将会被触发;

当观察的 znode被删除或数据被更新时,设置在 getData 上的观察将会被触发;

当观察的 znode的子节点被创建、删除或 znode自身被删除时,设置在 getChildren 上的观察将会被触发,可通过观察事件的类型来判断被删除的是 znode 还是它的子节点。

ZooKeeper 学习总结

对于 NodeCreatedNodeDeleted根据路径就能发现是哪个 znode 被写;对于 NodeChildrenChanged 可根据 getChildren 来获取新的子节点列表。

注意:在收到收到触发事件到执行读操作之间,znode 的状态可能会发生状态,这点需要牢记。

至此,编写简单的 zookeeper 应该是可以的了,下篇博文咱们来深入探讨 zookeeper 的相关知识。

参考地址:http://zookeeper.apache.org/doc/r3.4.6/

参考书籍:《Hadoop 权威指南》

Hadoop 权威指南(第 3 版) 修订版(带目录书签) 中文 PDF 高清晰 下载见 http://www.linuxidc.com/Linux/2016-03/129542.htm

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-07/133179p2.htm

其实 zookeeper 系列的学习总结很早就写完了,这段时间在准备找工作的事情,就一直没有更新了。下边给大家送上,文中如有不恰当的地方,欢迎给予指证,不胜感谢!。

1. 数据模型

1.1. 只适合存储小数据

Zk 维护着一个逻辑上的树形层次结构,树中的节点称为 znode,个 znode 都有一个 ACL(权限控制)。Zookeeper 是被设计用来协调服务的,因此 znode 里存储的都是小数据,而不是大容量的数据,数据容量一般在 1MB 范围内。

1.2. 操作的原子性

Znode 的数据读写是原子的,要么读或写了完整的数据,要么就失败,不会出现只读或写了部分数据。

1.3. Znode 的路径

和 Unix 中的文件系统路径格式很想,但是只支持绝对路径,不支持相对路径,也不支持点号(”.”和”..”)。

1.4. 短暂的 znode 和持久的 znode

Znode 有两种类型:短暂的和持久的。短暂的 znode 生命周期仅限创建它的客户端与服务器端之间的连接没有断开,客户端断开连接后,znode 将会被删除。

1.5. 顺序 znode

名称中包含 Zookeeper 指定顺序号的 znode。若在创建 znode 时设置了顺序标识,那么该 znode 被创建后,名字的后边将会附加一串数字,该数字是由一个单调递增的计数器来生成的。例如,创建节点时传入的 path 是”/aa/bb”,创建后的则可能是”/aa/bb0002”,再次创建后是”/aa/bb0003”。

Znode 的创建模式 CreateMode 有四种,分别是:EPHEMERAL(短暂的 znode)、EPHEMERAL_SEQUENTIAL(短暂的顺序 znode)、PERSISTENT(持久的 znode)和PERSISTENT_SEQUENTIAL(持久的顺序 znode)。如果您已经看过了上篇博文,那么这里的 api 调用应该是很好理解的,见:http://www.cnblogs.com/leocook/p/zk_0.html。

1.6. 观察

这部分在上篇博文中已经做了详细的说明,包括连接的观察和 znode 的观察, 这部分在构建一个稳定的 zookeeper 应用中有着很重要的作用,具体会在下边说到。

2.  ACL

即:Access Control List(访问控制列表)。Znode 被创建时带有一个 ACL 列表,zk 提供了下边 三种身份验证模式

  • digest

用户名 + 密码验证。

  • host

客户端主机名 hostname 验证。

  • ip

客户端的 IP 验证。

  • auth

使用 sessionID 验证

  • world

无验证,默认是无任何权限。该模式较为特殊,在 zk连接添加 ACL中会说到

ACL 权限对应如下表:

ZooKeeper 学习总结

在设置 ACL 时,可以给 zk 客户端和服务器端的连接设置 ACL,也可以在创建 znode 时,给 znode 设置 ACL,在创建了 znode 后,如果有 zk 客户端来操作 znode,只有满足权限要求时,才能完成相对应的操作:

2.1. 给 ZK 连接添加 ACL

可使用 zk 对象的 addAuthInfo()方法来添加验证模式,如使用 digest 模式进行身份验证:zk.addAuthInfo(“digest”,”username:passwd”.getBytes());

在 zookeeper 对象被创建时,初始化会被添加 world 验证模式。world 身份验证模式的验证 id 是”anyone”。

若该连接创建了 znode,那么他将会被添加 auth 身份验证模式的验证 id 是””,即空字符串,这里将使用 sessionID 进行验证。

2.2. 给 znode 设置 ACL

  • 自己创建 ACL

创建 ACL 对象时,可用 ACL 类的构造方法 ACL(int perms, Id id)

其中 参数 perms 表示权限,在接口 org.apache.zookeeper.ZooDefs.Perms 中有相关的常量:READ、WRITE、CREATE、DELETE、ALL 和 ADMIN,它们值如下表:

ZooKeeper 学习总结

Id参数是验证模式 ,可用构造方法 Id(String scheme, String id) 来创建。参数 scheme 是验证模式,digest、host 或 ip,id 是对应的验证,digest 对应用户名和密码对,如“user:passwd”;host 对应主机名,如”localhost”;ip 对应 ip 地址,如”192.168.1.120”。

  • 使用 api中预设的 ACL

在创建 znode 时可以设置该 znode 的 ACL 列表。接口 org.apache.zookeeper.ZooDefs.Ids 中有一些已经设置好的权限常量,例如:

(1)、OPEN_ACL_UNSAFE:完全开放。事实上这里是采用了 world 验证模式,由于每个 zk 连接都有 world 验证模式,所以 znode 在设置了 OPEN_ACL_UNSAFE 时,是对所有的连接开放。

(2)、CREATOR_ALL_ACL:给创建该 znode 连接所有权限。事实上这里是采用了 auth 验证模式,使用 sessionID 做验证。所以设置了 CREATOR_ALL_ACL 时,创建该 znode 的连接可以对该 znode 做任何修改。

(3)、READ_ACL_UNSAFE:所有的客户端都可读。事实上这里是采用了 world 验证模式,由于每个 zk 连接都有 world 验证模式,所以 znode 在设置了 READ_ACL_UNSAFE 时,所有的连接都可以读该 znode。

注:红色部分是本人阅读源码的一些研究,auth 和 world 的相关描述经供参考。

3. 运行模式

Zookeeper 有两种运行模式:独立模式(standalone mode)和复制模式(replicated mode)。

3.1. 独立模式

只有一个 zookeeper 服务实例,不可保证高可靠性和恢复性,可在测试环境中使用,生产环境不建议使用。

3.2. 复制模式

复制模式也就是集群模式,有多个 zookeeper 实例在运行,建议多个 zk 实例是在不同的服务器上。集群中不同 zookeeper 实例之间数据不停的同步。有半数以上的实例保持正常运行,zk 服务就能正常运行,例如:有 5 个 zk 实例,挂了 2 个,还剩 3 个,依然可以正常工作;如有 6 个 zk 实例,挂了 3 个,则不能正常工作。

每个 znode 的修改都会被复制到超过半数的机器上,这样就会保证至少有一台机器会保存最新的状态,其余的副本最终都会跟新到这个状态。Zookeeper 为实现这个功能,使用了 Zab 协议,该协议有两个可以无限重复的阶段:

  • 选举领导

集群中所有的 zk 实例会选举出来一个“领导实例”(leader),其它实例称之为“随从实例”(follower)。如果 leader 出现故障,其余的实例会选出一台 leader,并一起提供服务,若之前的 leader 恢复正常,便成为 follower。选举 follower 是一个很快的过程,性能影响不明显。

Leader 主要功能是协调所有实例实现写操作的原子性,即:所有的写操作都会转发给 leader,然后 leader 会将更新广播给所有的 follower,当半数以上的实例都完成写操作后,leader 才会提交这个写操作,随后客户端会收到写操作执行成功的响应。

  • 原子广播

上边已经说到:所有的写操作都会转发给 leader,然后 leader 会将更新广播给所有的 follower,当半数以上的实例都完成写操作后,leader 才会提交这个写操作,随后客户端会收到写操作执行成功的响应。这么来的话,就实现了客户端的写操作的原子性,每个写操作要么成功要么失败。逻辑和数据库的两阶段提交协议很像。

3.3. 复制模式下的数据一致性

Znode 的每次写操作都相当于数据库里的一次事务提交,每个写操作都有个全局唯一的 ID,称为:zxid(ZooKeeper Transaction)。ZooKeeper 会根据写操作的 zxid 大小来对操作进行排序,zxid 小的操作会先执行。zk 下边的这些特性保证了它的数据一致性:

  • 顺序一致性

任意客户端的写操作都会按其发送的顺序被提交。如果一个客户端把某 znode 的值改为 a,然后又把值改为 b(后面没有其它任何修改),那么任何客户端在读到值为 b 之后都不会再读到 a。

  • 原子性

这一点再前面已经说了,写操作只有成功和失败两种状态,不存在只写了百分之多少这么一说。

  • 单一系统映像

客户端只会连接 host 列表中状态最新的那些实例。如果正在连接到的实例挂了,客户端会尝试重新连接到集群中的其他实例,那么此时滞后于故障实例的其它实例都不会接收该连接请求,只有和故障实例版本相同或更新的实例才接收该连接请求。

  • 持久性

写操作完成之后将会被持久化存储,不受服务器故障影响。

  • 及时性

在对某个 znode 进行读操作时,应该先执行 sync 方法,使得读操作的连接所连的 zk 实例能与 leader 进行同步,从而能读到最新的类容。

注意:sync调用是异步的,无需等待调用的返回,zk服务器会保证所有后续的操作会在 sync 操作完成之后才执行,哪怕这些操作是在执行 sync 之前被提交的。

4. 提高 ZooKeeper 应用的容错

分布式环境是很复杂的,网络的不可靠、单点故障等问题都是经常发生的。那么在构建一个分布式应用程序时,这些问题都是需要慎重考虑的。因此,如何构建一个可复原的分布式应用将成为一个值得讨论的话题。Java api 中每个异常都对应一类故障模式,下边我们将会以 Java api 中的异常为例来讨论 ZooKeeper 应用程序中可能会出现的一些故障。

4.1. Java API 中的一些常见异常

  • InterruptedException异常

若客户端的某操作被中断,则会抛出 InterruptedException 异常。抛出该异常时,不一定是出现故��,只能表明某个 zookeeper 操作被中断而已。

  • KeeperException异常

服务器发出错误信号或是服务器存在通信故障。该类现在共有 21 个子类,分为 3 大类:

 (1)、状态异常

当一个客户端对 zk 的某操作失败时,就会出现状态异常。例如:更新数据时所指定的版本号不正确就会抛出异常 BadVersionException、若在短暂的 znode 下创建子节点则会抛出异常 NoChildrenForEphemeralsException。

(2)、可恢复的异常

那些在 zk 会话中可以恢复的异常叫可恢复的异常。当丢失 zk 连接时就会抛出异常 ConnectionLossException,这时 zk 会自动尝试重新连接,以确保会话的完整性。Zk 无法判断 ConnectionLossException 异常相关的操作是否成功执行,有可能出现只完成部分,那么是否重新执行刚才的操作就得知道该操作是否是等幂的。

等幂操作是指一次或多次执行都会产生相同结果的操作;非等幂操作是指一次或多次执行会产生不相同结果的操作。非等幂操作就不能盲目操作了。

写操作里有创建、删除、修改。在一个分布式环境中,删除 zk 里的 znode 或是修改 znode 的数据是等幂的,只有创建 znode 可能不是等幂的,创建顺序 znode 就是一个非等幂的操作。

那么怎么样才能避免创建顺序 znode 不会出现重复创建呢?下面我来展开讨论:

假设的场景:

客户端:客户端任务是在连接到 zk 服务端时会只创建一个顺序 znode;

ConnectionLossException抛出 ConnectionLossException 异常重新连接后会话没有失效,但是 zk 无法判断创建 znode 的操作是否成功。

我们知道顺序 znode 的节点名称格式是形如”znodeName<sequentialNumber>”,zk 客户端和服务器端的会话有个全局唯一的 sessionID,我们可以把 sessionID 加入 znode 的名称中,形如:” znodeName<sessionID><sequentialNumber>”,sequentialNumber 是相对于父 znode 唯一的。这样我们在创建某个 znode 之前先判断一下父 znode 下有无名称形如” znodeName<sessionID>“这样字符开头的子 znode,就能确保每个客户端连接只创建一个 znode。

这种场景会在什么时候会遇到呢?在我们要实现一个分布式锁的时候,核心思想之一就在这里。那么问题来了,什么是分布式锁呢?后面会有独立的博文来讲解关于它的代码实现。

(3)、不可恢复的异常

不可恢复的异常发生时,所有的短暂 znode 都将会丢失,只有程序中显示的重建 zk 连接,并重建 znode 的状态。例如:会话过期会抛出异常 SessionExpiredException,身份验证失败会抛出异常 AuthFailedException。

(4)、异常捕捉处理

每个子类对应一种异常状态,且每个子类都对应一个关于错误类型的信息代码,可以通过 code 方法拿到。处理该种异常有两种办法:

1、通过检测错误代码(可调用 code 方法老获取)来确定是哪种异常,再决定应该采取何种补救措施;

2、通过追捕等价的 KeeperException 异常,然后再每段捕捉代码中执行相应的操作。

4.2. 构建可靠的 zookeeper 应用

上面说到了 zk 服务器端可能出现一些网络故障或单点故障登,那么怎么编写一个可靠的 zk 客户端程序来应对可能不稳定的 zk 实例呢?这里我们向一个 znode 写数据为例,来实现它:

/**
 * 显示配置
 * @throws KeeperException 服务器发出错误信号或是服务器存在通信故障。该类现在共有 21 个子类,* 分为 3 大类:<br/>
 * 1、状态异常(如:BadVersionException、NoChildrenForEphemeralsException);* 2、可恢复的异常(如:ConnectionLossException);* 3、不可恢复的异常(如:SessionExpiredException、AuthFailedException)。* 每个子类对应一种异常状态,且每个子类都对应一个关于错误类型的信息代码,可以通过 code 方法拿到。* 处理该种异常有两种办法:<br/>
 * 1、通过 <b> 检测错误代码 </b> 来决定应该采取何种补救措施;<br/>
 * 2、通过 <b> 追捕等价的 KeeperException 异常 </b>,然后再每段捕捉代码中执行相应的操作。* @throws InterruptedException zookeeper 操作被中断。<b> 并不一定就是出现故障,只能表明相对应的操作被取消 </b>。*/
public static void write(String path, String value) throws KeeperException, InterruptedException {int retries = 0;
    
    while (true) {try {Stat stat = zk.exists(path, false);
            if(stat == null){zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }else {zk.setData(path, value.getBytes(CHARSET), -1);
            }
            break;
        } catch(KeeperException.SessionExpiredException e){//TODO 此处会话过期,抛出异常,由上层调用来重新创建 zookeeper 对象
            throw e;
        }catch(KeeperException.AuthFailedException e){//TODO 此处身份验证时,抛出异常,由上层来终止程序运行
            throw e;
        }catch (KeeperException e) {//检查有没有超出尝试的次数
            if(retries == MAXRETRIES){throw e;
            }
            retries++;
            TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS);
        }
    }
}

如果您是一名 Java 开发人员,那么我觉得上面的这些代码没什么好解释的了。下边看上层调用是怎么处理的:

int flag = 0;

while (true) {try {write(path, value);
        break;
    } catch (KeeperException.SessionExpiredException e) {// TODO: 重新创建、开始一个新的会话
        e.printStackTrace();
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
    } catch (KeeperException e) {// TODO 尝试了多次,还是出错,只有退出了
        e.printStackTrace();
        flag = 1;
        break;
    }catch(KeeperException.AuthFailedException e){//TODO 此处身份验证时,终止程序运行
        e.printStackTrace();
        flag = 1;
        break;
    } catch (IOException e) {// TODO 创建 zookeeper 对象失败,无法连接到 zk 集群
        e.printStackTrace();
        flag = 1;
        break;
    } 
}

System.exit(flag);

关于编写一个可恢复的 zookeeper 应用,这一块理解了,其它地方应该就是触类旁通了。

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-07/133179.htm

1. 概述

Zookeeper 是 Hadoop 的一个子项目,它是分布式系统中的协调系统,可提供的服务主要有:配置服务、名字服务、分布式同步、组服务等。

它有如下的一些特点:

  • 简单

Zookeeper 的核心是一个精简的文件系统,它支持一些简单的操作和一些抽象操作,例如,排序和通知。

  • 丰富

        Zookeeper 的原语操作是很丰富的,可实现一些协调数据结构和协议。例如,分布式队列、分布式锁和一组同级别节点中的“领导者选举”。

  • 高可靠

Zookeeper 支持集群模式,可以很容易的解决单点故障问题。

  • 松耦合交互

不同进程间的交互不需要了解彼此,甚至可以不必同时存在,某进程在 zookeeper 中留下消息后,该进程结束后其它进程还可以读这条消息。

  • 资源库

        Zookeeper 实现了一个关于通用协调模式的开源共享存储库,能使开发者免于编写这类通用协议。

 

2. ZooKeeper 的安装

  • 独立模式安装

Zookeeper 的运行环境是需要 Java 的,建议安装 Oracle 的 java6.

可去官网下载一个稳定的版本,然后进行安装:http://zookeeper.apache.org/

解压后在 zookeeper 的 conf 目录下创建配置文件 zoo.cfg,里面的配置信息可参考统计目录下的 zoo_sample.cfg 文件,我们这里配置为:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-data/
clientPort=2181

tickTime指定了 ZooKeeper 的基本时间单位(以毫秒为单位);

initLimit指定了启动 zookeeper 时,zookeeper 实例中的随从实例同步到领导实例的初始化连接时间限制,超出时间限制则连接失败(以 tickTime 为时间单位);

syncLimit指定了 zookeeper 正常运行时,主从节点之间同步数据的时间限制,若超过这个时间限制,那么随从实例将会被丢弃;

dataDirzookeeper 存放数据的目录;

clientPort用于连接客户端的端口。

  • 启动一个本地的 ZooKeeper实例
% zkServer.sh start

检查 ZooKeeper 是否正在运行

echo ruok | nc localhost 2181

若是正常运行的话会打印“imok”。

3. ZooKeeper 监控

  • 远程 JMX配置

默认情况下,zookeeper 是支持本地的 jmx 监控的。若需要远程监控 zookeeper,则需要进行进行如下配置。

默认的配置有这么一行:

ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"

咱们在 $JMXLOCALONLY 后边添加 jmx 的相关参数配置:

ZOOMAIN="-Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY
                -Djava.rmi.server.hostname=192.168.1.8
                -Dcom.sun.management.jmxremote.port=1911
                -Dcom.sun.management.jmxremote.ssl=false
                -Dcom.sun.management.jmxremote.authenticate=false
                 org.apache.zookeeper.server.quorum.QuorumPeerMain"

这样就可以远程监控了,可以用 jconsole.exe 或 jvisualvm.exe 等工具对其进行监控。

  • 身份验证

这里没有配置验证信息,如果需要请参见我的博文 jvisualvm 远程监控 tomcat:http://www.cnblogs.com/leocook/p/jvisualvmandtomcat.html

4. Zookeeper 的存储模型

Zookeeper 的数据存储采用的是结构化存储,结构化存储是没有文件和目录的概念,里边的目录和文件被抽象成了节点(node),zookeeper 里可以称为 znode。Znode 的层次结构如下图:

ZooKeeper 学习总结

最上边的是根目录,下边分别是不同级别的子目录。

5. Zookeeper 客户端的使用

  • zkCli.sh

可使用 ./zkCli.sh -server localhost 来连接到 Zookeeper 服务上。

使用 ls / 可查看根节点下有哪些子节点,可以双击 Tab 键查看更多命令。

  • Java客户端

可创建 org.apache.zookeeper.ZooKeeper 对象来作为 zk 的客户端,注意,java api 里创建 zk 客户端是异步的,为防止在客户端还未完成创建就被使用的情况,这里可以使用同步计时器,确保 zk 对象创建完成再被使用。

  • C客户端

可以使用 zhandle_t 指针来表示 zk 客户端,可用 zookeeper_init 方法来创建。可在 ZK_HOME\src\c\src\ cli.c 查看部分示例代码。

Ubuntu 14.04 安装分布式存储 Sheepdog+ZooKeeper  http://www.linuxidc.com/Linux/2014-12/110352.htm

CentOS 6 安装 sheepdog 虚拟机分布式储存  http://www.linuxidc.com/Linux/2013-08/89109.htm

ZooKeeper 集群配置 http://www.linuxidc.com/Linux/2013-06/86348.htm

使用 ZooKeeper 实现分布式共享锁 http://www.linuxidc.com/Linux/2013-06/85550.htm

分布式服务框架 ZooKeeper — 管理分布式环境中的数据 http://www.linuxidc.com/Linux/2013-06/85549.htm

ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

ZooKeeper 服务器集群环境配置实测 http://www.linuxidc.com/Linux/2013-04/83559.htm

ZooKeeper 集群安装 http://www.linuxidc.com/Linux/2012-10/72906.htm

Zookeeper3.4.6 的安装 http://www.linuxidc.com/Linux/2015-05/117697.htm

6. Zookeeper 创建 Znode

Znode 有两种类型:短暂的和持久的。短暂的 znode 在创建的客户端与服务器端断开(无论是明确的断开还是故障断开)连接时,该 znode 都会被删除;相反,持久的 znode 则不会。

public class CreateGroup implements Watcher{private static final int SESSION_TIMEOUT = 1000;//会话延时

    private ZooKeeper zk = null;
    private CountDownLatch countDownLatch = new CountDownLatch(1);//同步计数器

    public void process(WatchedEvent event) {if(event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();//计数器减一
        }
    }

    /**
     * 创建 zk 对象
     * 当客户端连接上 zookeeper 时会执行 process(event)里的 countDownLatch.countDown(),计数器的值变为 0,则 countDownLatch.await()方法返回。* @param hosts
     * @throws IOException
     * @throws InterruptedException
     */
    public void connect(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();//阻塞程序继续执行
    }
    
    /**
     * 创建 group
     * 
     * @param groupName 组名
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void create(String groupName) throws KeeperException, InterruptedException {String path = "/" + groupName;
        String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE/*允许任何客户端对该 znode 进行读写 */, CreateMode.PERSISTENT/* 持久化的 znode*/);
        System.out.println("Created" + createPath);
    }
    
    /**
     * 关闭 zk
     * @throws InterruptedException
     */
    public void close() throws InterruptedException {if(zk != null){try {zk.close();
            } catch (InterruptedException e) {throw e;
            }finally{zk = null;
                System.gc();}
        }
    }
}

这里我们使用了同步计数器 CountDownLatch,在 connect 方法中创建执行了 zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); 之后,下边接着调用了 CountDownLatch 对象的 await 方法阻塞,因为这是 zk 客户端不一定已经完成了与服务端的连接,在客户端连接到服务端时会触发观察者调用 process()方法,我们在方法里边判断一下触发事件的类型,完成连接后计数器减一,connect 方法中解除阻塞。

还有两个地方需要注意:这里创建的 znode 的访问权限是 open 的,且该 znode 是持久化存储的。

测试类如下:

public class CreateGroupTest {private static String hosts = "192.168.1.8";
    private static String groupName = "zoo";
    
    private CreateGroup createGroup = null;
    
    /**
     * init
     * @throws InterruptedException 
     * @throws KeeperException 
     * @throws IOException 
     */
    @Before
    public void init() throws KeeperException, InterruptedException, IOException {createGroup = new CreateGroup();
        createGroup.connect(hosts);
    }
    
    @Test
    public void testCreateGroup() throws KeeperException, InterruptedException {createGroup.create(groupName);
    }
    
    /**
     * 销毁资源
     */
    @After
    public void destroy() {try {createGroup.close();
            createGroup = null;
            System.gc();} catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

由于 zk 对象的创建和销毁代码是可以复用的,所以这里我们把它分装成了接口:

/**
 * 连接的观察者,封装了 zk 的创建等
 * @author leo
 *
 */
public class ConnectionWatcher implements Watcher {private static final int SESSION_TIMEOUT = 5000;

    protected ZooKeeper zk = null;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void process(WatchedEvent event) {KeeperState state = event.getState();
        
        if(state == KeeperState.SyncConnected){countDownLatch.countDown();
        }
    }
    
    /**
     * 连接资源
     * @param hosts
     * @throws IOException
     * @throws InterruptedException
     */
    public void connection(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();}
    
    /**
     * 释放资源
     * @throws InterruptedException
     */
    public void close() throws InterruptedException {if (null != zk) {try {zk.close();
            } catch (InterruptedException e) {throw e;
            }finally{zk = null;
                System.gc();}
        }
    }
}

7. Zookeeper 删除 Znode

/**
 * 删除分组
 * @author leo
 *
 */
public class DeleteGroup extends ConnectionWatcher {public void delete(String groupName) {String path = "/" + groupName;
        
        try {List<String> children = zk.getChildren(path, false);
            
            for(String child : children){zk.delete(path + "/" + child, -1);
            }
            zk.delete(path, -1);//版本号为 -1,
        } catch (KeeperException e) {e.printStackTrace();
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

zk.delete(path,version)方法的第二个参数是 znode 版本号,如果提供的版本号和 znode 版本号一致才会删除这个 znode,这样可以检测出对 znode 的修改冲突。通过将版本号设置为 -1,可以绕过这个版本检测机制,无论 znode 的版本号是什么,都会直接将其删除。

测试类:

 
public class DeleteGroupTest {private static final String HOSTS = "192.168.1.137";
    private static final String groupName = "zoo";
    
    private DeleteGroup deleteGroup = null;
    
    @Before
    public void init() throws IOException, InterruptedException {deleteGroup = new DeleteGroup();
        deleteGroup.connection(HOSTS);
    }
    
    @Test
    public void testDelete() throws IOException, InterruptedException, KeeperException {deleteGroup.delete(groupName);
    }
    
    @After
    public void destroy() throws InterruptedException {if(null != deleteGroup){try {deleteGroup.close();
            } catch (InterruptedException e) {throw e;
            }finally{deleteGroup = null;
                System.gc();}
        }
    }
}

8. Zookeeper 的相关操作

ZooKeeper 中共有 9 中操作:

create:创建一个 znode

delete:删除一个 znode

exists:测试一个 znode

getACL,setACL:获取 / 设置一个 znode 的 ACL(权限控制)

getChildren:获取一个 znode 的子节点

getData,setData:获取 / 设置一个 znode 所保存的数据

sync:将客户端的 znode 视图与 ZooKeeper 同步

这里更新数据是必须要提供 znode 的版本号(也可以使用 - 1 强制更新,这里可以执行前通过 exists 方法拿到 znode 的元数据 Stat 对象,然后从 Stat 对象中拿到对应的版本号信息),如果版本号不匹配,则更新会失败。因此一个更新失败的客户端可以尝试是否重试或执行其它操作。

9. ZooKeeper 的 API

ZooKeeper 的 api 支持多种语言,在操作时可以选择使用同步 api 还是异步 api。同步 api 一般是直接返回结果,异步 api 一般是通过回调来传送执行结果的,一般方法中有某参数是类 AsyncCallback 的内部接口,那么该方法应该就是异步调用,回调方法名为 processResult。

10. 观察触发器

可以对客户端和服务器端之间的连接设置观察触发器(后边称之为 zookeeper 的状态观察触发器),也可以对 znode 设置观察触发器。

  • 状态观察器

zk 的整个生命周期如下:

ZooKeeper 学习总结

可在创建 zk 对象时传入一个观察器,在完成 CONNECTING 状态到 CONNECTED 状态时,观察器会触发一个事件,该触发的事件类型为 NONE,通过 event.getState()方法拿到事件状态为 SyncConnected。有一点需要注意的就是,在 zk 调用 close 方法时不会触发任何事件,因为这类的显示调用是开发者主动执行的,属于可控的,不用使用事件通知来告知程序。这一块在下篇博文还会详细解说。

  • 设置 znode的观察器

可以在读操作 exists、getChildren 和 getData 上设置观察,在执行写操作 create、delete 和 setData 将会触发观察事件,当然,在执行写的操作时,也可以选择是否触发 znode 上设置的观察器,具体可查看相关的 api。

当观察的 znode被创建、删除或其数据被更新时,设置在 exists 上的观察将会被触发;

当观察的 znode被删除或数据被更新时,设置在 getData 上的观察将会被触发;

当观察的 znode的子节点被创建、删除或 znode自身被删除时,设置在 getChildren 上的观察将会被触发,可通过观察事件的类型来判断被删除的是 znode 还是它的子节点。

ZooKeeper 学习总结

对于 NodeCreatedNodeDeleted根据路径就能发现是哪个 znode 被写;对于 NodeChildrenChanged 可根据 getChildren 来获取新的子节点列表。

注意:在收到收到触发事件到执行读操作之间,znode 的状态可能会发生状态,这点需要牢记。

至此,编写简单的 zookeeper 应该是可以的了,下篇博文咱们来深入探讨 zookeeper 的相关知识。

参考地址:http://zookeeper.apache.org/doc/r3.4.6/

参考书籍:《Hadoop 权威指南》

Hadoop 权威指南(第 3 版) 修订版(带目录书签) 中文 PDF 高清晰 下载见 http://www.linuxidc.com/Linux/2016-03/129542.htm

更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2016-07/133179p2.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计23502字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中