storm
Kinesis
Kafka

ストリーム系のリアルタイム処理系の調査

More than 1 year has passed since last update.

Amazon Kinesis

公式サイト

概要

  • Amazon Kinesis は、大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービスです

利用可能なリージョンと利用料金(2014/06/08 現在)

  • リージョン ** 米国東部(バージニア北部)のみ
  • 利用料金 ** 時間単位のシャード速度(取得速度は 1 MB/秒、送信速度は 2 MB/秒): 0.015 USD ** PUT 取引 1,000,000 個あたり: 0.028 USD

簡単な使い方

  1. レコードを登録する (PutRecord http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html)
  2. レコードを取得する (GetRecord http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)

コードサンプル (Ruby)

  • レコードを登録する
put.rb
require 'aws-sdk'

print "Enter target stream name: "
stream_name = gets.chomp

begin
  client = AWS::Kinesis.new(
    access_key_id: ENV['AWS_ACCESS_KEY_ID'],
    secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client

  categories = %w{Foods Books Toys Electronics Sports Clothing Shoes Music Movies Games}
  loop do
    now = Time.now.to_s
    partition_key = categories.sample
    response = client.put_record(
      stream_name: stream_name,
      data: now,
      partition_key: partition_key)
    puts "Data : #{now}, Partition Key : #{partition_key}, Shard Id : #{response.shard_id}, Sequence Number : #{response.sequence_number}"
    sleep(1)
  end
rescue => e
  puts "Error: #{e.message}"
  abort
end
  • レコードを取得する
get.rb
require 'aws-sdk'
require 'parallel'

print "Enter target stream name: "
stream_name = gets.chomp

if stream_name.empty?
  puts "target stream name is required"
  abort
end

begin
  client = AWS::Kinesis.new(
    access_key_id: ENV['AWS_ACCESS_KEY_ID'],
    secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client

  shards = client.describe_stream(stream_name: stream_name).stream_description.shards
  shard_ids = shards.map(&:shard_id)

  Parallel.each(shard_ids, in_threads: shard_ids.count) do |shard_id|
    shard_iterator_info = client.get_shard_iterator(
      stream_name: stream_name,
      shard_id: shard_id,
      shard_iterator_type: 'TRIM_HORIZON')

    shard_iterator = shard_iterator_info.shard_iterator
    loop do
      records_info = client.get_records(
        shard_iterator: shard_iterator,
        limit: 100)

      records_info.records.each do |record|
        puts "Data : #{record.data}, Partition Key : #{record.partition_key}, Shard Id : #{shard_id}, Sequence Number : #{record.sequence_number}"
      end

      shard_iterator = records_info.next_shard_iterator
      sleep(1)
    end
  end
rescue => e
  puts "Error: #{e.message}"
  abort
end

その他

  • fluentd と用途が近いけれど、Amazon Kinesis では ThroughPut を向上させたい場合は、Shared の数を増加させるだけでよい
  • MultiStep Processing も可能
  • Shared 上限デフォルト数が 10

参考資料

Apache Kafka

概要

  • publish-subscribe 型のメッセージングシステム
  • 用途としては、オフライン・オンライン両方のメッセージ取得に適している
  • 当初は LinkedIn で開発された メッセージングシステムでだったが、Apache プロジェクトのトップレベルプロジェクトになっている
  • 最近は Tumblr、DataSift といった企業でも使用されているとのこと

システム概念イメージ

20140223221420.png
http://fuji-151a.hatenablog.com/entry/2014/02/23/231639 より

クライアント

インストール方法

コンポーネント

  • message 流すデータ
  • topic messageのカテゴリのこと、topicは自分で名前を決めれる
  • broker Kafkaにおいてmessageを貯めるところ
  • producer brokerにmessageを送信するcomponent
  • consumer brokerからmessageを読むcomponent
  • consumer group 言葉通り,consumerをグルーピングしたもの.
  • offset consumerと密接に関わってくる単語 その情報を管理しているのはZookeeperというまた別のOSSである.

参考資料

Storm

概要

  • Apache Storm is a free and open source distributed realtime computation system. topology.png
  • リアルタイムでの分散処理の実行のみに特化
  • 基本的なユースケースとしては、ストリーム処理、継続的な計算、分散 RPC

コンポーネント

  • Tuple Storm で処理されるメッセージを保持するデータ
  • Stream 途切れずに連続する Tuple
  • Spout Storm のストリーム処理の起点となる
  • Bolt Stream の変換処理を行う
  • Topology Spult, Bolt からなるネットワーク構造のこと

参考資料

その他

Apache Samza

  • Apache Samza is a distributed stream processing framework. It uses Apache Kafka for messaging, and Apache Hadoop YARN to provide fault tolerance, processor isolation, security, and resource management.
  • http://samza.incubator.apache.org/

その他のメモ