配置服务 Configuration Service
一个基本的ZooKeeper实现的服务就是“配置服务”,集群中的服务器可以通过ZooKeeper共享一个通用的配置数据。从表面上,ZooKeeper可以理解为一个配置数据的高可用存储服务,为应用提供检索和更新配置数据服务。我们可以使用ZooKeeper的观察模式实现一个活动的配置服务,当配置数据发生变化时,可以通知与配置相关客户端。
接下来,我们来实现一个这样的活动配置服务。首先,我们设计用znode来存储key-value对,我们在znode中存储一个String类型的数据作为value,用znode的path来表示key。然后,我们实现一个client,这个client可以在任何时候对数据进行跟新操作。那么这个设计的ZooKeeper数据模型应该是:master来更新数据,其他的worker也随之将数据更新,就像HDFS的namenode那样。
我们在一个叫做ActiveKeyValueStore的类中编写代码如下:
public class ActiveKeyValueStore extends ConnectionWatcher {
private static final Charset CHARSET = Charset.forName("UTF-8");
public void write(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
}
write()
方法主要实现将给定的key-value对写入到ZooKeeper中。这其中隐含了创建一个新的znode和更新一个已存在的znode的实现方法的不同。那么操作之前,我们需要根据exists()
来判断znode是否存在,然后再根据情况进行相关的操作。其他值得一提的就是String类型的数据在转换成byte[]
时,使用的字符集是UTF-8。
我们为了说明ActiveKeyValueStore
怎么使用,我们考虑实现一个ConfigUpdater
类来实现更新配置。下面代码实现了一个在一些随机时刻更新配置数据的应用。
public class ConfigUpdater {
public static final String PATH = "/config";
private ActiveKeyValueStore store;
private Random random = new Random();
public ConfigUpdater(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}
public void run() throws InterruptedException, KeeperException {
while (true) {
String value = random.nextInt(100) + "";
store.write(PATH, value);
System.out.printf("Set %s to %s\n", PATH, value);
TimeUnit.SECONDS.sleep(random.nextInt(10));
}
}
public static void main(String[] args) throws Exception {
ConfigUpdater configUpdater = new ConfigUpdater(args[0]);
configUpdater.run();
}
}
上面的代码很简单。在ConfigUpdater
的构造函数中,ActiveKeyValueStore
对象连接到ZooKeeper服务。然后run()
不断的循环运行,使用一个随机数不断的随机更新/config
znode上的值。
下面我们来看一下,如何读取/config
上的值。首先,我们在ActiveKeyValueStore
中实现一个读方法。
public String read(String path, Watcher watcher) throws InterruptedException,
KeeperException {
byte[] data = zk.getData(path, watcher, null/*stat*/);
return new String(data, CHARSET);
}
ZooKeeper的getData()
方法的参数包含:path,一个Watcher对象和一个Stat对象。Stat对象中含有从getData()
返回的值,并且负责接收回调信息。这种方式下,调用者不仅可以获得数据,还能够获得znode的metadata。
做为服务的consumer,ConfigWatcher
以观察者身份,创建一个ActiveKeyValueStore
对象,并且在启动以后调用read()
函数(在dispalayConfig()
函数中)获得相关数据。
下面的代码实现了一个以观察模式获得ZooKeeper中的数据更新的应用,并将值到后台中。
public class ConfigWatcher implements Watcher {
private ActiveKeyValueStore store;
public ConfigWatcher(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}
public void displayConfig() throws InterruptedException, KeeperException {
String value = store.read(ConfigUpdater.PATH, this);
System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDataChanged) {
try {
displayConfig();
} catch (InterruptedException e) {
System.err.println("Interrupted. Exiting.");
Thread.currentThread().interrupt();
} catch (KeeperException e) {
System.err.printf("KeeperException: %s. Exiting.\n", e);
}
}
}
public static void main(String[] args) throws Exception {
ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
configWatcher.displayConfig();
// stay alive until process is killed or thread is interrupted
Thread.sleep(Long.MAX_VALUE);
}
}
当ConfigUpadater
更新znode时,ZooKeeper将触发一个EventType.NodeDataChanged
的事件给观察者。ConfigWatcher
将在他的process()
函数中获得这个时间,并将显示读取到的最新的版本的配置数据。
由于观察模式的触发是一次性的,所以每次都要调用ActiveKeyValueStore
的read()
方法,这样才能获得未来的更新数据。我们不能确保一定能够接受到更新通知事件,因为在接受观察事件和下一次读取之间的窗口期内,znode可能被改变了(有可能很多次),但是client可能没有注册观察模式,所以client不会接到znode改变的通知。在配置服务中这不是一个什么问题,因为client只关心配置数据的最新版本。然而,建议读者关注一下这个潜在的问题。
让我们来看一下控制台打印的ConfigUpdater
运行结果:
% java ConfigUpdater localhost
Set /config to 79
Set /config to 14
Set /config to 78
然后立即在另外的控制台终端窗口中运行ConfigWatcher
:
% java ConfigWatcher localhost
Read /config as 79
Read /config as 14
Read /config as 78