阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

ZooKeeper–一个具有高可用性的高性能协调服务

153次阅读
没有评论

共计 11290 个字符,预计需要花费 29 分钟才能阅读完成。

ZooKeeper 是什么

ZooKeeper 是一个具有高可用性的高性能协调服务。

ZooKeeper 维护着一个树形层次结构,书中的节点被称为 znode。znode 可以用来存储数据,并且有一个与之相关联的 ACL(权限),znode 不能大于 1M。

ZooKeeper 的详细介绍:请点这里
ZooKeeper 的下载地址:请点这里

相关阅读

ZooKeeper 集群配置 http://www.linuxidc.com/Linux/2013-06/86348.htm

使用 ZooKeeper 实现分布式共享锁 http://www.linuxidc.com/Linux/2013-06/85550.htm

分布式服务框架 ZooKeeper — 管理分布式环境中的数据 http://www.linuxidc.com/Linux/2013-06/85549.htm

ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

ZooKeeper 服务器集群环境配置实测 http://www.linuxidc.com/Linux/2013-04/83559.htm

ZooKeeper 集群安装 http://www.linuxidc.com/Linux/2012-10/72906.htm

ZooKeeper 使用场景

ZooKeeper 主要用来解决分布式系统中的“部分失败”问题。部分失败是分布式系统的固有的特征,ZooKeeper 不能根除部分失败,也不会隐藏部分失败;但是可以提供一组工具,使你在构建分布式应用时对部分失败进行处理。这主要利用的是观察者模式。

Hadoop 内置了 ZooKeeper,阿里的开源框架 Dubbo、Otter 等也都是用 ZooKeeper 作为协调服务。

安装和运行 ZooKeeper

安装

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz

tar zxvf zookeeper-3.3.6.tar.gz

cd zookeeper-3.3.6

cp conf/zoo_sample.cfg conf/zoo.cfg

运行

bin/zkServer.sh start

测试是否正确运行:echo ruok | nc localhost 2181,如果启动成功,则返回 imok

zkCli.sh 命令行工具

bin/zkCli.sh -server localhost,连接到 ZooKeeper,可以执行各种 znode 操作。

