配置服务 Configuration Service

一个基本的ZooKeeper实现的服务就是“配置服务”,集群中的服务器可以通过ZooKeeper共享一个通用的配置数据。从表面上,ZooKeeper可以理解为一个配置数据的高可用存储服务,为应用提供检索和更新配置数据服务。我们可以使用ZooKeeper的观察模式实现一个活动的配置服务,当配置数据发生变化时,可以通知与配置相关客户端。

接下来,我们来实现一个这样的活动配置服务。首先,我们设计用znode来存储key-value对,我们在znode中存储一个String类型的数据作为value,用znode的path来表示key。然后,我们实现一个client,这个client可以在任何时候对数据进行跟新操作。那么这个设计的ZooKeeper数据模型应该是:master来更新数据,其他的worker也随之将数据更新,就像HDFS的namenode那样。

我们在一个叫做ActiveKeyValueStore的类中编写代码如下:

public class ActiveKeyValueStore extends ConnectionWatcher {

  private static final Charset CHARSET = Charset.forName("UTF-8");

  public void write(String path, String value) throws InterruptedException,
      KeeperException {
    Stat stat = zk.exists(path, false);
    if (stat == null) {
      zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT);
    } else {
      zk.setData(path, value.getBytes(CHARSET), -1);
    }
  }
}

write()方法主要实现将给定的key-value对写入到ZooKeeper中。这其中隐含了创建一个新的znode和更新一个已存在的znode的实现方法的不同。那么操作之前,我们需要根据exists()来判断znode是否存在,然后再根据情况进行相关的操作。其他值得一提的就是String类型的数据在转换成byte[]时,使用的字符集是UTF-8。

我们为了说明ActiveKeyValueStore怎么使用,我们考虑实现一个ConfigUpdater类来实现更新配置。下面代码实现了一个在一些随机时刻更新配置数据的应用。

public class ConfigUpdater {

  public static final String PATH = "/config";

  private ActiveKeyValueStore store;
  private Random random = new Random();

  public ConfigUpdater(String hosts) throws IOException, InterruptedException {
    store = new ActiveKeyValueStore();
    store.connect(hosts);
  }

  public void run() throws InterruptedException, KeeperException {
    while (true) {
      String value = random.nextInt(100) + "";
      store.write(PATH, value);
      System.out.printf("Set %s to %s\n", PATH, value);
      TimeUnit.SECONDS.sleep(random.nextInt(10));
    }
  }

  public static void main(String[] args) throws Exception {
    ConfigUpdater configUpdater = new ConfigUpdater(args[0]);
    configUpdater.run();
  }
}

上面的代码很简单。在ConfigUpdater的构造函数中,ActiveKeyValueStore对象连接到ZooKeeper服务。然后run()不断的循环运行,使用一个随机数不断的随机更新/configznode上的值。

下面我们来看一下,如何读取/config上的值。首先,我们在ActiveKeyValueStore中实现一个读方法。

public String read(String path, Watcher watcher) throws InterruptedException,
      KeeperException {
    byte[] data = zk.getData(path, watcher, null/*stat*/);
    return new String(data, CHARSET);
  }

ZooKeeper的getData()方法的参数包含:path,一个Watcher对象和一个Stat对象。Stat对象中含有从getData()返回的值,并且负责接收回调信息。这种方式下,调用者不仅可以获得数据,还能够获得znode的metadata。

做为服务的consumer,ConfigWatcher以观察者身份,创建一个ActiveKeyValueStore对象,并且在启动以后调用read()函数(在dispalayConfig()函数中)获得相关数据。

下面的代码实现了一个以观察模式获得ZooKeeper中的数据更新的应用,并将值到后台中。

public class ConfigWatcher implements Watcher {

  private ActiveKeyValueStore store;

  public ConfigWatcher(String hosts) throws IOException, InterruptedException {
    store = new ActiveKeyValueStore();
    store.connect(hosts);
  }

  public void displayConfig() throws InterruptedException, KeeperException {
    String value = store.read(ConfigUpdater.PATH, this);
    System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
  }

  @Override
  public void process(WatchedEvent event) {
    if (event.getType() == EventType.NodeDataChanged) {
      try {
        displayConfig();
      } catch (InterruptedException e) {
        System.err.println("Interrupted. Exiting.");        
        Thread.currentThread().interrupt();
      } catch (KeeperException e) {
        System.err.printf("KeeperException: %s. Exiting.\n", e);        
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
    configWatcher.displayConfig();

    // stay alive until process is killed or thread is interrupted
    Thread.sleep(Long.MAX_VALUE);
  }
}

ConfigUpadater更新znode时,ZooKeeper将触发一个EventType.NodeDataChanged的事件给观察者。ConfigWatcher将在他的process()函数中获得这个时间,并将显示读取到的最新的版本的配置数据。

由于观察模式的触发是一次性的,所以每次都要调用ActiveKeyValueStoreread()方法,这样才能获得未来的更新数据。我们不能确保一定能够接受到更新通知事件,因为在接受观察事件和下一次读取之间的窗口期内,znode可能被改变了(有可能很多次),但是client可能没有注册观察模式,所以client不会接到znode改变的通知。在配置服务中这不是一个什么问题,因为client只关心配置数据的最新版本。然而,建议读者关注一下这个潜在的问题。

让我们来看一下控制台打印的ConfigUpdater运行结果:

% java ConfigUpdater localhost
Set /config to 79
Set /config to 14
Set /config to 78

然后立即在另外的控制台终端窗口中运行ConfigWatcher:

% java ConfigWatcher localhost
Read /config as 79
Read /config as 14
Read /config as 78

results matching ""

    No results matching ""