こちらで、Spark StreamingからRDBへのデータ書き出しの作業の流れ(Spark Streaming / JDBC連携)を確認した上で、Spark StreamingによるTwitter構文解析※データを、PostgreSQLに格納した手順をまとめます。
実行環境は次の通りです。
・CentOS 7.5
・PostgreSQL 9.2.23
・Apache Spark 2.3.1
・Scala 2.12.6
・kuromoji 0.7.7
・Spark Streaming Twitter 2.10 rev 1.1.0
・Twitter4J 3.0.3
#PostgreSQL : 格納用テーブルの作成
目的)60秒毎に「iPhone6」が含まれるTweet中に出現する単語とその頻度をカウントし、先頭10件の単語情報を格納対象とする
psql -d mydb -U postgres
mydb=#
create table Twitter4J (
tweet1 varchar,
tweet2 varchar,
tweet3 varchar,
tweet4 varchar,
tweet5 varchar,
tweet6 varchar,
tweet7 varchar,
tweet8 varchar,
tweet9 varchar,
tweet10 varchar
);
\dt;
\q
#Spark-shell : Spark Streaming及びPostgreSQL関連モジュールのロード
--driver-calss-path : driver (メインPGM)実行用Classpath
--jars : driver/executor (リモート)実行用Classpath
spark-shell --master local[2] --driver-class-path=/usr/share/java/postgresql-jdbc.jar --jars /usr/local/lib/spark/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/usr/local/lib/spark/twitter4j/twitter4j-core-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-media-support-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-async-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-examples-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-stream-3.0.3.jar,/usr/local/lib/spark/twitter4j/spark-core_2.11-1.5.2.logging.jar,/usr/local/lib/spark/kuromoji-0.7.7/lib/kuromoji-0.7.7.jar
確認)PostgreSQL mydbのpublic.Twitter4J表のデータをロードする
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/mydb").option("dbtable", "public.Twitter4J").option("user", "postgres").load()
jdbcDF.show()
※ Bundle重複の警告はスキップ
※ Another instance of Derby may have already booted the database エラー発生時は、該当Spark-shellのプロセスをkill。spark-shellの停止はCtrl-D
#Spark-shell : Twitter4JによるTwitter集計の実行とPostgreSQLへの結果の格納
import java.sql.DriverManager
import org.apache.spark.streaming._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import java.util.regex._
import org.atilika.kuromoji._
import org.atilika.kuromoji.Tokenizer._
System.setProperty("twitter4j.oauth.consumerKey","XXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.consumerSecret","XXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.accessToken","XXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.accessTokenSecret","XXXXXXXXXXXXXXXX")
val ssc = new StreamingContext(sc, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, Array("iPhone6"))
val tweetStream = stream.flatMap(status => {
val tokenizer : Tokenizer = Tokenizer.builder().build()
val features : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
var tweetText : String = status.getText()
val japanese_pattern : Pattern = Pattern.compile("[\\u3040-\\u309F]+")
if(japanese_pattern.matcher(tweetText).find()) {
tweetText = tweetText.replaceAll("http(s*)://(.*)/", "").replaceAll("\\uff57", "")
val tokens : java.util.List[Token] = tokenizer.tokenize(tweetText)
val pattern : Pattern = Pattern.compile("^[a-zA-Z]+$|^[0-9]+$")
for(index <- 0 to tokens.size()-1) {
val token = tokens.get(index)
val matcher : Matcher = pattern.matcher(token.getSurfaceForm())
if(token.getSurfaceForm().length() >= 3 && !matcher.find()) {
features += (token.getSurfaceForm() + "-" + token.getAllFeatures())
}
}
}
(features)
})
val topCounts60 = tweetStream.map((_, 1)
).reduceByKeyAndWindow(_+_, Seconds(60*60)
).map{case (topic, count) => (count, topic)
}.transform(_.sortByKey(false))
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(20)
println("\n Popular topics in last 60*60 seconds (%s words):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
//mapの各要素を文字型に変換
val myVal1= topList(1).toString
val myVal2= topList(2).toString
val myVal3= topList(3).toString
val myVal4= topList(4).toString
val myVal5= topList(5).toString
val myVal6= topList(6).toString
val myVal7= topList(7).toString
val myVal8= topList(8).toString
val myVal9= topList(9).toString
val myVal10=topList(10).toString
val prop = new java.util.Properties
prop.setProperty("driver", "org.postgresql.Driver")
prop.setProperty("user", "postgres")
prop.setProperty("password", "postgres")
Class.forName("org.postgresql.Driver")
val conn= DriverManager.getConnection("jdbc:postgresql://localhost:5432/mydb", prop)
//mapの各要素をPostgreSQL Twitter4J表に挿入
val setSQL_conn = conn.prepareStatement ("INSERT INTO public.Twitter4J values (?,?,?,?,?,?,?,?,?,?)")
setSQL_conn.setString(1,myVal1)
setSQL_conn.setString(2,myVal2)
setSQL_conn.setString(3,myVal3)
setSQL_conn.setString(4,myVal4)
setSQL_conn.setString(5,myVal5)
setSQL_conn.setString(6,myVal6)
setSQL_conn.setString(7,myVal7)
setSQL_conn.setString(8,myVal8)
setSQL_conn.setString(9,myVal9)
setSQL_conn.setString(10,myVal10)
setSQL_conn.executeUpdate
conn.close()
})
ssc.start()
ssc.awaitTermination()
※参照情報
Scalaの構文一般
PrepareStatement構文