0
0

More than 1 year has passed since last update.

【初心者】ApacheSpark基礎編-SparkSQL

Posted at

入門編はこちらです。
https://qiita.com/SHA_AKA/items/455a3116b8e95a680753
今回はSparkSQLによって、色々なデータフォーマットを読み込み・書き出すを実行しようと思います。
SparkSQLとはSparkのAPIの一つという雑な理解をしています。
前回と同じ基礎知識が全くない、RDDとDataFrameとDataSetの違いなどは説明できないのでご了承ください。
または、今回はバッチ処理限り、ストリーミングが応用編までに。

目次

  1. 前準備
  2. pom.xmlの更新
  3. 実行コード
  4. JDBCによりDBとの連携
  5. その他

前準備

実験データとしては、バンダイナムコさんの約一週間(7/19~7/22)の株価です。
フォーマットとしては、テキストファイル含め、json,csv,avro,parquet,hive-table
お先にcsvとjsonファイル両方用意しました。
CSVファイル

share_code datetime_JST open high low close volume
7832 2022-07-19 09:00:00 9960.0 10010.0 9950.0 9950.0 0.0
7832 2022-07-19 09:01:00 9879.0 9901.0 9876.0 9901.0 12200.0
7832 2022-07-19 09:02:00 9892.0 9898.0 9888.0 9888.0 900.0
7832 2022-07-19 09:03:00 9883.0 9900.0 9881.0 9900.0 2300.0
7832 2022-07-19 09:04:00 9894.0 9905.0 9889.0 9905.0 1600.0

Jsonファイル

[{"share_code":7832,"datetime_JST":"2022-07-19 09:00:00","open":9960.0,"high":10010.0,"low":9950.0,"close":9950.0,"volume":0.0},
{"share_code":7832,"datetime_JST":"2022-07-19 09:01:00","open":9879.0,"high":9901.0,"low":9876.0,"close":9901.0,"volume":12200.0},
{"share_code":7832,"datetime_JST":"2022-07-19 09:02:00","open":9892.0,"high":9898.0,"low":9888.0,"close":9888.0,"volume":900.0},
{"share_code":7832,"datetime_JST":"2022-07-19 09:03:00","open":9883.0,"high":9900.0,"low":9881.0,"close":9900.0,"volume":2300.0},
{"share_code":7832,"datetime_JST":"2022-07-19 09:04:00","open":9894.0,"high":9905.0,"low":9889.0,"close":9905.0,"volume":1600.0},
...]

テキストファイルは「星の王子さま」の第一章の一部です。

we are introduced to the narrator, a pilot, and his ideas about grown-ups
Once when I was six years old I saw a magnificent1 picture in a book, called True Stories from Nature, about the primeval forest. It was a picture of a boa constrictor in the act of swallowing an animal. Here is a copy of the drawing...

pom.xmlの更新

SparkSQLとSparkAvroの利用は、pom.xmlに下記の追加が必要です。

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.2.1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>

実行コード

説明はコメントまで

package com;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.File;


public class SparkFileRead {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("spark-ReadFile").setMaster("local");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        //LogLevelを“Error”に設定
        sparkContext.setLogLevel("Error");

        /*テキストファイルの読み込み
        PATHがディレクトリであれば、その中の全てのファイル、
        “*.txt”などの利用も可能、
        sparkContext.wholeTextFiles()では、pairRDDが返され、Keyはファイル名、Valueはファイルの内容
         */
        JavaRDD<String> lines = sparkContext.textFile("/opt/data/text.txt");
        for (String line:lines.take(10)){
            System.out.println("* "+line);
        }

        /*Json、CSV、Parquet、Hiveテーブル、
        または他のDBからのファイルなどの読み込みはSparkSQLで楽にできます。
         */

        //Hiveテーブル作成ため、ディレクトリspark-warehouseを作成
        String warehouseLocation = new File("spark-warehouse").getAbsolutePath();

        //SparkSQLのセクションを設定
        SparkSession sparkSession = SparkSession.builder().master("local")
                .appName("Java Spark SQL")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                .enableHiveSupport()
                .getOrCreate();

