LoginSignup
0
0

More than 1 year has passed since last update.

【初心者】ApacheSpark基礎編-SparkStreaming

Posted at

前回はこちらです。
https://qiita.com/SHA_AKA/items/f57cbe11b208282103e3
基礎編の最後は、SparkStreamingについて実装しようと思います。
環境は前回と同じです

目次

  1. pom.xmlの更新とncatのダウンロード
  2. stateless&stateful
  3. 最初のコード(stateless)
  4. window処理(stateful)
  5. MySQLへの書き込み
  6. その他

pom.xmlの更新とncatのダウンロード

pom.xmlに下記の依頼関係を追加

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.13</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>

ローカルportを監視して、データを取るため、ncatのダウンロードします。

sudo apt-get -y install ncat

インストール完了後、新たなターミナルを開けて、下記のコマンドを実行

nc -lk 8080

stateless&stateful

ストリーミング処理本質的には、バッチ処理です。処理の時間間隔がすごく短くだけなので、見た目は「連続」になります。
SparkStreamingでは、statelessとstateful二つの変換操作があります。
簡単に説明すると、
・stateless
今回取得のデータのみ、前後のデータとは関係ない操作
・stateful
今回のデータだけではなく、前後のデータまたはその計算結果にも関わる操作
最初のコードで先ずはstatelessの例を挙げます。

最初のコード(stateless)

実行例:errorを含むレコードを抽出

package com;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStreaming {
    public static void main(String[] args) throws InterruptedException {
        /*
        conf設定、local[2]とは2 thread
        JavaStreamingContext設定、Durations.seconds(5)とは処理間隔が5秒
         */
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sparkStreamIng");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(5));

        /*
        InputDStream:IP:localhost,PORT:8080
        errorを含む行を抽出
         */
        JavaReceiverInputDStream<String> inputDStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
        JavaDStream<String> errorLine = inputDStream.filter(s -> s.contains("error"));

        //errorLineをプリント
        errorLine.print();

        /*
        監視開始
         */
        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
}

上記の例によって、port8080を監視して、5秒ごとにその内容を読み込んで、"error"という文字を含むレコードを抽出できます。
結果は下記のように
error.png

window処理(stateful)

statefulの操作では、window関数により時間間隔を指定することが必要です。
・window関数:一定時間間隔のデータを処理する
・window length:windowの長さ、つまり時間間隔
・sliding interval:処理の頻度
また、stateful操作ではチェックポイントが必要です。
コード例:30秒ごとにIPごとのアクセス回数カウントして、10秒一回実施する

package com;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class SparkStreamingCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("SparkCount").setMaster("local[2]");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        //check point
        javaStreamingContext.checkpoint("/opt/data1");

        JavaDStream<String> dStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
        JavaPairDStream<String,Long> namedStream = dStream.mapToPair(s-> new Tuple2(s,1l));
        /*
        window関数:一定時間間隔のデータを処理する
        window length:windowの長さ、つまり時間間隔、ここでは30秒
        sliding interval:処理の頻度、ここでは10秒
         */
        JavaPairDStream<String, Long> result = namedStream.reduceByKeyAndWindow(new Add(), new Minus(), Durations.seconds(30), Durations.seconds(10));
        //print
        result.print();
        //start
        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
    static class Add implements Function2<Long,Long,Long> {
        @Override
        public Long call(Long v1, Long v2) throws Exception{
            return v1 + v2;
        }
    }

    static class Minus implements Function2<Long,Long,Long> {
        @Override
        public Long call(Long v1, Long v2) throws Exception{
            return v1 - v2;
        }
    }

}

MySQLへの書き込み

よくあるシナリオで、Streamingで取得したデータをDBへoutputです。
DBとの接続では、静的なプールを作成することがお勧めです。
(分散処理によっての接続爆増のコストを避けるため)
先ずはpoolのコード

package com;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

public class ConnectionPool {
    //リンクリストを作成
    private static LinkedList<Connection> connectionQueue;

    static {
        try{
            Class.forName("com.mysql.jdbc.Driver");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection(){
        try {
            if(connectionQueue == null){
                connectionQueue = new LinkedList<>();
            }
            //connectionは五つまでQueueにpush
            for (int i = 0; i < 5; i++){
                Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8080",
                        "root","password");
                connectionQueue.push(conn);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        //Queueからconnectionを取得
        return connectionQueue.poll();
    }

    public static void returnConnection(Connection conn){
        connectionQueue.push(conn);
    }

}

main処理:30秒ごとのIPアクセス回数をカウントしてMySQLに書き込み

package com;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.Statement;

public class SparkStreamingJDBC {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("SparkCount").setMaster("local[2]");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(30));

        //check point
        javaStreamingContext.checkpoint("/opt/data1");

        JavaDStream<String> dStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
        JavaPairDStream<String,Long> namedStream = dStream.mapToPair(s-> new Tuple2(s,1l));
        //30秒内IPごとのアクセスをカウント
        JavaPairDStream<String, Long> result = namedStream.reduceByKey((v1,v2)->v1+v2);
        //結果print
        result.print();
        //MySQLに書き込み
        result.foreachRDD(rdd -> {
            rdd.foreachPartition(partitionOfRecords -> {
                Connection connection = ConnectionPool.getConnection();
                Tuple2<String, Long> ipcount;

                while (partitionOfRecords.hasNext()) {
                    ipcount = partitionOfRecords.next();
                    String sql = "insert into dtable(ip,count)" + "values ('" + ipcount._1 + "'," + ipcount._2 + ")";
                    Statement statement = connection.createStatement();
                    statement.executeUpdate(sql);
                }
            });
        });

        try{
            javaStreamingContext.start();
            javaStreamingContext.awaitTermination();
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

(最近は暑くて怠い、MySQLのインストールがまだなので、結果はいまいち省略させていただきます。)

その他

kafkaとの連携もよくある例ですが、
現在クラウドサービスを利用することが多いではないが(GCPのpubsub、AWSのMSKなど)と考えながら、
dataproc実装の際にまた話をしよう。
基礎編はここまでで、次回は応用編Sparkのmachine learningでの実装です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0