阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Zookeeper源码分析:Watcher机制

107次阅读
没有评论

共计 7437 个字符,预计需要花费 19 分钟才能阅读完成。

1. 设置 Watcher
使用 Watcher 需要先实现 Watcher 接口,并将实现类对象传递到指定方法中,如 getChildren, exist 等。Zookeeper 允许在构造 Zookeeper 对象时候指定一个默认 Watcher 对象.getChildren 和 exit 方法可以使用这个默认的 Watcher 对象,也可以指定一个新 Watcher 对象。

Code 1: Watcher 接口

public interface Watcher {

    /**
    * Event 的状态
    */
    public interface Event {
        /**
        * 在事件发生时,ZooKeeper 的状态
        */
        public enum KeeperState {

            @Deprecated
            Unknown (-1),

            Disconnected (0),

            @Deprecated
            NoSyncConnected (1),

            SyncConnected (3),

            AuthFailed (4),

            ConnectedReadOnly (5),

            SaslAuthenticated(6),

            Expired (-112);

            private final int intValue; 

            KeeperState(int intValue) {
                this.intValue = intValue;
            } 

            ……
        }

        /**
        * ZooKeeper 中的事件
        */
        public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);

            private final int intValue;    // Integer representation of value
                                            // for sending over wire
            EventType(int intValue) {
                this.intValue = intValue;
            }
            …… 
        }
    }

    //Watcher 的回调方法
    abstract public void process(WatchedEvent event);
}

 

Code 2: Zookeeper.getChildren(final String, Watcher)方法

public List<String> getChildren(final String path, Watcher watcher)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils. validatePath(clientPath);

    WatchRegistration wcb = null;
    // 如果 watcher 不等于 null, 构建 WatchRegistration 对象,
    // 该对象描述了 watcher 和 path 之间的关系
    if (watcher != null) {
        wcb = new ChildWatchRegistration(watcher, clientPath);
    }
   
    // 在传入的 path 加上 root path 前缀,构成服务器端的绝对路径
    final String serverPath = prependChroot(clientPath);
   
    // 构建 RequestHeader 对象
    RequestHeader h = new RequestHeader();
    // 设置操作类型为 OpCode. getChildren
    h.setType(ZooDefs.OpCode. getChildren);
    // 构建 GetChildrenRequest 对象
    GetChildrenRequest request = new GetChildrenRequest();
    // 设置 path
    request.setPath(serverPath);
    // 设置是否使用 watcher
    request.setWatch(watcher != null);
    // 构建 GetChildrenResponse 对象
    GetChildrenResponse response = new GetChildrenResponse();
    // 提交请求,并阻塞等待结果
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                clientPath);
    }
    return response.getChildren();
}

Follower 的 NIOServerCnxn 类接到了 Client 的请求,会调用 ZookeeperServer.processPacket()方法。该方法会构建一个 Request 对象,并调用第一个处理器 FollowerRequestProcessor。

由于我们的请求只是一个读操作,而不是一个 Quorum 请求或者 sync 请求,所以 FollowerRequestProcessor 不需要调用 Follower.request()方法将请求转给 Leader,只需要将请求传递到下一个处理器 CommitProcessor。

处理器 CommitProcessor 线程发现请求是读请求后,直接将 Requet 对象加入到 toProcess 队列中,在接下的循环中会调用 FinalRequestProcessor.processRequest 方法进行处理。

FinalRequestProcessor.processRequest 方法最终会调用 ZKDatabase 中的读操作方法(如 statNode 和 getData 方法),而 ZKDatabase 的这些方法会最终调用 DataTree 类的方法来获取指定 path 的 znode 信息并返回给 Client 端,同时也会设置 Watcher。

Code 3: FinalRequestProcessor 对 OpCode.getData 请求的处理

