PHP
Kafka

Kafka x phpをMac OS X上で動かす

More than 1 year has passed since last update.

Kafkaとは?

Kafka
大量のメッセージ処理に利用するメッセージングシステム。
分散処理はZooKeeperが担う。

インストール

zookeeperが必要。
詳しくはこちらの記事を参照。

$ brew install zookeeper

kafkaのインストール

$ brew install kafka

zookeeperを起動

$ sudo zkServer start
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED

kafkaを起動

# コマンドの確認
$ which kafka-server-start
/usr/local/bin/kafka-server-start

# 起動
$ sudo kafka-server-start /usr/local/etc/kafka/server.properties

メッセージングを試してみる

kafkaでは「topic」という単位でメッセージをやり取りする模様。
ここでは「hello_kafka」というtopicを送信(produce) -> 受信(consume)する。

トピックの生成

# コマンドの確認
$ which kafka-topics
/usr/local/bin/kafka-topics

# トピックの作成
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello_kafka
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "hello_kafka".

# トピックの生成確認
$ kafka-topics --list --zookeeper localhost:2181
hello_kafka

受信側(consumer)

# コマンドの確認
$ which kafka-console-consumer
/usr/local/bin/kafka-console-consumer

# 受信待機
$ kafka-console-consumer --zookeeper localhost:2181 --topic hello_kafka --from-beginning

送信側(producer)

# コマンドの確認
$ which kafka-console-producer
/usr/local/bin/kafka-console-producer

# メッセージ送信(対話型)
$ kafka-console-producer --broker-list localhost:9092 --topic hello_kafka
my first kafka message!

consumer側で下記のようにメッセージが受信できればOK

my first kafka message!

phpで使ってみる

公式に幾つか推奨のライブラリが載っている。
ここではphpkafkaを利用する。

インストール

$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make
$ sudo make install
$ git clone git@github.com:EVODelavega/phpkafka.git
$ cd phpkafka/
$ phpize
$ ./configure --enable-kafka
$ make
$ make test
Do you want to send this report now? [Yns]: n
$ sudo make install
Installing shared extensions:     /usr/local/Cellar/php56/5.6.29_5/lib/php/extensions/no-debug-non-zts-20131226/

Mac上でパスを通す。

# php.iniの場所を確認
$ php -i | grep php.ini
Configuration File (php.ini) Path => /usr/local/etc/php/5.6
Loaded Configuration File => /usr/local/etc/php/5.6/php.ini

$ sudo sh -c 'echo "extension=kafka.so" >> /usr/local/etc/php/5.6/conf.d/kafka.ini'

# cli
$ mkdir -p /usr/local/etc/php/5.6/cli/conf.d
$ sudo sh -c 'echo "extension=kafka.so" >> /usr/local/etc/php/5.6/cli/conf.d/20-kafka.ini'

# 確認
$ php -m | grep kafka
kafka

producer / consumerを作成

ドキュメントはこちら
こちらの記事のほうがわかりやすいので写経させていただきましたmm

producer(送信側)

producer.php
<?php

if ($argc < 2) {
    exit('送信メッセージを指定してください'.PHP_EOL);
}
$message = $argv[1];

$kafka = new Kafka('localhost:9092');
try {
    // メッセージを送信
    $kafka->produce('hello_topic', $message);
} catch (Exception $e) {
    exit($e->getMessage().PHP_EOL);
}

$kafka->disconnect(Kafka::MODE_PRODUCER);
exit('done send message. message='.$message.PHP_EOL);

consumer(受信側)

consumer.php
<?php

$kafka = new Kafka('localhost:9092');
$partitions = $kafka->getPartitionsForTopic('hello_topic');
$kafka->setPartition($partitions[0]);
$offset = 1;
$size = 1;

while (1) {
    try {
        // メッセージを受信
        $messages = $kafka->consume('hello_topic', $offset, $size);
        if (count($messages) > 0) {
            foreach ($messages as $message) {
                echo $message.PHP_EOL;
                $offset += 1;
            }
        }
    } catch (Exception $e) {
        echo $e->getMessage().PHP_EOL;
        break;
    }
}

$kafka->disconnect();

実行

# 送信
$ php producer.php "this is test message"
done send message. message=this is test message

# 受信
$ php consumer.php
this is test message