Scala
Ubuntu
Twitter
vagrant
Spark

Scala SparkでTwitterのストリーミング処理テスト on Vagrant Ubuntu 14.04

More than 3 years have passed since last update.


やること


  1. Vagrant up

  2. Twitterアプリケーション作成準備

  3. Install Scala (and sbt)

  4. Sparkの外部ライブラリのダウンロードなど

  5. 実装 Apache Sparkで始めるお手軽リアルタイムウインドウ集計

  6. そのメモ


概要

基本的に(1)のサイトに沿って実装していきますが,ハマった部分の解説を足していこうと思います.

元々やりたかったことは

$ sbt package

$ spark-submit target/xxxxxx.jar

で実行したかったですが,とりあえず今回はインラインで実行しました.

Option(メモ)としてsbtのインストール,プロジェクトの作成方法を書いておきます.


Vagrant up


Vagrantfile

# -*- mode: ruby -*-

# vi: set ft=ruby :

# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "opscode-ubuntu1410"
config.vm.box_url = "http://opscode-vm-bento.s3.amazonaws.com/vagrant/virtualbox/opscode_ubuntu-14.10_chef-provisionerless.box"

config.vm.provider :virtualbox do |vb|
vb.name = "spark"
vb.customize ["modifyvm", :id, "--memory", 1024]
end

#config.vm.network :private_network, ip: "192.168.33.31"
config.vm.synced_folder "documents", "/share", \
create: true, owner: 'vagrant', group: 'vagrant', \
mount_options: ['dmode=777,fmode=666']

config.vm.provision :shell, :inline => <<-EOT
EOT
end



Twitterアプリケーション作成準備


  1. Consumer Key

  2. Consumer Secret

  3. Access Token

  4. Access Token Secret
    それぞれ取得します.


OSの時間設定

時間設定が狂っているとTwitterのAPIを使うときにエラーがでるそうです.UTCとかJSTは特にどっちでもエラーとは関係ないです.

NTP で時刻合わせ | 自宅サーバー Debian/Ubuntu にあるようにNTPで普通に時刻設定しても可能ですが, vagrantで時刻がおかしい場合の対処法 にあるようにVagrantではホストOSと時刻の同期ができます.


各種キーの取得

以下のサイトに行きます.

https://apps.twitter.com/

Create New Appをクリックします.

こちらのサイトに沿ってキーを取得します.

Twitterアプリケーションの作成 - 各種キーの取得

※Callback URLはオプションですが,入力しないといけないみたいです.以下ソース

Twitterアプリの新規作成時に必要なCallback URLについて分かった2つのこと


cURLタイムラインを取得してファイルに保存する

curlの -o オプションを使用するとファイルに保存できます.

こちらのサイトに沿ってタイムラインの取得をしてみる.

Twitter API を使う

サイト https://apps.twitter.com/ で先程自分で作成したものを選択してTest OAuthボタンを押します.

GETを選択

Request URIに

https://api.twitter.com/1.1/statuses/home_timeline.json

と入力

Get OAuth Signature ボタンを押す.

出てきたcURLコマンドをコピーする.

恐らく大量に出力がでるので -o オプションでファイルに保存.


example

curl -o timeline.json --get 'https://api.twitter.com/1.1/statuses/home_timeline.json' --header 'Authorization: OAuth oauth_consumer_key="xxxxxxxxxxxxxxxxx", oauth_nonce="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_signature="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_signature_method="HMAC-SHA1", oauth_timestamp="xxxxxxxxxx", oauth_token="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_version="1.0"' --verbose



取得したものをJSON Viewerで表示してみる

Online JSON Viewer

ここにtimeline.jsonの内容を貼り付けてFormatボタンを押したりするとキレイに見れます.

※最初に用意したVagrantfileではvagrantの/shareとホストOSのdocumentsというフォルダが同期されているので



$ cp timeline.json /share/



とかするとホストOSのディレクトリdocumentsに入ります.

Twitter API を使う のサイトの通りPOSTの手順を実行すると,testという文字列が投稿されます.


タイムライン取得でエラーが出る場合の原因一覧

gem 'omniauth-twitter'で認証しようとしてOAuth::Unauthorized 401 Unauthorizedなんてエラーがでたら


Install Scala and sbt

Install Scala and sbt on Ubuntu 14.04

※sbtは今回使用しません.


Install Spark

こちら参照

Install Spark on Ubuntu 14.04


インラインで実装

参考サイト: Apache Sparkで始めるお手軽リアルタイムウインドウ集計

Sparkのインストールまでは行ったので,参考サイトの4, 5番のライブラリの準備からしていきます.


kuromojiや外部ライブラリのダウンロード


Twitter関係の外部ライブラリダウンロード

Using twitterUtils in Spark shell

ちなみに,先程ダウンロードしたSparkの中

~/spark-1.1.0/lib_managed/jars

の中に

lib_managed/jars/twitter4j-stream-3.0.3.jar

lib_managed/jars/twitter4j-core-3.0.3.jar

が元々ありますが,とりあえず今回はTwitter関連まとめてダウンロードしてきます.

ref: https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.1.0/

$ cd ~/spark-1.1.0/lib_managed

