Help us understand the problem. What is going on with this article?

Scala SparkでTwitterのストリーミング処理テスト on Vagrant Ubuntu 14.04

More than 5 years have passed since last update.

やること

  1. Vagrant up
  2. Twitterアプリケーション作成準備
  3. Install Scala (and sbt)
  4. Sparkの外部ライブラリのダウンロードなど
  5. 実装 Apache Sparkで始めるお手軽リアルタイムウインドウ集計
  6. そのメモ

概要

基本的に(1)のサイトに沿って実装していきますが,ハマった部分の解説を足していこうと思います.
元々やりたかったことは

$ sbt package
$ spark-submit target/xxxxxx.jar

で実行したかったですが,とりあえず今回はインラインで実行しました.
Option(メモ)としてsbtのインストール,プロジェクトの作成方法を書いておきます.

Vagrant up

Vagrantfile
# -*- mode: ruby -*-
# vi: set ft=ruby :

# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vm.box = "opscode-ubuntu1410"
  config.vm.box_url = "http://opscode-vm-bento.s3.amazonaws.com/vagrant/virtualbox/opscode_ubuntu-14.10_chef-provisionerless.box"

  config.vm.provider :virtualbox do |vb|
    vb.name = "spark"
    vb.customize ["modifyvm", :id, "--memory", 1024]
  end

  #config.vm.network :private_network, ip: "192.168.33.31"
  config.vm.synced_folder "documents", "/share", \
        create: true, owner: 'vagrant', group: 'vagrant', \
        mount_options: ['dmode=777,fmode=666']

  config.vm.provision :shell, :inline => <<-EOT
  EOT
end

Twitterアプリケーション作成準備

  1. Consumer Key
  2. Consumer Secret
  3. Access Token
  4. Access Token Secret それぞれ取得します.

OSの時間設定

時間設定が狂っているとTwitterのAPIを使うときにエラーがでるそうです.UTCとかJSTは特にどっちでもエラーとは関係ないです.
NTP で時刻合わせ | 自宅サーバー Debian/Ubuntu にあるようにNTPで普通に時刻設定しても可能ですが, vagrantで時刻がおかしい場合の対処法 にあるようにVagrantではホストOSと時刻の同期ができます.

各種キーの取得

以下のサイトに行きます.
https://apps.twitter.com/
Create New Appをクリックします.

こちらのサイトに沿ってキーを取得します.
Twitterアプリケーションの作成 - 各種キーの取得

※Callback URLはオプションですが,入力しないといけないみたいです.以下ソース
Twitterアプリの新規作成時に必要なCallback URLについて分かった2つのこと

cURLタイムラインを取得してファイルに保存する

curlの -o オプションを使用するとファイルに保存できます.

こちらのサイトに沿ってタイムラインの取得をしてみる.
Twitter API を使う

サイト https://apps.twitter.com/ で先程自分で作成したものを選択してTest OAuthボタンを押します.

GETを選択
Request URIに
https://api.twitter.com/1.1/statuses/home_timeline.json
と入力
Get OAuth Signature ボタンを押す.

出てきたcURLコマンドをコピーする.
恐らく大量に出力がでるので -o オプションでファイルに保存.

example
curl -o timeline.json --get 'https://api.twitter.com/1.1/statuses/home_timeline.json' --header 'Authorization: OAuth oauth_consumer_key="xxxxxxxxxxxxxxxxx", oauth_nonce="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_signature="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_signature_method="HMAC-SHA1", oauth_timestamp="xxxxxxxxxx", oauth_token="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_version="1.0"' --verbose

取得したものをJSON Viewerで表示してみる

Online JSON Viewer
ここにtimeline.jsonの内容を貼り付けてFormatボタンを押したりするとキレイに見れます.

※最初に用意したVagrantfileではvagrantの/shareとホストOSのdocumentsというフォルダが同期されているので

$ cp timeline.json /share/

とかするとホストOSのディレクトリdocumentsに入ります.

Twitter API を使う のサイトの通りPOSTの手順を実行すると,testという文字列が投稿されます.

タイムライン取得でエラーが出る場合の原因一覧

gem 'omniauth-twitter'で認証しようとしてOAuth::Unauthorized 401 Unauthorizedなんてエラーがでたら

Install Scala and sbt

Install Scala and sbt on Ubuntu 14.04
※sbtは今回使用しません.

Install Spark

こちら参照
Install Spark on Ubuntu 14.04

インラインで実装

参考サイト: Apache Sparkで始めるお手軽リアルタイムウインドウ集計
Sparkのインストールまでは行ったので,参考サイトの4, 5番のライブラリの準備からしていきます.

kuromojiや外部ライブラリのダウンロード

Twitter関係の外部ライブラリダウンロード

Using twitterUtils in Spark shell

ちなみに,先程ダウンロードしたSparkの中
~/spark-1.1.0/lib_managed/jars
の中に
lib_managed/jars/twitter4j-stream-3.0.3.jar
lib_managed/jars/twitter4j-core-3.0.3.jar
が元々ありますが,とりあえず今回はTwitter関連まとめてダウンロードしてきます.

ref: https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.1.0/

