9
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Amazon Kinesis Clinet Library for Ruby

Last updated at Posted at 2015-01-22

しれっとAmazon Kinesis Client Library for Ruby(以下、kclrb)がリリースされていたのだが、ちょうど仕事でKinesisを使う機会があったので使ってみた。

Sampleが優秀。Python版より使い始めが簡単!

githubのプロジェクトに同梱されているsamplesが結構優秀だった。とりあえずrake run すれば動く。で、そのままサンプルアプリを改造していく感じで自前のアプリが書き始められる。このあたりはPython版よりもだいぶ使い始めが簡単。

サンプルのファイル構成は以下のとおり。

Rakefile
rake runでsample_kcl.rbを起動してくれる

samples.properties
rake_runで利用される設定ファイル。ストリームのリージョンや名前、KCLアプリケーション名、並列度などを設定する。

sample_kcl.rb
実際のKCLアプリケーション。中身的にはただMultiLangDaemon(KCLアプリケーションのスーパーバイバー的なプロセス。これはJavaで実装されていて、Kinesisからデータを取り出してRubyで実装されたアプリに値を渡すという仕事をしてくれる。)の標準出力にレコードを出力するだけ。

sample_kcl_producer.rb
サンプルのレコードをKinesisに投入してくれる。rake run_producerで起動する。

動かしてみる。

  1. rake run_producerでProducerを起動
  2. rake runでKCLを起動

以上。

少しサンプルコードや設定ファイルを覗いてみる。

sample.propertites

起動されるKCLアプリケーションのファイル名やアプリ名、ストリーム名、リージョンなどを設定している。他にも多数の設定項目があるが、ファイルを読めばだいたい内容の想像はつくものばかり。

executableName = sample_kcl.rb
streamName = kclrbsample
applicationName = RubyKCLSample
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
processingLanguage = ruby
initialPositionInStream = TRIM_HORIZON
regionName = ap-northeast-1

sample_kcr.rb

起動部分
if __FILE__ == $0
  # Start the main processing loop
  record_processor = SampleRecordProcessor.new(ARGV[1] || File.join(Dir.tmpdir, 'kclrbsample'))
  driver = Aws::KCLrb::KCLProcess.new(record_processor)
  driver.run
end

KCLを実装したクラスを引数に Aws::KCLrb::KCLProcessというドライバクラスを初期化してやってrunすればOK。

KCLクラス
require 'aws/kclrb'

class SampleRecordProcessor < Aws::KCLrb::RecordProcessorBase

ビジネスロジックを実装するためのKCLクラスは上記のようにAws::KCLrb::RecordProcessorBaseを継承して以下の3つのインターフェイスを実装してやることになる。

実装必須インターフェイスその1--initialize
  def initialize(output=$stderr)
    @close = false
    if output.is_a?(String)
      @output_directory = output
      # Make sure the directory exists and that we can
      # write to it. If not, this will fail and processing
      # can't start.
      FileUtils.mkdir_p @output_directory
      probe_file = File.join(@output_directory, '.kclrb_probe')
      FileUtils.touch(probe_file)
      FileUtils.rm(probe_file)
    elsif output
      # assume it's an IO
      @output = output
    else
      fail "Output destination cannot be nil"
    end
  end

クラスの初期化。まあお約束。

  # (see Aws::KCLrb::RecordProcessorBase#init_processor)
  def init_processor(shard_id)
    unless @output
      @filename = File.join(@output_directory, "#{shard_id}-#{Time.now.to_i}.log")
      @output = open(@filename, 'w')
      @close = true
    end
  end

shard_idが渡ってくる。これが実際のProcessorプロセスのクラス初期化・・・になるんだと思う。

  # (see Aws::KCLrb::RecordProcessorBase#process_records)
  def process_records(records, checkpointer)
    last_seq = nil
    records.each do |record|
      begin
        @output.puts Base64.decode64(record['data'])
        @output.flush
        last_seq = record['sequenceNumber']
      rescue => e
        # Make sure to handle all exceptions.
        # Anything you write to STDERR will simply be echoed by parent process
        STDERR.puts "#{e}: Failed to process record '#{record}'"
      end
    end
    checkpoint_helper(checkpointer, last_seq)  if last_seq
  end

実際にはここが処理の本体。KCLのドライバがKinesisがレコードを定期的に取ってきて、recordsという引数を付けてこのメソッドを呼んでくれる。サンプルコードにあるとおり、データはBase64 Encodedな状態で飛んでくる。もうひとつのcheckpointerという引数は checkpointer.checkpoint(seq_number)という感じに呼び出してやるとDynamoDBにチェックポイントを作ってくれる。

実装必須インターフェイスその4--shutdown
  # (see Aws::KCLrb::RecordProcessorBase#shutdown)
  def shutdown(checkpointer, reason)
    checkpoint_helper(checkpointer)  if 'TERMINATE' == reason
  ensure
    # Make sure to cleanup state
    @output.close  if @close
  end

お片づけ用のインターフェイス。シャットダウンするまで呼び出されないので、実際には実装しなくても動く。けどまあここでチェックポイントを作っておくというのがただしい運用方法だと思う。

まとめ

kclrbを使ってアプリケーションを実装するなら

  1. gem install aws-kclrb
  2. aws/kclrbをrequireしつつ、Aws::KCLrb::RecordProcessorBaseを継承したクラスをつくる
  3. 以下の4つのインターフェイスを実装
  • initialize
  • init_processor
  • process_records
  • shutdown
  1. propertiesファイルをつくって、stream名やregionなどを設定。
  2. (これは必要に応じてでOK) Rakeファイルに起動コマンドを書いてあとは起動する!
  • 実際にはもろもろの長いコマンドになるのでRakeファイルに書いたり、シェルスクリプトにしとくのが楽だと思う。

免責

これは個人のメモ/意見であり、私が所属する組織を代表するものではありません。

9
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?