Kinesis Streamsを使ったアプリケーションは、SDKのKinesis Streams APIもしくは、Kinesis Client Library(KCL)を使って作ることができますが、KCLを使った方がよりアプリケーションの実装に集中できると思います。
KCLのAWSオフィシャルの実装はJavaのKCLのほかに、Java KCLに含まれるMultiLangDaemonを利用した、Pythonの実装や、Rubyの実装 があり、既存のアプリケーションからKinesisStreamsを使った実装への移行を容易にしています。
今回、PHPのアプリケーションでKinesisStreamsに移行したかったのですが、PHPのKCL実装はパッと見つからなかったので作成しました。github のページはこちらです。
KCLを理解するための習作も兼ねており、ナイーブにKCL for Pythonをポーティングせずに少し構成は変えています。
サンプル起動方法
ストリームレコードのデータを単純にログに吐くだけのサンプルです。
Kinesis Streams 自体の使い方は端折りますが、使い方は簡単です。
- git clone する
-
mld.properties
のstreamName
,regionName
を使用するストリームに合わせて変更する -
docker-compose up
する。 docker が無い場合は、composer update && php printCommand.php | /bin/bash
でも良い 1
以上でサンプルが起動するので、ストリームにデータをプッシュすると、即時ログにデータが表示されると思います。
使い方
amazon-kinesis-client-php を使ったアプリケーションを作るのも簡単です。
ストリームのレコードは、MultiLangDaemonからのProcessRecordsメッセージとして受信します。このメッセージを処理するクラスを作成し、ProcessRecordsメッセージのアクションとして登録します。
そして、メッセージを処理するワーカーを起動します。最低限必要な実装はこれだけです。
use Sabme\Kcl\Action\ProcessRecords;
use Sabme\Kcl\Message\ProcessRecords as ProcessRecordsMessage;
MyProcessRecordsAction extends ProcessRecords { // レコードを処理するクラス
public function perform() {
// perform() がワーカーが呼び出すメッセージハンドラ
// ここでレコードの処理をする
}
}
// Dispatcher に ProcessRecordsMessage のアクションを MyProcessRecords に更新する
Dispatcher::setRule(
ProcessRecordsMessage::class, MyProcessRecordsAction::class);
$worker = new DefaultWorker(); // ワーカーを作成して起動する
$worker->run();
docker イメージにしてしまえば、 ECS で起動できるのでらくちんです。
では、実装にあたって知っておいた方が良い、MultiLangDaemonとの通信と、チェックポイント処理について説明します。
MultiLangDaemon との通信
amazon-kinesis-client-phpのワーカーは、MultiLangDaemonのサブプロセスとして起動されます。MultiLangDaemonとワーカーサブプロセスは、STDIN/STDOUT上でJSONメッセージの交換で通信します。メッセージはシンプルで、例えば
{
"action": "initialize",
"shardId" : "shard-00000001"
}
といった形式で、この action
により、アプリケーション側の振舞いと応答メッセージが決定します。サブプロセスは、ストリームのシャードと1対1になるようにMultiLangDaemonから起動されるため、シャード毎の状態を考慮する必要は無いので、ワーカーは単純なループで、メッセージはシリアルに処理します。メッセージは initialize
、 processRecords
、 checkpoint
、 shutdown
くらいしかありません。
initialize
ワーカーサブプロセスが起動された後、最初に担当するシャードのIDを通知するメッセージです。
processRecords
initializeで通知されたシャードのレコードのリストが渡されるメッセージです。レコード毎にパーティションキーと、データ、タイムスタンプが含まれています。initialize を受理した後、shutdown までは、このメッセージを繰り返し受信、処理することになります。
shutdown
MultiLangDaemonがワーカーへ終了を通知するメッセージです。ワーカーが何らかのエラーで応答できなかった場合や、リシャーディングによるシャードの統廃合の際に、終了の理由と一緒に通知されます。
チェックポイント処理
チェックポイントは、シャードのレコードストリームのどこまでをワーカーが処理したかを管理する仕組みです。
MultiLangDaemonは、ワーカーとシャードの対応付けと、シャードのチェックポイントの管理をDynamoDBで行います。
mld.properties
ファイルの applicationName
に書かれているワーカーが最初に起動した際に、MultiLangDaemonはそれと同じ名前のテーブルをDynamoDBに作成し、シャードIDをパーティションキーとするレコード上に、シャードを担当するワーカーの識別子と、現在のチェックポイント情報を記録します。
このDynamoDB上の管理はMultiLangDaemonがやってくれますが、チェックポイント処理はワーカー側で行う必要があります。
チェックポイントを処理するタイミングは、processRecordsメッセージによりレコードを処理したときと、shutdownメッセージで正常終了(TERMINATE)を受信したときです。チェックポイント処理は、MultiLangDaemonにcheckpointメッセージを通知することで行います。
各レコードにはシーケンス番号がついているので、一定の間隔で処理済みのレコードのシーケンス番号でチェックポイントを更新します。
レコード毎に更新すると頻度が高すぎるので、 Action\ProcessRecords
では更新間隔を60秒にしています。あまり間隔が空くとワーカーが入れ替わったときに二重処理される量が増えたり、間隔を短くするとDynamoDBのキャパシティを高くする必要があるので、これは適宜調整する必要があります。
それでは、ぜひ使っていただいてフィードバックをいただけたらと思います。
-
後者は PHP7 などのインストールが必要 ↩