$ mkdir twiter4j
$ cd twitter4j
$ wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.1.0/spark-streaming-twitter_2.10-1.1.0.jar
$ TWITTER4J_SOURCE=twitter4j-3.0.3.zip
$ curl -O "http://twitter4j.org/archive/$TWITTER4J_SOURCE"
$ unzip -j ./$TWITTER4J_SOURCE "lib/*.jar" -d twitter4j/
$ rm twitter4j-3.0.3.zip
$ mv twitter4j/* .
$ rm -r twitter4j


Download kuromoji

$ cd ~/spark-1.1.0/lib_managed

$ wget https://github.com/downloads/atilika/kuromoji/kuromoji-0.7.7.zip
$ unzip -j kuromoji-0.7.7.zip
$ rm いらないファイル


実行

spark-shellコマンドでsparkのshellが起動しますが,その際 --jars オプションで外部ライブラリを読み込むようにします.恐らくもっとスマートな指定方法があると思いますが,とりあえずこれで動かしてみます.


起動

$ cd ~

$ ./spark-1.1.0/bin/spark-shell --master local[2] --jars /home/vagrant/spark-1.1.0/lib_managed/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-core-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-media-support-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-async-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-examples-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-stream-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/kuromoji-0.7.7/kuromoji-0.7.7.jar

local[2]

というのはCPUの数で,Sparkで分散処理をするような場合だとlocalやlocal[1]ではエラーとなるそうです.

参考サイトの方でソースに若干スペルミスがあったので,以下修正.

ソースコードを全部コピーして,

scala >

となっている部分にペーストします.

consumerKeyなどは適宜自分のものを使用してください.


timeline.scala

import org.apache.spark.streaming._

import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.atilika.kuromoji._
import org.atilika.kuromoji.Tokenizer._
import java.util.regex._

System.setProperty("twitter4j.oauth.consumerKey", "xxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.consumerSecret", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.accessToken", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.accessTokenSecret", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

val ssc = new StreamingContext(sc, Seconds(60))

val stream = TwitterUtils.createStream(ssc, None, Array("iPhone6"))

val tweetStream = stream.flatMap(status => {
val tokenizer : Tokenizer = Tokenizer.builder().build()
val features : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
var tweetText : String = status.getText()

val japanese_pattern : Pattern = Pattern.compile("[¥¥u3040-¥¥u309F]+")
if(japanese_pattern.matcher(tweetText).find()) {
tweetText = tweetText.replaceAll("http(s*)://(.*)/", "").replaceAll("¥¥uff57", "")

val tokens : java.util.List[Token] = tokenizer.tokenize(tweetText)
val pattern : Pattern = Pattern.compile("^[a-zA-Z]+$|^[0-9]+$")
for(index <- 0 to tokens.size()-1) {
val token = tokens.get(index)
val matcher : Matcher = pattern.matcher(token.getSurfaceForm())
if(token.getSurfaceForm().length() >= 3 && !matcher.find()) {
features += (token.getSurfaceForm() + "-" + token.getAllFeatures())
}
}
}
(features)
})

val topCounts60 = tweetStream.map((_, 1)
).reduceByKeyAndWindow(_+_, Seconds(60*60)
).map{case (topic, count) => (count, topic)
}.transform(_.sortByKey(false))

topCounts60.foreachRDD(rdd => {
val topList = rdd.take(20)
println("¥ nPopular topics in last 60*60 seconds (%s words):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})

ssc.start()
ssc.awaitTermination()


これで動きます.

日本語文字化けします.


今後やること

scala >

となっている場所にソース入力するのではなく,sbt packageとしてspark-submitで実行したい.

[Apache Spark]ストリーミング処理で直近の人気ハッシュタグを取得する


以下メモ


config.propertiesを用意

util.properties


config.properties

twitter_consumerKey=xxxxxxxxxx

twitter_consumerSecret=xxxxxxxxxx
twitter_accessToken=xxxxxxxxxx
twitter_accessTokenSecret=xxxxxxxxxx

libraryDependencies

vagrant@vagrant:~$ ./spark-1.1.0/bin/spark-submit \


--class "SimpleApp" \

--master local[4] \

./ScalaProjects/spark/target/scala-2.10/spark-project_2.10-1.0.jar


spark-1.1.0/bin/spark-submit --class "Startup" --master local[2] --jars /home/vagrant/spark-1.1.0/lib_managed/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-core-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-media-support-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-async-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-examples-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-stream-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/kuromoji-0.7.7/kuromoji-0.7.7.jar ScalaProjects/hello/target/scala-2.10/hello_2.10-1.0.jar


メモ

http://xerial.org/scala-cookbook/recipes/2012/06/28/create-a-scala-project/

http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html

http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html

http://search.maven.org/#search%7Cga%7C1%7Ctwitter

http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html

http://qiita.com/kanuma1984/items/45412a82536d94e18631

https://spark.apache.org/docs/1.1.0/submitting-applications.html

http://stackoverflow.com/questions/28165032/java-lang-noclassdeffounderror-org-apache-spark-streaming-twitter-twitterutils

http://qiita.com/abey1192/items/7f0bc006cbe56abbebb9