前回はこちらです。
https://qiita.com/SHA_AKA/items/f57cbe11b208282103e3
基礎編の最後は、SparkStreamingについて実装しようと思います。
環境は前回と同じです
目次
- pom.xmlの更新とncatのダウンロード
- stateless&stateful
- 最初のコード(stateless)
- window処理(stateful)
- MySQLへの書き込み
- その他
pom.xmlの更新とncatのダウンロード
pom.xmlに下記の依頼関係を追加
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.13</artifactId>
<version>3.2.1</version>
<scope>provided</scope>
</dependency>
ローカルportを監視して、データを取るため、ncatのダウンロードします。
sudo apt-get -y install ncat
インストール完了後、新たなターミナルを開けて、下記のコマンドを実行
nc -lk 8080
stateless&stateful
ストリーミング処理本質的には、バッチ処理です。処理の時間間隔がすごく短くだけなので、見た目は「連続」になります。
SparkStreamingでは、statelessとstateful二つの変換操作があります。
簡単に説明すると、
・stateless
今回取得のデータのみ、前後のデータとは関係ない操作
・stateful
今回のデータだけではなく、前後のデータまたはその計算結果にも関わる操作
最初のコードで先ずはstatelessの例を挙げます。
最初のコード(stateless)
実行例:errorを含むレコードを抽出
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class SparkStreaming {
public static void main(String[] args) throws InterruptedException {
/*
conf設定、local[2]とは2 thread
JavaStreamingContext設定、Durations.seconds(5)とは処理間隔が5秒
*/
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sparkStreamIng");
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(5));
/*
InputDStream:IP:localhost,PORT:8080
errorを含む行を抽出
*/
JavaReceiverInputDStream<String> inputDStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
JavaDStream<String> errorLine = inputDStream.filter(s -> s.contains("error"));
//errorLineをプリント
errorLine.print();
/*
監視開始
*/
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
}
上記の例によって、port8080を監視して、5秒ごとにその内容を読み込んで、"error"という文字を含むレコードを抽出できます。
結果は下記のように
window処理(stateful)
statefulの操作では、window関数により時間間隔を指定することが必要です。
・window関数:一定時間間隔のデータを処理する
・window length:windowの長さ、つまり時間間隔
・sliding interval:処理の頻度
また、stateful操作ではチェックポイントが必要です。
コード例:30秒ごとにIPごとのアクセス回数カウントして、10秒一回実施する
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class SparkStreamingCount {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("SparkCount").setMaster("local[2]");
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
//check point
javaStreamingContext.checkpoint("/opt/data1");
JavaDStream<String> dStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
JavaPairDStream<String,Long> namedStream = dStream.mapToPair(s-> new Tuple2(s,1l));
/*
window関数:一定時間間隔のデータを処理する
window length:windowの長さ、つまり時間間隔、ここでは30秒
sliding interval:処理の頻度、ここでは10秒
*/
JavaPairDStream<String, Long> result = namedStream.reduceByKeyAndWindow(new Add(), new Minus(), Durations.seconds(30), Durations.seconds(10));
//print
result.print();
//start
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
static class Add implements Function2<Long,Long,Long> {
@Override
public Long call(Long v1, Long v2) throws Exception{
return v1 + v2;
}
}
static class Minus implements Function2<Long,Long,Long> {
@Override
public Long call(Long v1, Long v2) throws Exception{
return v1 - v2;
}
}
}
MySQLへの書き込み
よくあるシナリオで、Streamingで取得したデータをDBへoutputです。
DBとの接続では、静的なプールを作成することがお勧めです。
(分散処理によっての接続爆増のコストを避けるため)
先ずはpoolのコード
package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
public class ConnectionPool {
//リンクリストを作成
private static LinkedList<Connection> connectionQueue;
static {
try{
Class.forName("com.mysql.jdbc.Driver");
}catch (Exception e){
e.printStackTrace();
}
}
public synchronized static Connection getConnection(){
try {
if(connectionQueue == null){
connectionQueue = new LinkedList<>();
}
//connectionは五つまでQueueにpush
for (int i = 0; i < 5; i++){
Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8080",
"root","password");
connectionQueue.push(conn);
}
}catch (Exception e){
e.printStackTrace();
}
//Queueからconnectionを取得
return connectionQueue.poll();
}
public static void returnConnection(Connection conn){
connectionQueue.push(conn);
}
}
main処理:30秒ごとのIPアクセス回数をカウントしてMySQLに書き込み
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.Statement;
public class SparkStreamingJDBC {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("SparkCount").setMaster("local[2]");
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(30));
//check point
javaStreamingContext.checkpoint("/opt/data1");
JavaDStream<String> dStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
JavaPairDStream<String,Long> namedStream = dStream.mapToPair(s-> new Tuple2(s,1l));
//30秒内IPごとのアクセスをカウント
JavaPairDStream<String, Long> result = namedStream.reduceByKey((v1,v2)->v1+v2);
//結果print
result.print();
//MySQLに書き込み
result.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = ConnectionPool.getConnection();
Tuple2<String, Long> ipcount;
while (partitionOfRecords.hasNext()) {
ipcount = partitionOfRecords.next();
String sql = "insert into dtable(ip,count)" + "values ('" + ipcount._1 + "'," + ipcount._2 + ")";
Statement statement = connection.createStatement();
statement.executeUpdate(sql);
}
});
});
try{
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}catch (Exception e){
e.printStackTrace();
}
}
}
(最近は暑くて怠い、MySQLのインストールがまだなので、結果はいまいち省略させていただきます。)
その他
kafkaとの連携もよくある例ですが、
現在クラウドサービスを利用することが多いではないが(GCPのpubsub、AWSのMSKなど)と考えながら、
dataproc実装の際にまた話をしよう。
基礎編はここまでで、次回は応用編Sparkのmachine learningでの実装です。