Java 开发的 znode 操作代码

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ZkTest implements Watcher
{
 private static final int SESSION_TIMEOUT = 5000;
 private ZooKeeper zk;
 private CountDownLatch connectedSignal = new CountDownLatch(1);

 public void connect(String hosts) throws IOException,InterruptedException
 {
  zk = new ZooKeeper(hosts,SESSION_TIMEOUT,this);
  connectedSignal.await();
 }
 @Override
 public void process(WatchedEvent event)
 {
  if(event.getState() == KeeperState.SyncConnected)
  {
   connectedSignal.countDown();
  }
 }
 // 新建表示组的 znode
 public void create(String groupName) throws KeeperException,InterruptedException
 {
  String path = “/” + groupName;
  String createdPath = zk.create(path,null/*data*/,Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  System.out.println(“Created ” + createdPath);
 }
 // 用户将成员加入组中: 临时成员
 public void join(String groupName,String memberName) throws KeeperException,InterruptedException
 {
  String path = “/” + groupName + “/” + memberName;
  String createdPath = zk.create(path,null/*data*/,Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
  System.out.println(“Joined ” + createdPath);
 }
 // 列出组成员
 public void list(String groupName) throws KeeperException,InterruptedException
 {
  String path = “/” + groupName;
  try
  {
   List<String> children = zk.getChildren(path,false);
   if(children.isEmpty())
   {
    System.out.printf(“No members in group %s\n”,groupName);
    System.exit(1);
   }
   for(String child:children)
   {
    System.out.println(child);
   }
  }
  catch (KeeperException.NoNodeException e)
  {
   System.out.printf(“No members in group %s\n”,groupName);
   System.exit(1);
  }
 }
 // 删除一个组及其所有成员
 public void delete(String groupName) throws KeeperException,InterruptedException
 {
  String path = “/” + groupName;
  try
  {
   List<String> children = zk.getChildren(path,false);
   for(String child:children)
   {
    zk.delete(path + “/” + child,-1);
   }
   zk.delete(path,-1);
  }
  catch (KeeperException.NoNodeException e)
  {
   System.out.printf(“Group %s does not exist\n”,groupName);
   System.exit(1);
  }
 }
 public void close() throws InterruptedException
 {
  zk.close();
 }
 public static void main(String[] args) throws Exception
 {
  ZkTest zkTest = new ZkTest();
  zkTest.connect(“192.168.211.230”);
  //zkTest.create(“longlonggroup”);
  //zkTest.join(“longlonggroup”, “aaa”);
  //Thread.sleep(60000); // 这里睡 60 秒是为了让你通过 zkCli.sh 之类的工具看到 /creategroup 下的 aaa 这个临时节点
  //zkTest.list(“CMS”);
  //zkTest.delete(“longlonggroup”);
  zkTest.close();
 }
}

需要引用 jar 包,Maven 方式如下:

 <dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.6</version>
 </dependency>

Java 开发的配置服务代码

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class ConnectionWatcher implements Watcher
{
 private static final int SESSION_TIMEOUT = 5000;
 protected ZooKeeper zk;
 private CountDownLatch connectedSignal = new CountDownLatch(1);

 public void connect(String hosts) throws IOException,InterruptedException
 {
  zk = new ZooKeeper(hosts,SESSION_TIMEOUT,this);
  connectedSignal.await();
 }
 @Override
 public void process(WatchedEvent event)
 {
  if(event.getState() == KeeperState.SyncConnected)
  {
   connectedSignal.countDown();
  }
 }
 public void close() throws InterruptedException
 {
  zk.close();
 }
}
class ActiveKeyValueStore extends ConnectionWatcher
{
 private static final Charset CHARSET = Charset.forName(“UTF-8”);
 public void write(String path,String value) throws InterruptedException,KeeperException
 {
  Stat stat = zk.exists(path,false);
  if(stat == null)
  {
   zk.create(path,value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  } else {
   zk.setData(path,value.getBytes(CHARSET),-1);
  }
 }
 public String read(String path,Watcher watcher) throws InterruptedException,KeeperException
 {
  byte[] data = zk.getData(path,watcher,null/*stat*/);
  return new String(data,CHARSET);
 }
}
// 随机更新 ZooKeeper 中的属性
class ConfigUpdater
{
 public static final String PATH = “/configtestbypuma”;
 private ActiveKeyValueStore store;
 private Random random = new Random();
 public ConfigUpdater() throws IOException,InterruptedException
 {
  store = new ActiveKeyValueStore();
  store.connect(“localhost”);
 }
 public void run() throws InterruptedException,KeeperException
 {
  String value = random.nextInt(100) + “”;
  store.write(PATH, value);
  System.out.printf(“Set %s to %s\n”,PATH,value);
  Thread.sleep(random.nextInt(10)*1000);
 }
 public static void main(String[] args) throws Exception
 {
  ConfigUpdater configUpdater = new ConfigUpdater();
  configUpdater.run();
 }
}
// 观察 Zookeeper 中属性的更新情况,并打印到控制台(注意,本例只会观察最新的 znode 情况,对于二次观察之前的更新不关心)
class ConfigWatcher implements Watcher
{
 private ActiveKeyValueStore store;
 public ConfigWatcher() throws IOException,InterruptedException
 {
  store = new ActiveKeyValueStore();
  store.connect(“localhost”);
 }
 public void displayConfig() throws InterruptedException,KeeperException
 {
  String value = store.read(ConfigUpdater.PATH, this);
  System.out.printf(“Read %s as %s\n”,ConfigUpdater.PATH,value);
 }
 @Override
 public void process(WatchedEvent event)
 {
  if(event.getType() == EventType.NodeDataChanged)
  {
   try
   {
    displayConfig();
   } catch(InterruptedException e) {
    System.err.println(“Interrupted. Exiting.”);
    Thread.currentThread().interrupt();
   } catch (KeeperException e)
   {
    System.err.printf(“KeeperException: %s. Exiting. \n”, e);
   }
  }
 }
 public static void main(String[] args) throws Exception
 {
  ConfigWatcher configWatcher = new ConfigWatcher();
  configWatcher.displayConfig();
 
  Thread.sleep(Long.MAX_VALUE);
 }
}

ZooKeeper 配置文件
如下是一个包含有 3 台机器的复制模式下的配置例子:

tickTime=2000
dataDir=/diskl/zookeeper
dataLogDir=/disk2/zookeeper
clientPort=2181
initlimit=5
syncLimit=2
server.l=zookeeperl:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888

tickTime:以毫秒为单位,用来控制心跳和超时,默认情况超时的时间为两倍的 tickTime
监听端口:2181 端口用于客户端连接;对于领导者来说,2888 端口用于跟随者连接;3888 端口用于领导者选举阶段的其他服务器连接。当一个 ZooKeeper 服务器启动时,它读取 myid 文件用于确定自己的服务器 ID,然后通过读取配置文件来确定应当在哪个端口进行监听,同时确定集合体中的其他服务器的网络地址。

在复制模式下,有两个额外的强制参数:initLimit 和 syncLimit,两者都是以滴答参数的倍数进行度量。
initlimit 参数:设定了允许所有跟随者与领导者进行连接井同步的时间。如果在设定的时间段内,半数以上的跟随者未能完成同步,领导者便会宣布放弃领导地位,然后进行另外一次领导者选举。如果这种情况经常发生 (可以通过日志中的记录发现这种情况),则表明设定的值太小。
syncLimit 参数:设定了允许一个跟随者与领导者进行同步的时间。如果在设定的时间段内,一个跟随者未能完成同步,它将会自己重启。所有关联到这个跟随者的客户端将连接到另一个跟随者。
以上是服务器集群所需的最小配置,可以通过“ZooKeeper 管理员指南”查看更多配置,进行性能调优。

ZooKeeper 是什么

ZooKeeper 是一个具有高可用性的高性能协调服务。

ZooKeeper 维护着一个树形层次结构,书中的节点被称为 znode。znode 可以用来存储数据,并且有一个与之相关联的 ACL(权限),znode 不能大于 1M。

ZooKeeper 的详细介绍:请点这里
ZooKeeper 的下载地址:请点这里

相关阅读

ZooKeeper 集群配置 http://www.linuxidc.com/Linux/2013-06/86348.htm

使用 ZooKeeper 实现分布式共享锁 http://www.linuxidc.com/Linux/2013-06/85550.htm

分布式服务框架 ZooKeeper — 管理分布式环境中的数据 http://www.linuxidc.com/Linux/2013-06/85549.htm

ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

ZooKeeper 服务器集群环境配置实测 http://www.linuxidc.com/Linux/2013-04/83559.htm

ZooKeeper 集群安装 http://www.linuxidc.com/Linux/2012-10/72906.htm

ZooKeeper 使用场景

ZooKeeper 主要用来解决分布式系统中的“部分失败”问题。部分失败是分布式系统的固有的特征,ZooKeeper 不能根除部分失败,也不会隐藏部分失败;但是可以提供一组工具,使你在构建分布式应用时对部分失败进行处理。这主要利用的是观察者模式。

Hadoop 内置了 ZooKeeper,阿里的开源框架 Dubbo、Otter 等也都是用 ZooKeeper 作为协调服务。

安装和运行 ZooKeeper

安装

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz

tar zxvf zookeeper-3.3.6.tar.gz

cd zookeeper-3.3.6

cp conf/zoo_sample.cfg conf/zoo.cfg

运行

bin/zkServer.sh start

测试是否正确运行:echo ruok | nc localhost 2181,如果启动成功,则返回 imok

zkCli.sh 命令行工具

bin/zkCli.sh -server localhost,连接到 ZooKeeper,可以执行各种 znode 操作。

Java 开发的 znode 操作代码

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ZkTest implements Watcher
{
 private static final int SESSION_TIMEOUT = 5000;
 private ZooKeeper zk;
 private CountDownLatch connectedSignal = new CountDownLatch(1);

 public void connect(String hosts) throws IOException,InterruptedException
 {
  zk = new ZooKeeper(hosts,SESSION_TIMEOUT,this);
  connectedSignal.await();
 }
 @Override
 public void process(WatchedEvent event)
 {
  if(event.getState() == KeeperState.SyncConnected)
  {
   connectedSignal.countDown();
  }
 }
 // 新建表示组的 znode
 public void create(String groupName) throws KeeperException,InterruptedException
 {
  String path = “/” + groupName;
  String createdPath = zk.create(path,null/*data*/,Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  System.out.println(“Created ” + createdPath);
 }
 // 用户将成员加入组中: 临时成员
 public void join(String groupName,String memberName) throws KeeperException,InterruptedException
 {
  String path = “/” + groupName + “/” + memberName;
  String createdPath = zk.create(path,null/*data*/,Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
  System.out.println(“Joined ” + createdPath);
 }
 // 列出组成员
 public void list(String groupName) throws KeeperException,InterruptedException
 {
  String path = “/” + groupName;
  try
  {
   List<String> children = zk.getChildren(path,false);
   if(children.isEmpty())
   {
    System.out.printf(“No members in group %s\n”,groupName);
    System.exit(1);
   }
   for(String child:children)
   {
    System.out.println(child);
   }
  }
  catch (KeeperException.NoNodeException e)
  {
   System.out.printf(“No members in group %s\n”,groupName);
   System.exit(1);
  }
 }
 // 删除一个组及其所有成员
 public void delete(String groupName) throws KeeperException,InterruptedException
 {
  String path = “/” + groupName;
  try
  {
   List<String> children = zk.getChildren(path,false);
   for(String child:children)
   {
    zk.delete(path + “/” + child,-1);
   }
   zk.delete(path,-1);
  }
  catch (KeeperException.NoNodeException e)
  {
   System.out.printf(“Group %s does not exist\n”,groupName);
   System.exit(1);
  }
 }
 public void close() throws InterruptedException
 {
  zk.close();
 }
 public static void main(String[] args) throws Exception
 {
  ZkTest zkTest = new ZkTest();
  zkTest.connect(“192.168.211.230”);
  //zkTest.create(“longlonggroup”);
  //zkTest.join(“longlonggroup”, “aaa”);
  //Thread.sleep(60000); // 这里睡 60 秒是为了让你通过 zkCli.sh 之类的工具看到 /creategroup 下的 aaa 这个临时节点
  //zkTest.list(“CMS”);
  //zkTest.delete(“longlonggroup”);
  zkTest.close();
 }
}

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计11290字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中