阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Zookeeper源码分析:Watcher机制

405次阅读
没有评论

共计 7437 个字符,预计需要花费 19 分钟才能阅读完成。

1. 设置 Watcher
使用 Watcher 需要先实现 Watcher 接口,并将实现类对象传递到指定方法中,如 getChildren, exist 等。Zookeeper 允许在构造 Zookeeper 对象时候指定一个默认 Watcher 对象.getChildren 和 exit 方法可以使用这个默认的 Watcher 对象,也可以指定一个新 Watcher 对象。

Code 1: Watcher 接口

public interface Watcher {

    /**
    * Event 的状态
    */
    public interface Event {
        /**
        * 在事件发生时,ZooKeeper 的状态
        */
        public enum KeeperState {

            @Deprecated
            Unknown (-1),

            Disconnected (0),

            @Deprecated
            NoSyncConnected (1),

            SyncConnected (3),

            AuthFailed (4),

            ConnectedReadOnly (5),

            SaslAuthenticated(6),

            Expired (-112);

            private final int intValue; 

            KeeperState(int intValue) {
                this.intValue = intValue;
            } 

            ……
        }

        /**
        * ZooKeeper 中的事件
        */
        public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);

            private final int intValue;    // Integer representation of value
                                            // for sending over wire
            EventType(int intValue) {
                this.intValue = intValue;
            }
            …… 
        }
    }

    //Watcher 的回调方法
    abstract public void process(WatchedEvent event);
}

 

Code 2: Zookeeper.getChildren(final String, Watcher)方法

public List<String> getChildren(final String path, Watcher watcher)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils. validatePath(clientPath);

    WatchRegistration wcb = null;
    // 如果 watcher 不等于 null, 构建 WatchRegistration 对象,
    // 该对象描述了 watcher 和 path 之间的关系
    if (watcher != null) {
        wcb = new ChildWatchRegistration(watcher, clientPath);
    }
   
    // 在传入的 path 加上 root path 前缀,构成服务器端的绝对路径
    final String serverPath = prependChroot(clientPath);
   
    // 构建 RequestHeader 对象
    RequestHeader h = new RequestHeader();
    // 设置操作类型为 OpCode. getChildren
    h.setType(ZooDefs.OpCode. getChildren);
    // 构建 GetChildrenRequest 对象
    GetChildrenRequest request = new GetChildrenRequest();
    // 设置 path
    request.setPath(serverPath);
    // 设置是否使用 watcher
    request.setWatch(watcher != null);
    // 构建 GetChildrenResponse 对象
    GetChildrenResponse response = new GetChildrenResponse();
    // 提交请求,并阻塞等待结果
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                clientPath);
    }
    return response.getChildren();
}

Follower 的 NIOServerCnxn 类接到了 Client 的请求,会调用 ZookeeperServer.processPacket()方法。该方法会构建一个 Request 对象,并调用第一个处理器 FollowerRequestProcessor。

由于我们的请求只是一个读操作,而不是一个 Quorum 请求或者 sync 请求,所以 FollowerRequestProcessor 不需要调用 Follower.request()方法将请求转给 Leader,只需要将请求传递到下一个处理器 CommitProcessor。

处理器 CommitProcessor 线程发现请求是读请求后,直接将 Requet 对象加入到 toProcess 队列中,在接下的循环中会调用 FinalRequestProcessor.processRequest 方法进行处理。

FinalRequestProcessor.processRequest 方法最终会调用 ZKDatabase 中的读操作方法(如 statNode 和 getData 方法),而 ZKDatabase 的这些方法会最终调用 DataTree 类的方法来获取指定 path 的 znode 信息并返回给 Client 端,同时也会设置 Watcher。

Code 3: FinalRequestProcessor 对 OpCode.getData 请求的处理

case OpCode. getData: {
              lastOp = “GETD”;
              GetDataRequest getDataRequest = new GetDataRequest();
              ByteBufferInputStream. byteBuffer2Record(request.request,
                      getDataRequest);
              // 获得 znode 对象
              DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
              // n 为 null, 抛出 NoNodeException 异常
              if (n == null) {
                  throw new KeeperException.NoNodeException();
              }
              Long aclL;
              synchronized(n) {
                  aclL = n. acl;
              }
              // 检查是否有读权限
              PrepRequestProcessor. checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                      ZooDefs.Perms. READ,
                      request. authInfo);
              // 构建状态对象 stat
              Stat stat = new Stat();
              // 获得指定 path 的 znode 数据,
              // 如果 GetDataRequest.getWatcher() 返回 true, 将 ServerCnxn 类型对象 cnxn 传递进去。
              //ServerCnxn 是实现了 Watcher 接口
              byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                      getDataRequest. getWatch() ? cnxn : null);
              // 构建 GetDataResponse 对象
              rsp = new GetDataResponse(b, stat);
              break;
          }

Code 4: DataTree.getData()方法

public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    // 从 nodes map 中获取指定 path 的 DataNode 对象
    DataNode n = nodes.get(path);
    // 如果 n 为 null, 则抛出 NoNodeException 异常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        // 将 n 的状态 copy 到 stat 中
        n.copyStat(stat);
        // 如果 watcher 不会 null, 则将(path, watcher) 键值对放入 dataWatchers Map 里
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        // 返回节点数据
        return n.data ;
    }
}

