ソース
- fukuiretu/amazon-kinesis-client-library-php · GitHub
- rf/amazon-kinesis-client-library-php - Packagist
インストール
制限が特になければComposer経由が良いと思います。
詳細はGithubのREDEMEを見てください。
できること
- DataRecordのpush
- Shardの取得
- DataRecordの取得
- ShardIdとSequenceNumberの状態を管理 => 前回の処理以降のDataRecordを扱うことができる!!
状態を管理するデータストア先の選択
- File
- Memcached
DataRecordの取得(データストア先をFileとした場合)
<?php
require 'vendor/autoload.php';
use Aws\Kinesis\KinesisClient;
use Aws\Common\Enum\Region;
use Rf\Aws\AutoLoader;
use Rf\Aws\Kinesis\ClientLibrary\KinesisProxy;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardFileDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisShardMemcacheDataStore;
use Rf\Aws\Kinesis\ClientLibrary\KinesisStorageManager;
define('STREAM_NAME', 'kinesis-trial');
$kinesis = KinesisClient::factory(array(
'key' => 'XXXXX',
'secret' => 'XXXXX',
'region' => Region::VIRGINIA
));
$kinesis_proxy = KinesisProxy::factory($kinesis, STREAM_NAME);
$kinesis_storage_manager = new KinesisStorageManager($kinesis_proxy, new KinesisShardFileDataStore('/tmp/amazon-kinesis'));
// 前回保存したSequenceNumber以降のDataRecordを取得
// 初回は先頭から取得
$data_records = $kinesis_storage_manager->findWithMergeStoreDataRecords(null, 10, 5);
foreach ($data_records as $data_record) {
// 取得したDataRecordに対しゴニョゴニョする
echo $data_record->getData(), PHP_EOL;
}
// 今回分のSequenceNumberを保存
$kinesis_storage_manager->saveAll();
TODO
- ストレージの選択肢としてDynamoとかRedisとかRDSとかは今後追加するかもしれない
- 並列(pthreads)で動くようになるかもしれない