        //jsonファイルの読み込み
        Dataset<Row> dataset_js = sparkSession.read().json("/opt/data/code7832.json");
        /*how to save.json
        dataset_js.write().format("json").save("code7832.json");
         */
        System.out.println("Json: ");
        dataset_js.show(10);

        //CSVファイルの読み込み
        Dataset<Row> dataset_csv = sparkSession.read().option("delimiter", ",").option("header", "true").csv("/opt/data/code7832.csv");
        /* how to save.csv
        dataset_csv.write.option("header",true).csv("code7832.csv")
         */
        System.out.println("csv: ");
        dataset_csv.show(10);

        /*Parquetファイルの読み込み
         手元にParquetのファイルがないので、Jsonファイルから変更してから利用します。
         */
        //how to save.parquet
        dataset_js.write().parquet("/opt/data/code7832.parquet");
        Dataset<Row> dataset_parquet = sparkSession.read().parquet("/opt/data/code7832.parquet");
        System.out.println("parquet: ");
        dataset_parquet.show(10);

        /*Avroファイル
          jsonファイルにより作成、その後読み込み
         */
        dataset_csv.select("*").write().format("avro").save("/opt/data/code7832.avro");
        //how to save.avro
        Dataset<Row> dataset_avro = sparkSession.read().format("avro").load("/opt/data/code7832.avro");
        System.out.println("avro: ");
        dataset_avro.show(10);

        /*Hiveテーブル
        こちらも先CSVファイルにより作成します。
        実際連携の場合がもっと複雑になるかもしれませんが、
        ここはSQL文だけ
         */
        sparkSession.sql("CREATE TABLE bandainamco (share_code INT,datetime_JST timestamp,open FLOAT,high FLOAT,low FLOAT,close FLOAT,volume FLOAT) USING CSV\n" +
                "OPTIONS (path \"/opt/data/code7832.csv\",\n" +
                "        delimiter \",\",\n" +
                "        header \"true\")\n" +
                "        ;");

        System.out.println("hive: ");
        sparkSession.sql("SELECT * FROM bandainamco limit 10").show();
    }

}

実行コマンド

/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master local[3] --executor-memory 1024m --packages org.apache.spark:spark-avro_2.12:3.2.1 --class com.SparkFileRead  SparkTest-1.0-SNAPSHOT.jar

Avroのパッケージを利用するため、optionとして--org.apache.spark:spark-avro_2.12:3.2.1が必要です。
各フォーマットの結果は下記のようなDataFrameになります。
dataframe.png
DataFrameになったら、色々なメソッドがあるのでデータの処理が楽になると思います。

JDBCによりDBとの連携

以下のデータベース用の組み込み接続プロバイダがあります:
DB2
MariaDB
Microsoft SQL Server
Oracle
PostgreSQL
MySQL
下記のコードは実際動くことがなく、DBをインストールするのはややめんどくさいな…

package com;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class jdbcconnect {

    public static void main(String[] args) {
        //セクションの設定
        SparkSession sparkSession = SparkSession.builder().master("local")
                .appName("Java Spark SQL")
                .enableHiveSupport()
                .getOrCreate();
        //jdbcによりDBからデータを読み込み・その1
        Dataset<Row> jdbcDF = sparkSession.read()
                .format("jdbc")
                .option("url", "jdbc:postgresql:dbserver")
                .option("dbtable", "schema.tablename")
                .option("user", "username")
                .option("password", "password")
                .load();
        //jdbcによりDBからデータを読み込み・その2
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "username");
        connectionProperties.put("password", "password");
        Dataset<Row> jdbcDF2 = sparkSession.read()
                .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

        // DBへの書き出し
        jdbcDF.write()
                .format("jdbc")
                .option("url", "jdbc:postgresql:dbserver")
                .option("dbtable", "schema.tablename")
                .option("user", "username")
                .option("password", "password")
                .save();

        jdbcDF2.write()
                .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

        // テーブルを作成してから書き出し
        jdbcDF.write()
                .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
                .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
    }
}

その他

主に下記のサイドを参考
http://mogile.web.fc2.com/spark/sql-programming-guide.html
パフォーマンスのチューニングが全く考えていなく、最低限実行できるものをだけ出すことです。
次回は共有変数について実装してみようと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0