$ cd ~/spark-1.1.0/lib_managed
$ mkdir twiter4j
$ cd twitter4j
$ wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.1.0/spark-streaming-twitter_2.10-1.1.0.jar
$ TWITTER4J_SOURCE=twitter4j-3.0.3.zip
$ curl -O "http://twitter4j.org/archive/$TWITTER4J_SOURCE"
$ unzip -j ./$TWITTER4J_SOURCE "lib/*.jar" -d twitter4j/
$ rm twitter4j-3.0.3.zip
$ mv twitter4j/* .
$ rm -r twitter4j

Download kuromoji

$ cd ~/spark-1.1.0/lib_managed
$ wget https://github.com/downloads/atilika/kuromoji/kuromoji-0.7.7.zip
$ unzip -j kuromoji-0.7.7.zip
$ rm いらないファイル

実行

spark-shellコマンドでsparkのshellが起動しますが,その際 --jars オプションで外部ライブラリを読み込むようにします.恐らくもっとスマートな指定方法があると思いますが,とりあえずこれで動かしてみます.

起動
$ cd ~
$ ./spark-1.1.0/bin/spark-shell --master local[2] --jars /home/vagrant/spark-1.1.0/lib_managed/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-core-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-media-support-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-async-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-examples-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-stream-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/kuromoji-0.7.7/kuromoji-0.7.7.jar

local[2]
というのはCPUの数で,Sparkで分散処理をするような場合だとlocalやlocal[1]ではエラーとなるそうです.

参考サイトの方でソースに若干スペルミスがあったので,以下修正.
ソースコードを全部コピーして,
scala >
となっている部分にペーストします.
consumerKeyなどは適宜自分のものを使用してください.

timeline.scala
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.atilika.kuromoji._
import org.atilika.kuromoji.Tokenizer._
import java.util.regex._

System.setProperty("twitter4j.oauth.consumerKey", "xxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.consumerSecret", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.accessToken", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.accessTokenSecret", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")


val ssc = new StreamingContext(sc, Seconds(60))

val stream = TwitterUtils.createStream(ssc, None, Array("iPhone6"))

val tweetStream = stream.flatMap(status => {
   val tokenizer : Tokenizer = Tokenizer.builder().build()
   val features : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
   var tweetText : String = status.getText()

   val japanese_pattern : Pattern = Pattern.compile("[¥¥u3040-¥¥u309F]+")
   if(japanese_pattern.matcher(tweetText).find()) {
     tweetText = tweetText.replaceAll("http(s*)://(.*)/", "").replaceAll("¥¥uff57", "")

     val tokens : java.util.List[Token] = tokenizer.tokenize(tweetText)
     val pattern : Pattern = Pattern.compile("^[a-zA-Z]+$|^[0-9]+$")
     for(index <- 0 to tokens.size()-1) {
       val token = tokens.get(index)
       val matcher : Matcher = pattern.matcher(token.getSurfaceForm())
       if(token.getSurfaceForm().length() >= 3 && !matcher.find()) {
         features += (token.getSurfaceForm() + "-" + token.getAllFeatures())
       }
     }
   }
   (features)
})

val topCounts60 = tweetStream.map((_, 1)
                  ).reduceByKeyAndWindow(_+_, Seconds(60*60)
                  ).map{case (topic, count) => (count, topic)
                  }.transform(_.sortByKey(false))

topCounts60.foreachRDD(rdd => {
   val topList = rdd.take(20)
   println("¥ nPopular topics in last 60*60 seconds (%s words):".format(rdd.count()))
   topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})

ssc.start()
ssc.awaitTermination()

これで動きます.
日本語文字化けします.

今後やること

scala >
となっている場所にソース入力するのではなく,sbt packageとしてspark-submitで実行したい.

[Apache Spark]ストリーミング処理で直近の人気ハッシュタグを取得する

以下メモ

config.propertiesを用意

util.properties

config.properties
twitter_consumerKey=xxxxxxxxxx
twitter_consumerSecret=xxxxxxxxxx
twitter_accessToken=xxxxxxxxxx
twitter_accessTokenSecret=xxxxxxxxxx

libraryDependencies

vagrant@vagrant:~$ ./spark-1.1.0/bin/spark-submit \

--class "SimpleApp" \
--master local[4] \
./ScalaProjects/spark/target/scala-2.10/spark-project_2.10-1.0.jar

spark-1.1.0/bin/spark-submit --class "Startup" --master local[2] --jars /home/vagrant/spark-1.1.0/lib_managed/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-core-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-media-support-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-async-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-examples-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-stream-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/kuromoji-0.7.7/kuromoji-0.7.7.jar ScalaProjects/hello/target/scala-2.10/hello_2.10-1.0.jar

メモ

http://xerial.org/scala-cookbook/recipes/2012/06/28/create-a-scala-project/
http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html
http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html
http://search.maven.org/#search%7Cga%7C1%7Ctwitter
http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html
http://qiita.com/kanuma1984/items/45412a82536d94e18631
https://spark.apache.org/docs/1.1.0/submitting-applications.html
http://stackoverflow.com/questions/28165032/java-lang-noclassdeffounderror-org-apache-spark-streaming-twitter-twitterutils
http://qiita.com/abey1192/items/7f0bc006cbe56abbebb9

tshimba
このサイトの掲載内容は私自身の見解であり、所属する組織とは関係ありません
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away