2. 修改 znode 数据触发 Watcher
在 Zookeeper 二阶段提交的 COMMIT 阶段。当 Follower 从 Leader 那接收到一个写请求的 Leader.COMMIT 数据包,会调用 FinalRequestProcessor.processRequest()方法。Leader 本身在发送完 Leader.COMMIT 数据包,也会调用 FinalRequestProcessor.processRequest()方法。

如果是 setData 修改数据请求,那么 FinalRequestProcessor.processRequest()方法最终会调用到 DataTree.setData 方法将 txn 应用到指定 znode 上,同时触发 Watcher,并发送 notification 给 Client 端。

其关 SetData 请求的时序图如下:

Zookeeper 源码分析:Watcher 机制

triggerWatcher

Code 5: DataTree.setData()方法

public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    // 根据 path, 获得 DataNode 对象 n
    DataNode n = nodes.get(path);
    // 如果 n 为 null, 则抛出 NoNodeException 异常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n. data;
        n. data = data;
        n. stat.setMtime(time);
        n. stat.setMzxid(zxid);
        n. stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
      this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
          – (lastdata == null ? 0 : lastdata.length));
    }
    // 触发 Watcher
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

 

Code 6: WatchManage.triggerWatcher()方法,触发 Watcher。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState. SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // 从 watchTable 删除掉 path 对于的 watcher
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG .isTraceEnabled()) {
                ZooTrace. logTraceMessage(LOG,
                        ZooTrace. EVENT_DELIVERY_TRACE_MASK,
                        “No watchers for ” + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    // 循环处理所有关于 path 的 Watcher, 这里 Watcher 对象实际上就是 ServerCnxn 类型对象
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}

 

Code 7: NIOServerCnxn.process 方法,发送 notification 给 Client 端

synchronized public void process (WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG .isTraceEnabled()) {
        ZooTrace. logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK ,
                                “Deliver event ” + event + ” to 0x”
                                + Long. toHexString(this. sessionId)
                                + ” through ” + this );
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();
   
    // 发送 notification 给 Client 端
    sendResponse(h, e, “notification”);
}

3. 总结
Watcher 具有 one-time trigger 的特性,在代码中我们也可以看到一个 watcher 被处理后会立即从 watchTable 中删掉。

————————————– 分割线 ————————————–

Ubuntu 14.04 安装分布式存储 Sheepdog+ZooKeeper  http://www.linuxidc.com/Linux/2014-12/110352.htm

CentOS 6 安装 sheepdog 虚拟机分布式储存  http://www.linuxidc.com/Linux/2013-08/89109.htm

ZooKeeper 集群配置 http://www.linuxidc.com/Linux/2013-06/86348.htm

使用 ZooKeeper 实现分布式共享锁 http://www.linuxidc.com/Linux/2013-06/85550.htm

分布式服务框架 ZooKeeper — 管理分布式环境中的数据 http://www.linuxidc.com/Linux/2013-06/85549.htm

ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

ZooKeeper 服务器集群环境配置实测 http://www.linuxidc.com/Linux/2013-04/83559.htm

ZooKeeper 集群安装 http://www.linuxidc.com/Linux/2012-10/72906.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计7437字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19350
评论数
4
阅读量
7960745
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比

小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比

小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比 星哥玩云,带你从小白到上云高手。今天咱们就来聊聊——什...
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
从“纸堆”到“电子化”文档:用这个开源系统打造你的智能文档管理系统

从“纸堆”到“电子化”文档:用这个开源系统打造你的智能文档管理系统

从“纸堆”到“电子化”文档:用这个开源系统打造你的智能文档管理系统 大家好,我是星哥。公司的项目文档存了一堆 ...
星哥带你玩飞牛NAS-1:安装飞牛NAS

星哥带你玩飞牛NAS-1:安装飞牛NAS

星哥带你玩飞牛 NAS-1:安装飞牛 NAS 前言 在家庭和小型工作室场景中,NAS(Network Atta...
开源MoneyPrinterTurbo 利用AI大模型,一键生成高清短视频!

开源MoneyPrinterTurbo 利用AI大模型,一键生成高清短视频!

  开源 MoneyPrinterTurbo 利用 AI 大模型,一键生成高清短视频! 在短视频内容...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
150元打造低成本NAS小钢炮,捡一块3865U工控板

150元打造低成本NAS小钢炮,捡一块3865U工控板

150 元打造低成本 NAS 小钢炮,捡一块 3865U 工控板 一块二手的熊猫 B3 工控板 3865U,搭...
支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare 也瘫了连监控都挂,根因藏在哪? 最近两天的互联网堪称“故障...
星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛 NAS 硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话? 前言 在选择 NAS 用预...
仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

还在忍受动辄数百兆的“全家桶”监控软件?后台偷占资源、界面杂乱冗余,想查个 CPU 温度都要层层点选? 今天给...
零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

零成本上线!用 Hugging Face 免费服务器 +Docker 快速部署 HertzBeat 监控平台 ...