0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Sparkのコンポーネント

Posted at

概要

Sparkには様々なコンポーネントが用意されています。

  • Spark SQL
  • Spark MLlib
  • Spark Structured Streaming
  • GrephX

これらは全てSparkのライブラリですが、
耐障害性の機能は分かれています。

これらは、scala, SQL, Python, Java, Rで実装することが出来ます。

Spark SQL

Spark SQLで使えるデータは以下です。

  • RDBMS
  • CSV
  • text
  • JSON
  • Avro
  • ORC
  • Parquet
  • AWS S3

読み込まれたデータは、Spark DataFrameとして扱われます。
データの操作は、SQLと同じように操作をすることが出来ます。

//データを読み込みDataframeとして扱う
spark.read.json("ファイルパス")

//SQLを使いデータ操作
spark.sql("""SELECT id, name FROM customer WHERE city = 'tokyo' ORDER BY id""")

Saprk MLlib

MLlibは、一般的な機械学習モデルを、APIで構築することが出来ます。
APIは、引用・変換した特徴量、学習評価のためのパイプライン、学習済みモデルのデプロイが許可されています。
また、線形代数や統計学を使うこともできます。

//ライブラリの読み込み
from pyspark.ml.classification import LogisticRegression

//CSV読み込み
training = spark.read.csv("ファイルパス")
test = spark.read.csv("ファイルパス")

//モデル定義
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

//学習
lrModel = lr.fit(training)

//予測
lrModel.transform(test)

Spark Structured Streaming

Apache Spark2.0から、以下が提供されました。

  • experimental Continuous Streaming model
  • Structured Streaming APIs
  • Spark SQLエンジン、DataFrame-based APIs

Spark2.2から、エンジニアや開発者の環境下で使うことが、広い範囲で使えるようになりました。

また、Structures Streaming modelの下の、Spark SQLのエンジンでは、
straming Applicationなどを、耐障害性のある処理で扱うことが出来ます。

そして、Spark3.0ではストリーミングデータのリソースに以下が追加されました。

  • Apache Kafka
  • Kinesis
  • HDFS-based
  • クラウドストレージ
//ライブラリの読み込み
from pyspark.sql.functions import explode, split

//データ読み込み
lines = (spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())

//データ分割
words = lines.select(explode(split(lines.value, " ")).alias("word"))

//wordのカウント
word_counts = words.groupBy("word").count()

//Kafkaへ書き込み
query = (word_counts
    .writeStream
    .format("kafka")
    .option("topic", "output"))

GraphX

GraphXは、分析、コネクション、探索など。様々なグラフの処理に使われます。

val graph = Graph(vertices, edges)
messages = spark.textFile("ファイルパス")
val graph2 = graph.joinVertices(message){
    (id, vertex, msg) => ...
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?