zookeeperとは?
本家のガイダンスはこちら。
まともに組むと苦戦するバッチの分散処理などを容易に実装できるもの、と理解しています。
インストール
$ brew install zookeeper
確認
$ ls /usr/local/etc/zookeeper
defaults log4j.properties zoo.cfg zoo_sample.cfg
スタンドアローンモードで実行
デフォルトで配置してある設定ファイルをチュートリアルと見比べてもほぼ同じなので、このまま起動してみる。
$ cat /usr/local/etc/zookeeper/zoo.cfg | egrep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/var/run/zookeeper/data
clientPort=2181
$ which zkServer
/usr/local/bin/zkServer
$ sudo zkServer start
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED
ログは下記に出力される
$ tail -f /usr/local/var/log/zookeeper/zookeeper.log
2017-01-09 06:40:05 QuorumPeerMain [WARN] Either no config or no quorum defined in config, running in standalone mode
なんか出てる。
quorum(クォーラム)はレプリケーション先のサーバのことを指し、それが存在しない、という警告。
ここではお試しのスタンドアローンなので出ていても特に問題ないかと。
接続テスト
設定ファイルでclientPortに指定したポートに接続できる。
$ cat /usr/local/etc/zookeeper/zoo.cfg | grep clientPort
clientPort=2181
2181番ポートを指定しているので、そちらに接続する。
$ which zkCli
/usr/local/bin/zkCli
$ sudo zkCli -server 127.0.0.1:2181
Connecting to 127.0.0.1:2181
Welcome to ZooKeeper!
JLine support is enabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0]
利用可能なコマンドを確認
[zk: 127.0.0.1:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
znodeを作成する
チュートリアルに従って。
znodeはZooKeeperツリー上の各ノードのことを指す。
# ディレクトリ確認
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[zookeeper]
# ノード(znode)作成
[zk: 127.0.0.1:2181(CONNECTED) 1] create /test_node my_test_data
Created /test_node
# 作成確認
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /
[test_node, zookeeper]
# 作成したデータがノードに関連付けされていることを確認
[zk: 127.0.0.1:2181(CONNECTED) 3] get /test_node
my_test_data
cZxid = 0x8
ctime = Mon Jan 09 07:00:17 JST 2017
mZxid = 0x8
mtime = Mon Jan 09 07:00:17 JST 2017
pZxid = 0x8
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 0
# 関連付けされたデータを変更してみる
[zk: 127.0.0.1:2181(CONNECTED) 4] set /test_node replaced_my_test_data
cZxid = 0x8
ctime = Mon Jan 09 07:00:17 JST 2017
mZxid = 0x9
mtime = Mon Jan 09 07:03:21 JST 2017
pZxid = 0x8
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 21
numChildren = 0
# 確認
[zk: 127.0.0.1:2181(CONNECTED) 5] get /test_node
replaced_my_test_data # <- 変更された
cZxid = 0x8
ctime = Mon Jan 09 07:00:17 JST 2017
mZxid = 0x9
mtime = Mon Jan 09 07:03:21 JST 2017
pZxid = 0x8
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 21
numChildren = 0
# 削除
[zk: 127.0.0.1:2181(CONNECTED) 6] delete /test_node
# 確認
[zk: 127.0.0.1:2181(CONNECTED) 7] ls /
[zookeeper]
pythonで使ってみる
python x zookeeperはkazooというライブラリを利用する。
下記のドキュメントに沿って色々書いてみる。
kazoo
kazooのインストール
$ pip install kazoo
ざっと試す
hello_kazoo.py
# coding:utf-8
from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.client import KeeperState
from kazoo.handlers.gevent import SequentialGeventHandler
import logging
logging.basicConfig()
# znodeのルート
root = '/root'
# zookeeperクライアント
zk = KazooClient(hosts='127.0.0.1:2181', read_only=True, handler=SequentialGeventHandler())
# 非同期モードで起動
event = zk.start_async()
event.wait(timeout=3)
# zookeeperサーバに接続できない場合、処理を中断
if not zk.connected:
zk.stop()
raise Exception("Unable to connect.")
def listener(state):
'''
State変更時のリスナー
'''
print('current state is ' + state)
zk.add_listener(listener)
@zk.add_listener
def watch_for_ro(state):
if state == KazooState.CONNECTED:
if zk.client_state == KeeperState.CONNECTED_RO:
print('state is read_only')
else:
print('state is writable')
def print_status(znode):
'''
ノードの状態を取得
'''
print('#####[DEBUG]#####')
# バージョンと登録データを確認
data, stat = zk.get(znode)
print('Version: %s, data: %s' % (stat.version, data.decode('utf-8')))
# rootの子ノード一覧を取得
children = zk.get_children(root)
print("There are %s children with names %s" % (len(children), children))
def doAsync(async_obj):
'''
非同期処理のコールバック関数(処理内容に特に意味は無い)
'''
znodes = async_obj.get()
try:
children = async_obj.get()
# すべての子ノードの名称を出力
print('#####[print child znodes]#####')
for child in children:
print(child)
except (ConnectionLossException, NoAuthException):
print("ERROR!!!")
sys.exit(1)
if __name__ == '__main__':
# トランザクションの開始
tx = zk.transaction()
## 基本的な使い方を確認
# パスの生成
zk.ensure_path(root)
# znodeが未作成であれば作成
znode = root + '/sample_znode'
if zk.exists(znode) is None:
zk.create(znode, b'sample_data')
print_status(znode)
# データの更新
zk.set(znode, b'updated_data')
print_status(znode)
# 子ノードの追加
znode2 = root + '/sample_znode2'
if zk.exists(znode2) is None:
zk.create(znode2, b'sample_data2')
print_status(znode2)
## 非同期処理はこうやって使う
async_obj = zk.get_children_async(root)
async_obj.rawlink(doAsync)
# ノードの削除
zk.delete(root, recursive=True)
# コミット
results = tx.commit()
print('#####[Result]#####')
print(results)
実行結果
$ python hello_kazoo.py
##### [DEBUG]#####
Version: 0, data: sample_data
There are 1 children with names ['sample_znode']
##### [DEBUG]#####
Version: 1, data: updated_data
There are 1 children with names ['sample_znode']
##### [DEBUG]#####
Version: 0, data: sample_data2
There are 2 children with names ['sample_znode', 'sample_znode2']
##### [print child znodes]#####
sample_znode
sample_znode2
##### [Result]#####
[]
$ sudo zkCli -server 127.0.0.1:2181
でZooKeeperサーバに接続し、確認しながらちょっとずつコードを書くと色々動きが確認できます。