Kafkaとは?
Kafka
大量のメッセージ処理に利用するメッセージングシステム。
分散処理はZooKeeperが担う。
インストール
zookeeperが必要。
詳しくはこちらの記事を参照。
$ brew install zookeeper
kafkaのインストール
$ brew install kafka
zookeeperを起動
$ sudo zkServer start
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED
kafkaを起動
# コマンドの確認
$ which kafka-server-start
/usr/local/bin/kafka-server-start
# 起動
$ sudo kafka-server-start /usr/local/etc/kafka/server.properties
メッセージングを試してみる
kafkaでは「topic」という単位でメッセージをやり取りする模様。
ここでは「hello_kafka」というtopicを送信(produce) -> 受信(consume)する。
トピックの生成
# コマンドの確認
$ which kafka-topics
/usr/local/bin/kafka-topics
# トピックの作成
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello_kafka
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "hello_kafka".
# トピックの生成確認
$ kafka-topics --list --zookeeper localhost:2181
hello_kafka
受信側(consumer)
# コマンドの確認
$ which kafka-console-consumer
/usr/local/bin/kafka-console-consumer
# 受信待機
$ kafka-console-consumer --zookeeper localhost:2181 --topic hello_kafka --from-beginning
送信側(producer)
# コマンドの確認
$ which kafka-console-producer
/usr/local/bin/kafka-console-producer
# メッセージ送信(対話型)
$ kafka-console-producer --broker-list localhost:9092 --topic hello_kafka
my first kafka message!
consumer側で下記のようにメッセージが受信できればOK
my first kafka message!
phpで使ってみる
公式に幾つか推奨のライブラリが載っている。
ここではphpkafkaを利用する。
インストール
$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make
$ sudo make install
$ git clone git@github.com:EVODelavega/phpkafka.git
$ cd phpkafka/
$ phpize
$ ./configure --enable-kafka
$ make
$ make test
Do you want to send this report now? [Yns]: n
$ sudo make install
Installing shared extensions: /usr/local/Cellar/php56/5.6.29_5/lib/php/extensions/no-debug-non-zts-20131226/
Mac上でパスを通す。
# php.iniの場所を確認
$ php -i | grep php.ini
Configuration File (php.ini) Path => /usr/local/etc/php/5.6
Loaded Configuration File => /usr/local/etc/php/5.6/php.ini
$ sudo sh -c 'echo "extension=kafka.so" >> /usr/local/etc/php/5.6/conf.d/kafka.ini'
# cli
$ mkdir -p /usr/local/etc/php/5.6/cli/conf.d
$ sudo sh -c 'echo "extension=kafka.so" >> /usr/local/etc/php/5.6/cli/conf.d/20-kafka.ini'
# 確認
$ php -m | grep kafka
kafka
producer / consumerを作成
ドキュメントはこちら。
こちらの記事のほうがわかりやすいので写経させていただきましたmm
producer(送信側)
producer.php
<?php
if ($argc < 2) {
exit('送信メッセージを指定してください'.PHP_EOL);
}
$message = $argv[1];
$kafka = new Kafka('localhost:9092');
try {
// メッセージを送信
$kafka->produce('hello_topic', $message);
} catch (Exception $e) {
exit($e->getMessage().PHP_EOL);
}
$kafka->disconnect(Kafka::MODE_PRODUCER);
exit('done send message. message='.$message.PHP_EOL);
consumer(受信側)
consumer.php
<?php
$kafka = new Kafka('localhost:9092');
$partitions = $kafka->getPartitionsForTopic('hello_topic');
$kafka->setPartition($partitions[0]);
$offset = 1;
$size = 1;
while (1) {
try {
// メッセージを受信
$messages = $kafka->consume('hello_topic', $offset, $size);
if (count($messages) > 0) {
foreach ($messages as $message) {
echo $message.PHP_EOL;
$offset += 1;
}
}
} catch (Exception $e) {
echo $e->getMessage().PHP_EOL;
break;
}
}
$kafka->disconnect();
実行
# 送信
$ php producer.php "this is test message"
done send message. message=this is test message
# 受信
$ php consumer.php
this is test message