Spark Streaming : Storing Twitter4J aggregation data in PostgreSQL

Posted at 2018-08-15

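Building on an earlier walkthrough of writing data from Spark Streaming to an RDB (Spark Streaming / JDBC integration), this article summarizes the steps I used to store parsed Twitter data produced by Spark Streaming in PostgreSQL.
The execution environment is as follows.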

・CentOS 7.5
・PostgreSQL 9.2.23
・Apache Spark 2.3.1
・Scala 2.12.6
・kuromoji 0.7.7
・Spark Streaming Twitter 2.10 rev 1.1.0
・Twitter4J 3.0.3

#PostgreSQL : Creating the storage table
Goal) Every 60 seconds, count the words that appear in tweets containing "iPhone6" together with their frequencies, and store the top 10 words.

psql -d mydb -U postgres 

mydb=#  
create table Twitter4J (
tweet1 varchar,
tweet2 varchar,
tweet3 varchar,
tweet4 varchar,
tweet5 varchar,
tweet6 varchar,
tweet7 varchar,
tweet8 varchar,
tweet9 varchar,
tweet10 varchar
);

\dt;
\q
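
The goal above (rank words by frequency, keep the top 10, and write them into the ten `tweetN` columns) can be tried on a plain Scala collection before any streaming is involved. The following is only a minimal sketch: `TopWords`, `topN`, and `toRow` are hypothetical names introduced here, and padding short results with empty strings is an assumption of this sketch, not something the table requires.

```scala
object TopWords {
  // Count occurrences of each word and return the n most frequent
  // as (count, word) pairs, highest count first.
  def topN(words: Seq[String], n: Int): Seq[(Int, String)] =
    words.groupBy(identity).toSeq
      .map { case (word, occurrences) => (occurrences.size, word) }
      .sortBy { case (count, word) => (-count, word) } // ties are ordered by word
      .take(n)

  // Render each pair as a string and pad with "" so that the result always
  // has exactly `columns` entries, one per table column (assumption: a short
  // list is padded rather than rejected).
  def toRow(top: Seq[(Int, String)], columns: Int = 10): Seq[String] =
    top.take(columns).map(_.toString).padTo(columns, "")
}
```

For example, `TopWords.toRow(TopWords.topN(Seq("camera", "battery", "camera"), 10))` yields a 10-element sequence whose first entry is the string form of `(2, "camera")`.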

#Spark-shell : Loading the Spark Streaming and PostgreSQL modules
--driver-class-path : classpath used when running the driver (main program)
--jars : classpath used when running the driver and the executors (remote)

spark-shell --master local[2] --driver-class-path=/usr/share/java/postgresql-jdbc.jar --jars /usr/local/lib/spark/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/usr/local/lib/spark/twitter4j/twitter4j-core-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-media-support-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-async-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-examples-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-stream-3.0.3.jar,/usr/local/lib/spark/twitter4j/spark-core_2.11-1.5.2.logging.jar,/usr/local/lib/spark/kuromoji-0.7.7/lib/kuromoji-0.7.7.jar
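
Once the shell is up, one way to confirm that the driver classpath took effect is to ask the JVM whether the JDBC driver class can be loaded. This is a small sketch only; `ClasspathCheck` and `isLoadable` are hypothetical helper names introduced for illustration.

```scala
import scala.util.Try

object ClasspathCheck {
  // True when the named class can be loaded by the current (driver) JVM.
  // A missing class raises ClassNotFoundException, which Try captures.
  def isLoadable(className: String): Boolean =
    Try(Class.forName(className)).isSuccess
}
```

In the shell started above, `ClasspathCheck.isLoadable("org.postgresql.Driver")` should return `true` if `--driver-class-path` points at a PostgreSQL JDBC jar.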

Check) Load the data of the public.Twitter4J table in the PostgreSQL database mydb

val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/mydb").option("dbtable", "public.Twitter4J").option("user", "postgres").load()

jdbcDF.show()

Note: warnings about duplicate bundles can be ignored.

If the error "Another instance of Derby may have already booted the database" occurs, kill the spark-shell process that is holding the database. To exit spark-shell, press Ctrl-D.

#Spark-shell : Running the Twitter aggregation with Twitter4J and storing the results in PostgreSQL

import java.sql.DriverManager
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import java.util.regex._
import org.atilika.kuromoji._
import org.atilika.kuromoji.Tokenizer._

System.setProperty("twitter4j.oauth.consumerKey","XXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.consumerSecret","XXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.accessToken","XXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.accessTokenSecret","XXXXXXXXXXXXXXXX")

val ssc = new StreamingContext(sc, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, Array("iPhone6"))

val tweetStream = stream.flatMap(status => {
   val tokenizer : Tokenizer = Tokenizer.builder().build()
   val features : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
   var tweetText : String = status.getText()

   val japanese_pattern : Pattern = Pattern.compile("[\\u3040-\\u309F]+")
   if(japanese_pattern.matcher(tweetText).find()) {
     tweetText = tweetText.replaceAll("http(s*)://(.*)/", "").replaceAll("\\uff57", "")
     val tokens : java.util.List[Token] = tokenizer.tokenize(tweetText) 
     val pattern : Pattern = Pattern.compile("^[a-zA-Z]+$|^[0-9]+$")
     for(index <- 0 to tokens.size()-1) {
       val token = tokens.get(index)
       val matcher : Matcher = pattern.matcher(token.getSurfaceForm())
       if(token.getSurfaceForm().length() >= 3 && !matcher.find()) {
         features += (token.getSurfaceForm() + "-" + token.getAllFeatures())
       }
     }
   }
   (features)
})
val topCounts60 = tweetStream.map((_, 1)
                  ).reduceByKeyAndWindow(_+_, Seconds(60*60)
                  ).map{case (topic, count) => (count, topic)
                  }.transform(_.sortByKey(false))
topCounts60.foreachRDD(rdd => {
   val topList = rdd.take(20)
   println("\n Popular topics in last 60*60 seconds (%s words):".format(rdd.count()))
   topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
   // Convert the first 10 (count, word) pairs to strings.
   // Indices start at 0; pad with "" when fewer than 10 words were seen in the window.
   val top10 = topList.take(10).map(_.toString).padTo(10, "")

   val prop = new java.util.Properties
   prop.setProperty("driver", "org.postgresql.Driver")
   prop.setProperty("user", "postgres")
   prop.setProperty("password", "postgres")

   Class.forName("org.postgresql.Driver")
   val conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/mydb", prop)
   try {
      // Insert the 10 strings into the PostgreSQL Twitter4J table, one per column
      val setSQL_conn = conn.prepareStatement("INSERT INTO public.Twitter4J values (?,?,?,?,?,?,?,?,?,?)")
      for (i <- 0 until 10) setSQL_conn.setString(i + 1, top10(i))
      setSQL_conn.executeUpdate()
      setSQL_conn.close()
   } finally {
      conn.close() // release the connection even if the insert fails
   }
})

ssc.start()
ssc.awaitTermination()
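
The filtering rules inside the `flatMap` above can be checked in isolation, without kuromoji or a Twitter connection. The sketch below reuses the two regular expressions from the listing; `TweetFilter`, `isJapanese`, and `keep` are hypothetical names introduced here, and the surface forms are passed in as plain strings instead of kuromoji `Token` objects.

```scala
import java.util.regex.Pattern

object TweetFilter {
  private val hiragana  = Pattern.compile("[\\u3040-\\u309F]+")
  private val alnumOnly = Pattern.compile("^[a-zA-Z]+$|^[0-9]+$")

  // A tweet is treated as Japanese when it contains at least one hiragana character.
  def isJapanese(text: String): Boolean = hiragana.matcher(text).find()

  // Keep surface forms of 3 or more characters that are neither purely
  // alphabetic nor purely numeric.
  def keep(surface: String): Boolean =
    surface.length >= 3 && !alnumOnly.matcher(surface).find()
}
```

With these rules, a word such as バッテリー is kept, while `iPhone` (letters only), `2018` (digits only), and any two-character word are dropped.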

#PostgreSQL : Checking the stored results
(Screenshot: tweet.JPG)
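
Each column holds the string form of a `(count, topic)` tuple, where the topic is the surface form, a hyphen, and the kuromoji features. If the stored values need to be read back as numbers and words, a parser along the following lines may help. It is a sketch under that format assumption; `StoredValue` and `parse` are hypothetical names introduced here.

```scala
object StoredValue {
  // Expected shape: "(" + count + "," + topic + ")"; the topic may itself contain commas.
  private val Shape = """(?s)\((\d{1,9}),(.*)\)""".r

  // Split a stored cell into (count, topic); None when the text does not have that shape.
  def parse(cell: String): Option[(Int, String)] = cell match {
    case Shape(count, topic) => Some((count.toInt, topic))
    case _                   => None
  }
}
```

Only the first comma separates the count from the topic, so the comma-separated features stay attached to the word.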

Note: reference information
General Scala syntax
PreparedStatement syntax
