概要
Sparkには様々なコンポーネントが用意されています。
- Spark SQL
- Spark MLlib
- Spark Structured Streaming
- GrephX
- 他
これらは全てSparkのライブラリですが、
耐障害性の機能は分かれています。
これらは、scala, SQL, Python, Java, Rで実装することが出来ます。
Spark SQL
Spark SQLで使えるデータは以下です。
- RDBMS
- CSV
- text
- JSON
- Avro
- ORC
- Parquet
- AWS S3
- 他
読み込まれたデータは、Spark DataFrameとして扱われます。
データの操作は、SQLと同じように操作をすることが出来ます。
//データを読み込みDataframeとして扱う
spark.read.json("ファイルパス")
//SQLを使いデータ操作
spark.sql("""SELECT id, name FROM customer WHERE city = 'tokyo' ORDER BY id""")
Saprk MLlib
MLlibは、一般的な機械学習モデルを、APIで構築することが出来ます。
APIは、引用・変換した特徴量、学習評価のためのパイプライン、学習済みモデルのデプロイが許可されています。
また、線形代数や統計学を使うこともできます。
//ライブラリの読み込み
from pyspark.ml.classification import LogisticRegression
//CSV読み込み
training = spark.read.csv("ファイルパス")
test = spark.read.csv("ファイルパス")
//モデル定義
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
//学習
lrModel = lr.fit(training)
//予測
lrModel.transform(test)
Spark Structured Streaming
Apache Spark2.0から、以下が提供されました。
- experimental Continuous Streaming model
- Structured Streaming APIs
- Spark SQLエンジン、DataFrame-based APIs
Spark2.2から、エンジニアや開発者の環境下で使うことが、広い範囲で使えるようになりました。
また、Structures Streaming modelの下の、Spark SQLのエンジンでは、
straming Applicationなどを、耐障害性のある処理で扱うことが出来ます。
そして、Spark3.0ではストリーミングデータのリソースに以下が追加されました。
- Apache Kafka
- Kinesis
- HDFS-based
- クラウドストレージ
//ライブラリの読み込み
from pyspark.sql.functions import explode, split
//データ読み込み
lines = (spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())
//データ分割
words = lines.select(explode(split(lines.value, " ")).alias("word"))
//wordのカウント
word_counts = words.groupBy("word").count()
//Kafkaへ書き込み
query = (word_counts
.writeStream
.format("kafka")
.option("topic", "output"))
GraphX
GraphXは、分析、コネクション、探索など。様々なグラフの処理に使われます。
val graph = Graph(vertices, edges)
messages = spark.textFile("ファイルパス")
val graph2 = graph.joinVertices(message){
(id, vertex, msg) => ...
}