case OpCode. getData: {
              lastOp = “GETD”;
              GetDataRequest getDataRequest = new GetDataRequest();
              ByteBufferInputStream. byteBuffer2Record(request.request,
                      getDataRequest);
              // 获得 znode 对象
              DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
              // n 为 null, 抛出 NoNodeException 异常
              if (n == null) {
                  throw new KeeperException.NoNodeException();
              }
              Long aclL;
              synchronized(n) {
                  aclL = n. acl;
              }
              // 检查是否有读权限
              PrepRequestProcessor. checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                      ZooDefs.Perms. READ,
                      request. authInfo);
              // 构建状态对象 stat
              Stat stat = new Stat();
              // 获得指定 path 的 znode 数据,
              // 如果 GetDataRequest.getWatcher() 返回 true, 将 ServerCnxn 类型对象 cnxn 传递进去。
              //ServerCnxn 是实现了 Watcher 接口
              byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                      getDataRequest. getWatch() ? cnxn : null);
              // 构建 GetDataResponse 对象
              rsp = new GetDataResponse(b, stat);
              break;
          }

Code 4: DataTree.getData()方法

public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    // 从 nodes map 中获取指定 path 的 DataNode 对象
    DataNode n = nodes.get(path);
    // 如果 n 为 null, 则抛出 NoNodeException 异常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        // 将 n 的状态 copy 到 stat 中
        n.copyStat(stat);
        // 如果 watcher 不会 null, 则将(path, watcher) 键值对放入 dataWatchers Map 里
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        // 返回节点数据
        return n.data ;
    }
}

2. 修改 znode 数据触发 Watcher
在 Zookeeper 二阶段提交的 COMMIT 阶段。当 Follower 从 Leader 那接收到一个写请求的 Leader.COMMIT 数据包,会调用 FinalRequestProcessor.processRequest()方法。Leader 本身在发送完 Leader.COMMIT 数据包,也会调用 FinalRequestProcessor.processRequest()方法。

如果是 setData 修改数据请求,那么 FinalRequestProcessor.processRequest()方法最终会调用到 DataTree.setData 方法将 txn 应用到指定 znode 上,同时触发 Watcher,并发送 notification 给 Client 端。

其关 SetData 请求的时序图如下:

Zookeeper 源码分析:Watcher 机制

triggerWatcher

Code 5: DataTree.setData()方法

public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    // 根据 path, 获得 DataNode 对象 n
    DataNode n = nodes.get(path);
    // 如果 n 为 null, 则抛出 NoNodeException 异常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n. data;
        n. data = data;
        n. stat.setMtime(time);
        n. stat.setMzxid(zxid);
        n. stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
      this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
          – (lastdata == null ? 0 : lastdata.length));
    }
    // 触发 Watcher
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

 

Code 6: WatchManage.triggerWatcher()方法,触发 Watcher。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState. SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // 从 watchTable 删除掉 path 对于的 watcher
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG .isTraceEnabled()) {
                ZooTrace. logTraceMessage(LOG,
                        ZooTrace. EVENT_DELIVERY_TRACE_MASK,
                        “No watchers for ” + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    // 循环处理所有关于 path 的 Watcher, 这里 Watcher 对象实际上就是 ServerCnxn 类型对象
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}

 

Code 7: NIOServerCnxn.process 方法,发送 notification 给 Client 端

synchronized public void process (WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG .isTraceEnabled()) {
        ZooTrace. logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK ,
                                “Deliver event ” + event + ” to 0x”
                                + Long. toHexString(this. sessionId)
                                + ” through ” + this );
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();
   
    // 发送 notification 给 Client 端
    sendResponse(h, e, “notification”);
}

3. 总结
Watcher 具有 one-time trigger 的特性,在代码中我们也可以看到一个 watcher 被处理后会立即从 watchTable 中删掉。

————————————– 分割线 ————————————–

Ubuntu 14.04 安装分布式存储 Sheepdog+ZooKeeper  http://www.linuxidc.com/Linux/2014-12/110352.htm

CentOS 6 安装 sheepdog 虚拟机分布式储存  http://www.linuxidc.com/Linux/2013-08/89109.htm

ZooKeeper 集群配置 http://www.linuxidc.com/Linux/2013-06/86348.htm

使用 ZooKeeper 实现分布式共享锁 http://www.linuxidc.com/Linux/2013-06/85550.htm

分布式服务框架 ZooKeeper — 管理分布式环境中的数据 http://www.linuxidc.com/Linux/2013-06/85549.htm

ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

ZooKeeper 服务器集群环境配置实测 http://www.linuxidc.com/Linux/2013-04/83559.htm

ZooKeeper 集群安装 http://www.linuxidc.com/Linux/2012-10/72906.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计7437字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中