Python
ZooKeeper
Kazoo

Zookeeper x python(kazoo)をMac OS X上で動かす

More than 1 year has passed since last update.

zookeeperとは?

本家のガイダンスはこちら
まともに組むと苦戦するバッチの分散処理などを容易に実装できるもの、と理解しています。

インストール

$ brew install zookeeper

確認

$ ls /usr/local/etc/zookeeper
defaults         log4j.properties zoo.cfg          zoo_sample.cfg

スタンドアローンモードで実行

デフォルトで配置してある設定ファイルをチュートリアルと見比べてもほぼ同じなので、このまま起動してみる。

$ cat /usr/local/etc/zookeeper/zoo.cfg | egrep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/var/run/zookeeper/data
clientPort=2181
$ which zkServer
/usr/local/bin/zkServer

$ sudo zkServer start
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED

ログは下記に出力される

$ tail -f /usr/local/var/log/zookeeper/zookeeper.log
2017-01-09 06:40:05 QuorumPeerMain [WARN] Either no config or no quorum defined in config, running  in standalone mode

なんか出てる。
quorum(クォーラム)はレプリケーション先のサーバのことを指し、それが存在しない、という警告。
ここではお試しのスタンドアローンなので出ていても特に問題ないかと。

接続テスト

設定ファイルでclientPortに指定したポートに接続できる。

$ cat /usr/local/etc/zookeeper/zoo.cfg | grep clientPort
clientPort=2181

2181番ポートを指定しているので、そちらに接続する。

$ which zkCli
/usr/local/bin/zkCli

$ sudo zkCli -server 127.0.0.1:2181
Connecting to 127.0.0.1:2181
Welcome to ZooKeeper!
JLine support is enabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0]

利用可能なコマンドを確認

[zk: 127.0.0.1:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
    stat path [watch]
    set path data [version]
    ls path [watch]
    delquota [-n|-b] path
    ls2 path [watch]
    setAcl path acl
    setquota -n|-b val path
    history
    redo cmdno
    printwatches on|off
    delete path [version]
    sync path
    listquota path
    rmr path
    get path [watch]
    create [-s] [-e] path data acl
    addauth scheme auth
    quit
    getAcl path
    close
    connect host:port

znodeを作成する

チュートリアルに従って。
znodeはZooKeeperツリー上の各ノードのことを指す。

# ディレクトリ確認
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[zookeeper]

# ノード(znode)作成
[zk: 127.0.0.1:2181(CONNECTED) 1] create /test_node my_test_data
Created /test_node

# 作成確認
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /
[test_node, zookeeper]

# 作成したデータがノードに関連付けされていることを確認
[zk: 127.0.0.1:2181(CONNECTED) 3] get /test_node
my_test_data
cZxid = 0x8
ctime = Mon Jan 09 07:00:17 JST 2017
mZxid = 0x8
mtime = Mon Jan 09 07:00:17 JST 2017
pZxid = 0x8
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 0

# 関連付けされたデータを変更してみる
[zk: 127.0.0.1:2181(CONNECTED) 4] set /test_node replaced_my_test_data
cZxid = 0x8
ctime = Mon Jan 09 07:00:17 JST 2017
mZxid = 0x9
mtime = Mon Jan 09 07:03:21 JST 2017
pZxid = 0x8
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 21
numChildren = 0

# 確認
[zk: 127.0.0.1:2181(CONNECTED) 5] get /test_node
replaced_my_test_data # <- 変更された
cZxid = 0x8
ctime = Mon Jan 09 07:00:17 JST 2017
mZxid = 0x9
mtime = Mon Jan 09 07:03:21 JST 2017
pZxid = 0x8
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 21
numChildren = 0

# 削除
[zk: 127.0.0.1:2181(CONNECTED) 6] delete /test_node

# 確認
[zk: 127.0.0.1:2181(CONNECTED) 7] ls /
[zookeeper]

pythonで使ってみる

python x zookeeperはkazooというライブラリを利用する。
下記のドキュメントに沿って色々書いてみる。
kazoo

kazooのインストール

$ pip install kazoo

ざっと試す

hello_kazoo.py
# coding:utf-8
from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.client import KeeperState
from kazoo.handlers.gevent import SequentialGeventHandler
import logging
logging.basicConfig()

# znodeのルート
root = '/root'

# zookeeperクライアント
zk = KazooClient(hosts='127.0.0.1:2181', read_only=True, handler=SequentialGeventHandler())

# 非同期モードで起動
event = zk.start_async()
event.wait(timeout=3)

# zookeeperサーバに接続できない場合、処理を中断
if not zk.connected:
    zk.stop()
    raise Exception("Unable to connect.")

def listener(state):
    '''
    State変更時のリスナー
    '''
    print('current state is ' + state)

zk.add_listener(listener)

@zk.add_listener
def watch_for_ro(state):
    if state == KazooState.CONNECTED:
        if zk.client_state == KeeperState.CONNECTED_RO:
            print('state is read_only')
        else:
            print('state is writable')

def print_status(znode):
    '''
    ノードの状態を取得
    '''
    print('#####[DEBUG]#####')
    # バージョンと登録データを確認
    data, stat = zk.get(znode)
    print('Version: %s, data: %s' % (stat.version, data.decode('utf-8')))
    # rootの子ノード一覧を取得
    children = zk.get_children(root)
    print("There are %s children with names %s" % (len(children), children))

def doAsync(async_obj):
    '''
    非同期処理のコールバック関数(処理内容に特に意味は無い)
    '''
    znodes = async_obj.get()
    try:
        children = async_obj.get()
        # すべての子ノードの名称を出力
        print('#####[print child znodes]#####')
        for child in children:
            print(child)
    except (ConnectionLossException, NoAuthException):
        print("ERROR!!!")
        sys.exit(1)

if __name__ == '__main__':
    # トランザクションの開始
    tx = zk.transaction()
    ## 基本的な使い方を確認
    # パスの生成
    zk.ensure_path(root)
    # znodeが未作成であれば作成
    znode = root + '/sample_znode'
    if zk.exists(znode) is None:
        zk.create(znode, b'sample_data')
    print_status(znode)
    # データの更新
    zk.set(znode, b'updated_data')
    print_status(znode)
    # 子ノードの追加
    znode2 = root + '/sample_znode2'
    if zk.exists(znode2) is None:
        zk.create(znode2, b'sample_data2')
    print_status(znode2)
    ## 非同期処理はこうやって使う
    async_obj = zk.get_children_async(root)
    async_obj.rawlink(doAsync)
    # ノードの削除
    zk.delete(root, recursive=True)
    # コミット
    results = tx.commit()
    print('#####[Result]#####')
    print(results)

実行結果

$ python hello_kazoo.py
#####[DEBUG]#####
Version: 0, data: sample_data
There are 1 children with names ['sample_znode']
#####[DEBUG]#####
Version: 1, data: updated_data
There are 1 children with names ['sample_znode']
#####[DEBUG]#####
Version: 0, data: sample_data2
There are 2 children with names ['sample_znode', 'sample_znode2']
#####[print child znodes]#####
sample_znode
sample_znode2
#####[Result]#####
[]
$ sudo zkCli -server 127.0.0.1:2181

でZooKeeperサーバに接続し、確認しながらちょっとずつコードを書くと色々動きが確認できます。