LoginSignup
25
26

More than 5 years have passed since last update.

Twitter -> (Kinesis | Kafka) -> Storm -> Redis

Posted at

AWS Kinesis、Apache Kafka、Apache Stormを連携させるサンプルプログラム

以下のようなサンプルプログラムを作りました。

Twitter Streaming API
 -> Producer -> (AWS Kinesis | Apache Kafka) -> Apache Storm[WordCounter]
  -> Redis

TweetStreamWordCounter.png

Vagrant

環境構築用のVagrantfile、Berksfile、各種Cookbook

検証した環境

  • HostOS
    • MacOS X(10.7)
    • Oracle VM VirtualBox(4.3.14)
    • Berkshelf(3.1.3)
    • Vagrant(1.6.3)
    • vagrant-omnibus(1.4.1)
    • vagrant-vbguest(0.10.0)

インストール

  1. VirtualBox、Vagrant、Berkshelfをインストール
  2. git clone https://github.com/nmatsui/twitter-kinesis-kafka-storm_vagrant.git
  3. berks vendor cookbooks
  4. vagrant up --provision

これにより、下記2つのVMが立ち上がり、/etc/hostsが設定されます。

  • VM 1: Apache Zookeeper & Apache Kafka用

    • Ubuntu Server(14.04.1 LTS)
    • Oracle JDK(1.8.0_20)
    • Apache Zookeeper(3.3.3) [Apache Kafka install packageに内包されたバージョン]
    • Apache Kafka(0.8.1.1)
  • VM 2: Apache Storm & Redis用

    • Ubuntu Server(14.04.1 LTS)
    • Oracle JDK(1.8.0_20)
    • SBT(0.13.5)
    • Apache Storm(0.9.2-incubating)

WordCounter

Apache Storm用の単語出現頻度をカウントするプログラム

  1. AWS Kinesis用のSpout or Apache Kafka用のSpoutを用いてTweetを取得
  2. 下記3つのBoltを用いて単語の出現頻度をカウント
    1. KinesisとKafkaのデータ構造差異を吸収
    2. kuromojiを用いて形態素解析
    3. 単語数のカウント
  3. 出現数をスコアとして単語をRedisのソート済みセット型に登録

インストール

  1. VM 2上でgit clone https://github.com/nmatsui/twitter-kinesis-kafka-storm_wordcounter.git
  2. src/main/resources/*.properties.templatesrc/main/resources/*.propertiesにリネーム
  3. AwsCredentials.propertiesにAWSのAPI情報をセット
    • もしプロキシ配下で動作させる場合、kinesis.propertiesにプロキシ情報をセット
  4. sbt compile
    • Scala本体や各種依存ライブラリを取得するため、初回はかなり時間がかかる
  5. sbt assembly

依存ライブラリ

  • Java
    • Oracle JDK(1.8.0_20)
    • Scala(2.10.4)
    • SBT(0.13.5)
  • Library
    • Apache Storm(0.9.2-incubating)
    • KafkaSpout(0.9.2-incubating)
    • KinesisSpout
    • Kuromoji(0.7.7)
    • Redis Client(2.12)

KinesisSpoutは、awslabsのkinesis-storm-spoutにbuild.sbtを追加し、バグフィックスしたもの( https://github.com/nmatsui/kinesis-storm-spout )を利用

Producer

Twitter Streaming APIから取得したTweetをAWS KinesisもしくはApache Kafkaへ連続的に投入するプログラム

  1. Twitter Streaming APIから指定したFilterQueryを用いてTweetを連続的に取得
  2. AWS KinesisもしくはApache Kafkaへ投入

インストール

  1. VM 2上でgit clone https://github.com/nmatsui/twitter-kinesis-kafka-storm_producer.git
  2. src/main/resources/*.properties.templatesrc/main/resources/*.propertiesにリネーム
  3. AwsCredentials.propertiesにAWSのAPI情報をセット
    • もしプロキシ配下で動作させる場合、kinesis.propertiestwitter.propertiesにプロキシ情報をセット
  4. sbt compile
    • Scala本体や各種依存ライブラリを取得するため、初回はかなり時間がかかる
  5. sbt assembly

依存ライブラリ

  • Java
    • Oracle JDK(1.8.0_20)
    • Scala(2.10.4)
    • SBT(0.13.5)
  • Library
    • Twitter4J(4.0.2)
    • Apache Kafka Library(0.8.1.1)
    • Amazon Kinesis Client Library(1.1.0)

利用方法

AWS Kinesis準備

  1. AWS Kinesisに"tweets"という名前のStreamを作成
    • 現在のバージョンではUS EAST(N.Virginia)のKinesisにのみ対応

Apache Kafka準備

  1. VM 1にログイン
  2. Apache Zookeeper起動
  3. Apache Kafka起動
  4. Apache Kafkaへ"tweets"という名前のtopicを作成

ZookeeperやKafkaの起動手順等は、公式サイトのQuickStartを参照

Redis起動確認

  1. redis-cli -h redis --rawでRedisに接続できることを確認

Apache StromのTopology起動

  1. java -jar target/scala-2.10/wordcounter.jar (TEST|KAFKA|KINESIS)
    • TEST: AWS KinesisやApache Kafkaに接続せず、ランダムな文章をBoltに流すSpoutに接続
    • KAFKA: KafkaSpoutに接続
    • KINESIS: KinesisSpoutに接続

Producer起動

  1. java -jar target/scala-2.10/tweetstreamer.jar (TEST|KAFKA|KINESIS) query_words...
    • TEST: AWS KinesisやApache Kafkaに接続せずTwitter Streaming APIから取得したTweetをログ出力するのみ
    • KAFKA: Apache KafkaにTweetをputする
    • KINESIS: AWS KinesisにTweetをputする
  2. query_wordsは、Twitter Streaming APIの"statuses/filter"エンドポイントに渡されるフィルタワード
    • 空白区切りで複数の単語を指定可能
    • 指定した単語のいづれかが含まれるTweetが抽出される

正しく環境構築できていれば、取得したTweetがログ出力され、Redisに登録される

Redis確認

  1. redis-cli -h redis --rawでRedisに接続
  2. Redis-CLI上でzrange words 0 -1 withscoresを実行すると、出現頻度の昇順で単語が表示される

Redisの結果をクリアする場合、Redis-CLI上でdel wordsを実行

25
26
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
26