しれっとAmazon Kinesis Client Library for Ruby(以下、kclrb)がリリースされていたのだが、ちょうど仕事でKinesisを使う機会があったので使ってみた。
Sampleが優秀。Python版より使い始めが簡単!
githubのプロジェクトに同梱されているsamplesが結構優秀だった。とりあえずrake run
すれば動く。で、そのままサンプルアプリを改造していく感じで自前のアプリが書き始められる。このあたりはPython版よりもだいぶ使い始めが簡単。
サンプルのファイル構成は以下のとおり。
Rakefile
rake run
でsample_kcl.rbを起動してくれる
samples.properties
rake_runで利用される設定ファイル。ストリームのリージョンや名前、KCLアプリケーション名、並列度などを設定する。
sample_kcl.rb
実際のKCLアプリケーション。中身的にはただMultiLangDaemon(KCLアプリケーションのスーパーバイバー的なプロセス。これはJavaで実装されていて、Kinesisからデータを取り出してRubyで実装されたアプリに値を渡すという仕事をしてくれる。)の標準出力にレコードを出力するだけ。
sample_kcl_producer.rb
サンプルのレコードをKinesisに投入してくれる。rake run_producer
で起動する。
動かしてみる。
-
rake run_producer
でProducerを起動 -
rake run
でKCLを起動
以上。
少しサンプルコードや設定ファイルを覗いてみる。
sample.propertites
起動されるKCLアプリケーションのファイル名やアプリ名、ストリーム名、リージョンなどを設定している。他にも多数の設定項目があるが、ファイルを読めばだいたい内容の想像はつくものばかり。
executableName = sample_kcl.rb
streamName = kclrbsample
applicationName = RubyKCLSample
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
processingLanguage = ruby
initialPositionInStream = TRIM_HORIZON
regionName = ap-northeast-1
sample_kcr.rb
if __FILE__ == $0
# Start the main processing loop
record_processor = SampleRecordProcessor.new(ARGV[1] || File.join(Dir.tmpdir, 'kclrbsample'))
driver = Aws::KCLrb::KCLProcess.new(record_processor)
driver.run
end
KCLを実装したクラスを引数に Aws::KCLrb::KCLProcessというドライバクラスを初期化してやってrunすればOK。
require 'aws/kclrb'
class SampleRecordProcessor < Aws::KCLrb::RecordProcessorBase
ビジネスロジックを実装するためのKCLクラスは上記のようにAws::KCLrb::RecordProcessorBaseを継承して以下の3つのインターフェイスを実装してやることになる。
def initialize(output=$stderr)
@close = false
if output.is_a?(String)
@output_directory = output
# Make sure the directory exists and that we can
# write to it. If not, this will fail and processing
# can't start.
FileUtils.mkdir_p @output_directory
probe_file = File.join(@output_directory, '.kclrb_probe')
FileUtils.touch(probe_file)
FileUtils.rm(probe_file)
elsif output
# assume it's an IO
@output = output
else
fail "Output destination cannot be nil"
end
end
クラスの初期化。まあお約束。
# (see Aws::KCLrb::RecordProcessorBase#init_processor)
def init_processor(shard_id)
unless @output
@filename = File.join(@output_directory, "#{shard_id}-#{Time.now.to_i}.log")
@output = open(@filename, 'w')
@close = true
end
end
shard_idが渡ってくる。これが実際のProcessorプロセスのクラス初期化・・・になるんだと思う。
# (see Aws::KCLrb::RecordProcessorBase#process_records)
def process_records(records, checkpointer)
last_seq = nil
records.each do |record|
begin
@output.puts Base64.decode64(record['data'])
@output.flush
last_seq = record['sequenceNumber']
rescue => e
# Make sure to handle all exceptions.
# Anything you write to STDERR will simply be echoed by parent process
STDERR.puts "#{e}: Failed to process record '#{record}'"
end
end
checkpoint_helper(checkpointer, last_seq) if last_seq
end
実際にはここが処理の本体。KCLのドライバがKinesisがレコードを定期的に取ってきて、recordsという引数を付けてこのメソッドを呼んでくれる。サンプルコードにあるとおり、データはBase64 Encodedな状態で飛んでくる。もうひとつのcheckpointerという引数は checkpointer.checkpoint(seq_number)
という感じに呼び出してやるとDynamoDBにチェックポイントを作ってくれる。
# (see Aws::KCLrb::RecordProcessorBase#shutdown)
def shutdown(checkpointer, reason)
checkpoint_helper(checkpointer) if 'TERMINATE' == reason
ensure
# Make sure to cleanup state
@output.close if @close
end
お片づけ用のインターフェイス。シャットダウンするまで呼び出されないので、実際には実装しなくても動く。けどまあここでチェックポイントを作っておくというのがただしい運用方法だと思う。
まとめ
kclrbを使ってアプリケーションを実装するなら
gem install aws-kclrb
- aws/kclrbをrequireしつつ、Aws::KCLrb::RecordProcessorBaseを継承したクラスをつくる
- 以下の4つのインターフェイスを実装
- initialize
- init_processor
- process_records
- shutdown
- propertiesファイルをつくって、stream名やregionなどを設定。
- (これは必要に応じてでOK) Rakeファイルに起動コマンドを書いてあとは起動する!
- 実際にはもろもろの長いコマンドになるのでRakeファイルに書いたり、シェルスクリプトにしとくのが楽だと思う。
免責
これは個人のメモ/意見であり、私が所属する組織を代表するものではありません。