入門編はこちらです。
https://qiita.com/SHA_AKA/items/455a3116b8e95a680753
今回はSparkSQLによって、色々なデータフォーマットを読み込み・書き出すを実行しようと思います。
SparkSQLとはSparkのAPIの一つという雑な理解をしています。
前回と同じ基礎知識が全くない、RDDとDataFrameとDataSetの違いなどは説明できないのでご了承ください。
または、今回はバッチ処理限り、ストリーミングが応用編までに。
目次
- 前準備
- pom.xmlの更新
- 実行コード
- JDBCによりDBとの連携
- その他
前準備
実験データとしては、バンダイナムコさんの約一週間(7/19~7/22)の株価です。
フォーマットとしては、テキストファイル含め、json,csv,avro,parquet,hive-table
お先にcsvとjsonファイル両方用意しました。
CSVファイル
share_code | datetime_JST | open | high | low | close | volume |
---|---|---|---|---|---|---|
7832 | 2022-07-19 09:00:00 | 9960.0 | 10010.0 | 9950.0 | 9950.0 | 0.0 |
7832 | 2022-07-19 09:01:00 | 9879.0 | 9901.0 | 9876.0 | 9901.0 | 12200.0 |
7832 | 2022-07-19 09:02:00 | 9892.0 | 9898.0 | 9888.0 | 9888.0 | 900.0 |
7832 | 2022-07-19 09:03:00 | 9883.0 | 9900.0 | 9881.0 | 9900.0 | 2300.0 |
7832 | 2022-07-19 09:04:00 | 9894.0 | 9905.0 | 9889.0 | 9905.0 | 1600.0 |
Jsonファイル
[{"share_code":7832,"datetime_JST":"2022-07-19 09:00:00","open":9960.0,"high":10010.0,"low":9950.0,"close":9950.0,"volume":0.0},
{"share_code":7832,"datetime_JST":"2022-07-19 09:01:00","open":9879.0,"high":9901.0,"low":9876.0,"close":9901.0,"volume":12200.0},
{"share_code":7832,"datetime_JST":"2022-07-19 09:02:00","open":9892.0,"high":9898.0,"low":9888.0,"close":9888.0,"volume":900.0},
{"share_code":7832,"datetime_JST":"2022-07-19 09:03:00","open":9883.0,"high":9900.0,"low":9881.0,"close":9900.0,"volume":2300.0},
{"share_code":7832,"datetime_JST":"2022-07-19 09:04:00","open":9894.0,"high":9905.0,"low":9889.0,"close":9905.0,"volume":1600.0},
...]
テキストファイルは「星の王子さま」の第一章の一部です。
we are introduced to the narrator, a pilot, and his ideas about grown-ups
Once when I was six years old I saw a magnificent1 picture in a book, called True Stories from Nature, about the primeval forest. It was a picture of a boa constrictor in the act of swallowing an animal. Here is a copy of the drawing...
pom.xmlの更新
SparkSQLとSparkAvroの利用は、pom.xmlに下記の追加が必要です。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>3.2.1</version>
</dependency>
実行コード
説明はコメントまで
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.io.File;
public class SparkFileRead {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("spark-ReadFile").setMaster("local");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
//LogLevelを“Error”に設定
sparkContext.setLogLevel("Error");
/*テキストファイルの読み込み
PATHがディレクトリであれば、その中の全てのファイル、
“*.txt”などの利用も可能、
sparkContext.wholeTextFiles()では、pairRDDが返され、Keyはファイル名、Valueはファイルの内容
*/
JavaRDD<String> lines = sparkContext.textFile("/opt/data/text.txt");
for (String line:lines.take(10)){
System.out.println("* "+line);
}
/*Json、CSV、Parquet、Hiveテーブル、
または他のDBからのファイルなどの読み込みはSparkSQLで楽にできます。
*/
//Hiveテーブル作成ため、ディレクトリspark-warehouseを作成
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
//SparkSQLのセクションを設定
SparkSession sparkSession = SparkSession.builder().master("local")
.appName("Java Spark SQL")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
//jsonファイルの読み込み
Dataset<Row> dataset_js = sparkSession.read().json("/opt/data/code7832.json");
/*how to save.json
dataset_js.write().format("json").save("code7832.json");
*/
System.out.println("Json: ");
dataset_js.show(10);
//CSVファイルの読み込み
Dataset<Row> dataset_csv = sparkSession.read().option("delimiter", ",").option("header", "true").csv("/opt/data/code7832.csv");
/* how to save.csv
dataset_csv.write.option("header",true).csv("code7832.csv")
*/
System.out.println("csv: ");
dataset_csv.show(10);
/*Parquetファイルの読み込み
手元にParquetのファイルがないので、Jsonファイルから変更してから利用します。
*/
//how to save.parquet
dataset_js.write().parquet("/opt/data/code7832.parquet");
Dataset<Row> dataset_parquet = sparkSession.read().parquet("/opt/data/code7832.parquet");
System.out.println("parquet: ");
dataset_parquet.show(10);
/*Avroファイル
jsonファイルにより作成、その後読み込み
*/
dataset_csv.select("*").write().format("avro").save("/opt/data/code7832.avro");
//how to save.avro
Dataset<Row> dataset_avro = sparkSession.read().format("avro").load("/opt/data/code7832.avro");
System.out.println("avro: ");
dataset_avro.show(10);
/*Hiveテーブル
こちらも先CSVファイルにより作成します。
実際連携の場合がもっと複雑になるかもしれませんが、
ここはSQL文だけ
*/
sparkSession.sql("CREATE TABLE bandainamco (share_code INT,datetime_JST timestamp,open FLOAT,high FLOAT,low FLOAT,close FLOAT,volume FLOAT) USING CSV\n" +
"OPTIONS (path \"/opt/data/code7832.csv\",\n" +
" delimiter \",\",\n" +
" header \"true\")\n" +
" ;");
System.out.println("hive: ");
sparkSession.sql("SELECT * FROM bandainamco limit 10").show();
}
}
実行コマンド
/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master local[3] --executor-memory 1024m --packages org.apache.spark:spark-avro_2.12:3.2.1 --class com.SparkFileRead SparkTest-1.0-SNAPSHOT.jar
Avroのパッケージを利用するため、optionとして--org.apache.spark:spark-avro_2.12:3.2.1
が必要です。
各フォーマットの結果は下記のようなDataFrameになります。
DataFrameになったら、色々なメソッドがあるのでデータの処理が楽になると思います。
JDBCによりDBとの連携
以下のデータベース用の組み込み接続プロバイダがあります:
DB2
MariaDB
Microsoft SQL Server
Oracle
PostgreSQL
MySQL
下記のコードは実際動くことがなく、DBをインストールするのはややめんどくさいな…
package com;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
public class jdbcconnect {
public static void main(String[] args) {
//セクションの設定
SparkSession sparkSession = SparkSession.builder().master("local")
.appName("Java Spark SQL")
.enableHiveSupport()
.getOrCreate();
//jdbcによりDBからデータを読み込み・その1
Dataset<Row> jdbcDF = sparkSession.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
//jdbcによりDBからデータを読み込み・その2
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = sparkSession.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// DBへの書き出し
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// テーブルを作成してから書き出し
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
}
}
その他
主に下記のサイドを参考
http://mogile.web.fc2.com/spark/sql-programming-guide.html
パフォーマンスのチューニングが全く考えていなく、最低限実行できるものをだけ出すことです。
次回は共有変数について実装してみようと思います。