LoginSignup
2
0

More than 5 years have passed since last update.

Amazon Kinesis Client for PHP で Kinesis Streams を PHP で処理する

Posted at

Kinesis Streamsを使ったアプリケーションは、SDKのKinesis Streams APIもしくは、Kinesis Client Library(KCL)を使って作ることができますが、KCLを使った方がよりアプリケーションの実装に集中できると思います。

KCLのAWSオフィシャルの実装はJavaのKCLのほかに、Java KCLに含まれるMultiLangDaemonを利用した、Pythonの実装や、Rubyの実装 があり、既存のアプリケーションからKinesisStreamsを使った実装への移行を容易にしています。

今回、PHPのアプリケーションでKinesisStreamsに移行したかったのですが、PHPのKCL実装はパッと見つからなかったので作成しました。github のページはこちらです。

amazon-kinesis-client-php

KCLを理解するための習作も兼ねており、ナイーブにKCL for Pythonをポーティングせずに少し構成は変えています。

サンプル起動方法

ストリームレコードのデータを単純にログに吐くだけのサンプルです。
Kinesis Streams 自体の使い方は端折りますが、使い方は簡単です。

  1. git clone する
  2. mld.propertiesstreamName , regionName を使用するストリームに合わせて変更する
  3. docker-compose up する。 docker が無い場合は、composer update && php printCommand.php | /bin/bash でも良い 1

以上でサンプルが起動するので、ストリームにデータをプッシュすると、即時ログにデータが表示されると思います。

使い方

amazon-kinesis-client-php を使ったアプリケーションを作るのも簡単です。

ストリームのレコードは、MultiLangDaemonからのProcessRecordsメッセージとして受信します。このメッセージを処理するクラスを作成し、ProcessRecordsメッセージのアクションとして登録します。
そして、メッセージを処理するワーカーを起動します。最低限必要な実装はこれだけです。

use Sabme\Kcl\Action\ProcessRecords;
use Sabme\Kcl\Message\ProcessRecords as ProcessRecordsMessage;

MyProcessRecordsAction extends ProcessRecords { // レコードを処理するクラス

    public function perform() {
        // perform() がワーカーが呼び出すメッセージハンドラ
        // ここでレコードの処理をする
    }

}

// Dispatcher に ProcessRecordsMessage のアクションを MyProcessRecords に更新する
Dispatcher::setRule(
    ProcessRecordsMessage::class, MyProcessRecordsAction::class);

$worker = new DefaultWorker();  // ワーカーを作成して起動する
$worker->run();

docker イメージにしてしまえば、 ECS で起動できるのでらくちんです。

では、実装にあたって知っておいた方が良い、MultiLangDaemonとの通信と、チェックポイント処理について説明します。

MultiLangDaemon との通信

amazon-kinesis-client-phpのワーカーは、MultiLangDaemonのサブプロセスとして起動されます。MultiLangDaemonとワーカーサブプロセスは、STDIN/STDOUT上でJSONメッセージの交換で通信します。メッセージはシンプルで、例えば

メッセージのフォーマット
{
  "action": "initialize",
  "shardId" : "shard-00000001"
}

といった形式で、この action により、アプリケーション側の振舞いと応答メッセージが決定します。サブプロセスは、ストリームのシャードと1対1になるようにMultiLangDaemonから起動されるため、シャード毎の状態を考慮する必要は無いので、ワーカーは単純なループで、メッセージはシリアルに処理します。メッセージは initializeprocessRecordscheckpointshutdown くらいしかありません。

initialize

ワーカーサブプロセスが起動された後、最初に担当するシャードのIDを通知するメッセージです。

processRecords

initializeで通知されたシャードのレコードのリストが渡されるメッセージです。レコード毎にパーティションキーと、データ、タイムスタンプが含まれています。initialize を受理した後、shutdown までは、このメッセージを繰り返し受信、処理することになります。

shutdown

MultiLangDaemonがワーカーへ終了を通知するメッセージです。ワーカーが何らかのエラーで応答できなかった場合や、リシャーディングによるシャードの統廃合の際に、終了の理由と一緒に通知されます。

チェックポイント処理

チェックポイントは、シャードのレコードストリームのどこまでをワーカーが処理したかを管理する仕組みです。

MultiLangDaemonは、ワーカーとシャードの対応付けと、シャードのチェックポイントの管理をDynamoDBで行います。
mld.properties ファイルの applicationName に書かれているワーカーが最初に起動した際に、MultiLangDaemonはそれと同じ名前のテーブルをDynamoDBに作成し、シャードIDをパーティションキーとするレコード上に、シャードを担当するワーカーの識別子と、現在のチェックポイント情報を記録します。

このDynamoDB上の管理はMultiLangDaemonがやってくれますが、チェックポイント処理はワーカー側で行う必要があります。
チェックポイントを処理するタイミングは、processRecordsメッセージによりレコードを処理したときと、shutdownメッセージで正常終了(TERMINATE)を受信したときです。チェックポイント処理は、MultiLangDaemonにcheckpointメッセージを通知することで行います。

各レコードにはシーケンス番号がついているので、一定の間隔で処理済みのレコードのシーケンス番号でチェックポイントを更新します。
レコード毎に更新すると頻度が高すぎるので、 Action\ProcessRecords では更新間隔を60秒にしています。あまり間隔が空くとワーカーが入れ替わったときに二重処理される量が増えたり、間隔を短くするとDynamoDBのキャパシティを高くする必要があるので、これは適宜調整する必要があります。

それでは、ぜひ使っていただいてフィードバックをいただけたらと思います。


  1. 後者は PHP7 などのインストールが必要 

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0