やること
- Vagrant up
- Twitterアプリケーション作成準備
- Install Scala (and sbt)
- Sparkの外部ライブラリのダウンロードなど
- 実装 [Apache Sparkで始めるお手軽リアルタイムウインドウ集計]
(http://www.intellilink.co.jp/article/column/bigdata-kk01.html) - そのメモ
概要
基本的に(1)のサイトに沿って実装していきますが,ハマった部分の解説を足していこうと思います.
元々やりたかったことは
$ sbt package
$ spark-submit target/xxxxxx.jar
で実行したかったですが,とりあえず今回はインラインで実行しました.
Option(メモ)としてsbtのインストール,プロジェクトの作成方法を書いておきます.
Vagrant up
# -*- mode: ruby -*-
# vi: set ft=ruby :
# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "opscode-ubuntu1410"
config.vm.box_url = "http://opscode-vm-bento.s3.amazonaws.com/vagrant/virtualbox/opscode_ubuntu-14.10_chef-provisionerless.box"
config.vm.provider :virtualbox do |vb|
vb.name = "spark"
vb.customize ["modifyvm", :id, "--memory", 1024]
end
#config.vm.network :private_network, ip: "192.168.33.31"
config.vm.synced_folder "documents", "/share", \
create: true, owner: 'vagrant', group: 'vagrant', \
mount_options: ['dmode=777,fmode=666']
config.vm.provision :shell, :inline => <<-EOT
EOT
end
Twitterアプリケーション作成準備
- Consumer Key
- Consumer Secret
- Access Token
- Access Token Secret
それぞれ取得します.
OSの時間設定
時間設定が狂っているとTwitterのAPIを使うときにエラーがでるそうです.UTCとかJSTは特にどっちでもエラーとは関係ないです.
[NTP で時刻合わせ | 自宅サーバー Debian/Ubuntu]
(http://debianj.com/ubuntu/install/ntp) にあるようにNTPで普通に時刻設定しても可能ですが, [vagrantで時刻がおかしい場合の対処法]
(http://polidog.jp/2014/01/08/vagrant/) にあるようにVagrantではホストOSと時刻の同期ができます.
各種キーの取得
以下のサイトに行きます.
https://apps.twitter.com/
Create New Appをクリックします.
こちらのサイトに沿ってキーを取得します.
[Twitterアプリケーションの作成 - 各種キーの取得]
(http://website-planner.com/twitter%E3%82%A2%E3%83%97%E3%83%AA%E3%82%B1%E3%83%BC%E3%82%B7%E3%83%A7%E3%83%B3%E3%81%AE%E4%BD%9C%E6%88%90%EF%BC%88consumer-key%E3%80%81consumer-secret%E3%80%81access-token%E3%80%81access-token-secret/)
※Callback URLはオプションですが,入力しないといけないみたいです.以下ソース
[Twitterアプリの新規作成時に必要なCallback URLについて分かった2つのこと]
(http://blog.ecoteki.com/webservice/post-1406/)
cURLタイムラインを取得してファイルに保存する
curlの -o オプションを使用するとファイルに保存できます.
こちらのサイトに沿ってタイムラインの取得をしてみる.
Twitter API を使う
サイト https://apps.twitter.com/ で先程自分で作成したものを選択してTest OAuthボタンを押します.
GETを選択
Request URIに
https://api.twitter.com/1.1/statuses/home_timeline.json
と入力
Get OAuth Signature ボタンを押す.
出てきたcURLコマンドをコピーする.
恐らく大量に出力がでるので -o オプションでファイルに保存.
curl -o timeline.json --get 'https://api.twitter.com/1.1/statuses/home_timeline.json' --header 'Authorization: OAuth oauth_consumer_key="xxxxxxxxxxxxxxxxx", oauth_nonce="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_signature="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_signature_method="HMAC-SHA1", oauth_timestamp="xxxxxxxxxx", oauth_token="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", oauth_version="1.0"' --verbose
取得したものをJSON Viewerで表示してみる
[Online JSON Viewer]
(http://jsonviewer.stack.hu/)
ここにtimeline.jsonの内容を貼り付けてFormatボタンを押したりするとキレイに見れます.
※最初に用意したVagrantfileではvagrantの/shareとホストOSのdocumentsというフォルダが同期されているので
$ cp timeline.json /share/
とかするとホストOSのディレクトリdocumentsに入ります.
Twitter API を使う のサイトの通りPOSTの手順を実行すると,testという文字列が投稿されます.
タイムライン取得でエラーが出る場合の原因一覧
[gem 'omniauth-twitter'で認証しようとしてOAuth::Unauthorized 401 Unauthorizedなんてエラーがでたら]
(http://qiita.com/hirokishirai/items/5a43977a38ecd922bfb9)
Install Scala and sbt
[Install Scala and sbt on Ubuntu 14.04]
(http://qiita.com/f2um2326/items/99b073d110989f36e6c3)
※sbtは今回使用しません.
Install Spark
こちら参照
[Install Spark on Ubuntu 14.04]
(http://qiita.com/f2um2326/items/b88fd743713910148ba0)
インラインで実装
参考サイト: Apache Sparkで始めるお手軽リアルタイムウインドウ集計
Sparkのインストールまでは行ったので,参考サイトの4, 5番のライブラリの準備からしていきます.
kuromojiや外部ライブラリのダウンロード
Twitter関係の外部ライブラリダウンロード
[Using twitterUtils in Spark shell]
(http://stackoverflow.com/questions/25085128/using-twitterutils-in-spark-shell)
ちなみに,先程ダウンロードしたSparkの中
~/spark-1.1.0/lib_managed/jars
の中に
lib_managed/jars/twitter4j-stream-3.0.3.jar
lib_managed/jars/twitter4j-core-3.0.3.jar
が元々ありますが,とりあえず今回はTwitter関連まとめてダウンロードしてきます.
ref: https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.1.0/
$ cd ~/spark-1.1.0/lib_managed
$ mkdir twiter4j
$ cd twitter4j
$ wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.1.0/spark-streaming-twitter_2.10-1.1.0.jar
$ TWITTER4J_SOURCE=twitter4j-3.0.3.zip
$ curl -O "http://twitter4j.org/archive/$TWITTER4J_SOURCE"
$ unzip -j ./$TWITTER4J_SOURCE "lib/*.jar" -d twitter4j/
$ rm twitter4j-3.0.3.zip
$ mv twitter4j/* .
$ rm -r twitter4j
Download kuromoji
$ cd ~/spark-1.1.0/lib_managed
$ wget https://github.com/downloads/atilika/kuromoji/kuromoji-0.7.7.zip
$ unzip -j kuromoji-0.7.7.zip
$ rm いらないファイル
実行
spark-shellコマンドでsparkのshellが起動しますが,その際 --jars オプションで外部ライブラリを読み込むようにします.恐らくもっとスマートな指定方法があると思いますが,とりあえずこれで動かしてみます.
$ cd ~
$ ./spark-1.1.0/bin/spark-shell --master local[2] --jars /home/vagrant/spark-1.1.0/lib_managed/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-core-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-media-support-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-async-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-examples-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-stream-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/kuromoji-0.7.7/kuromoji-0.7.7.jar
local[2]
というのはCPUの数で,Sparkで分散処理をするような場合だとlocalやlocal[1]ではエラーとなるそうです.
参考サイトの方でソースに若干スペルミスがあったので,以下修正.
ソースコードを全部コピーして,
scala >
となっている部分にペーストします.
consumerKeyなどは適宜自分のものを使用してください.
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.atilika.kuromoji._
import org.atilika.kuromoji.Tokenizer._
import java.util.regex._
System.setProperty("twitter4j.oauth.consumerKey", "xxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.consumerSecret", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.accessToken", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.accessTokenSecret", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
val ssc = new StreamingContext(sc, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, Array("iPhone6"))
val tweetStream = stream.flatMap(status => {
val tokenizer : Tokenizer = Tokenizer.builder().build()
val features : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
var tweetText : String = status.getText()
val japanese_pattern : Pattern = Pattern.compile("[¥¥u3040-¥¥u309F]+")
if(japanese_pattern.matcher(tweetText).find()) {
tweetText = tweetText.replaceAll("http(s*)://(.*)/", "").replaceAll("¥¥uff57", "")
val tokens : java.util.List[Token] = tokenizer.tokenize(tweetText)
val pattern : Pattern = Pattern.compile("^[a-zA-Z]+$|^[0-9]+$")
for(index <- 0 to tokens.size()-1) {
val token = tokens.get(index)
val matcher : Matcher = pattern.matcher(token.getSurfaceForm())
if(token.getSurfaceForm().length() >= 3 && !matcher.find()) {
features += (token.getSurfaceForm() + "-" + token.getAllFeatures())
}
}
}
(features)
})
val topCounts60 = tweetStream.map((_, 1)
).reduceByKeyAndWindow(_+_, Seconds(60*60)
).map{case (topic, count) => (count, topic)
}.transform(_.sortByKey(false))
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(20)
println("¥ nPopular topics in last 60*60 seconds (%s words):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
ssc.start()
ssc.awaitTermination()
これで動きます.
日本語文字化けします.
今後やること
scala >
となっている場所にソース入力するのではなく,sbt packageとしてspark-submitで実行したい.
[[Apache Spark]ストリーミング処理で直近の人気ハッシュタグを取得する]
(http://dev.classmethod.jp/etc/apache-spark_recent_hashtag_for_streaming/)
以下メモ
config.propertiesを用意
[util.properties]
(http://fits.hatenablog.com/entry/20110205/1296888257)
twitter_consumerKey=xxxxxxxxxx
twitter_consumerSecret=xxxxxxxxxx
twitter_accessToken=xxxxxxxxxx
twitter_accessTokenSecret=xxxxxxxxxx
libraryDependencies
vagrant@vagrant:~$ ./spark-1.1.0/bin/spark-submit \
--class "SimpleApp"
--master local[4]
./ScalaProjects/spark/target/scala-2.10/spark-project_2.10-1.0.jar
spark-1.1.0/bin/spark-submit --class "Startup" --master local[2] --jars /home/vagrant/spark-1.1.0/lib_managed/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-core-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-media-support-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-async-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-examples-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/twitter4j/twitter4j-stream-3.0.3.jar,/home/vagrant/spark-1.1.0/lib_managed/kuromoji-0.7.7/kuromoji-0.7.7.jar ScalaProjects/hello/target/scala-2.10/hello_2.10-1.0.jar
メモ
http://xerial.org/scala-cookbook/recipes/2012/06/28/create-a-scala-project/
http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html
http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html
http://search.maven.org/#search%7Cga%7C1%7Ctwitter
http://www.scala-sbt.org/0.13/tutorial/ja/Library-Dependencies.html
http://qiita.com/kanuma1984/items/45412a82536d94e18631
https://spark.apache.org/docs/1.1.0/submitting-applications.html
http://stackoverflow.com/questions/28165032/java-lang-noclassdeffounderror-org-apache-spark-streaming-twitter-twitterutils
http://qiita.com/abey1192/items/7f0bc006cbe56abbebb9