Apache Spark徹底入門 will be published by Shoeisha on April 12, 2024!
In this series I walk through the book's sample notebooks. This article covers Python/Chapter04/4-2 Spark Data Sources.
The repository of translated notebooks is here.
The notebook is here.
Defining the paths
parquet_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet"
json_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
csv_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
orc_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
avro_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
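As an aside, the DDL schema string above can also be expressed as a programmatic StructType. A minimal sketch of the equivalent definition (the name schema_struct is illustrative):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_struct = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", IntegerType(), True),
])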
Parquet data source
df = (spark
.read
.format("parquet")
.option("path", parquet_file)
.load())
display(df)
Another way to read the same data, using a variation of this API.
df2 = spark.read.parquet(parquet_file)
display(df2)
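Parquet files store their schema in the file metadata, so no explicit schema is needed when reading. A quick check (not part of the original notebook):
df2.printSchema()  # the schema comes from the Parquet metadata, not from inference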
Using SQL
This creates an unmanaged temporary view.
%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING parquet
OPTIONS (
path "/databricks-datasets/definitive-guide/data/flight-data/parquet/2010-summary.parquet"
)
Use SQL to query the table.
The result will be the same as the data read into the DataFrame above.
spark.sql("SELECT * FROM us_delay_flights_tbl").display()
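Temporary views live only in the current SparkSession. If you want to confirm the view was registered, the standard catalog API works (a quick illustrative check):
spark.catalog.listTables()  # the view shows up with tableType='TEMPORARY'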
JSON data source
df = spark.read.format("json").option("path", json_file).load()
display(df)
df2 = spark.read.json(json_file)
display(df2)
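By default the JSON reader infers the schema, which costs an extra pass over the data. A sketch that avoids inference by supplying the DDL schema string defined at the top (df3 is an illustrative name):
df3 = (spark
.read
.schema(schema)  # skip inference by supplying the DDL schema string
.json(json_file))
display(df3)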
Using SQL
%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING json
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
)
spark.sql("SELECT * FROM us_delay_flights_tbl").display()
CSV data source
df = (spark
.read
.format("csv")
.option("header", "true")
.schema(schema)
.option("mode", "FAILFAST") # エラーが起きたら終了
.option("nullValue", "") # nullデータを""で置換
.option("path", csv_file)
.load())
display(df)
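To persist this DataFrame in a columnar format, an ordinary write back out as Parquet works. A minimal sketch (the output path is illustrative):
(df.write
.format("parquet")
.mode("overwrite")
.option("compression", "snappy")  # snappy is the default Parquet codec
.save("/tmp/data/parquet/df_parquet"))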
Using SQL
%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING csv
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
header "true",
inferSchema "true",
mode "FAILFAST"
)
spark.sql("SELECT * FROM us_delay_flights_tbl").display()
ORC data source
df = (spark.read
.format("orc")
.option("path", orc_file)
.load())
display(df)
Using SQL
%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING orc
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
)
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10, truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
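Writing works the same way for ORC. A minimal sketch (the output location is illustrative):
(df.write
.format("orc")
.mode("overwrite")
.save("/tmp/data/orc/df_orc"))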
Avro data source
df = (spark.read
.format("avro")
.option("path", avro_file)
.load())
display(df)
Using SQL
%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING avro
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
)
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10, truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
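On Databricks the Avro source is available out of the box, but on a plain Apache Spark deployment it ships as the separate spark-avro module. A sketch of pulling it in via spark.jars.packages when building the session (the artifact version is an assumption; match it to your Spark/Scala build):
from pyspark.sql import SparkSession

spark = (SparkSession.builder
.appName("avro-example")
# assumption: pick the spark-avro version matching your Spark/Scala build
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.1")
.getOrCreate())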
Images
from pyspark.ml import image
image_dir = "/databricks-datasets/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()
images_df.select("image.height", "image.width", "image.nChannels", "image.mode", "label").show(5, truncate=False)
root
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)
|-- label: integer (nullable = true)
+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |1 |
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |0 |
+------+-----+---------+----+-----+
only showing top 5 rows
display(images_df)
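The image struct also carries the source file in image.origin, which is handy for tracing rows back to files. A small illustrative query:
(images_df
.select("image.origin", "image.height", "image.width", "label")
.show(5, truncate=False))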
Binary files
path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark
.read
.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.load(path))
binary_files_df.show(5)
+--------------------+-------------------+------+--------------------+-----+
| path| modificationTime|length| content|label|
+--------------------+-------------------+------+--------------------+-----+
|dbfs:/databricks-...|2020-01-02 20:42:21| 55037|[FF D8 FF E0 00 1...| 0|
|dbfs:/databricks-...|2020-01-02 20:42:31| 54634|[FF D8 FF E0 00 1...| 1|
|dbfs:/databricks-...|2020-01-02 20:42:21| 54624|[FF D8 FF E0 00 1...| 0|
|dbfs:/databricks-...|2020-01-02 20:42:22| 54505|[FF D8 FF E0 00 1...| 0|
|dbfs:/databricks-...|2020-01-02 20:42:22| 54475|[FF D8 FF E0 00 1...| 0|
+--------------------+-------------------+------+--------------------+-----+
only showing top 5 rows
To ignore partition data discovery in a directory, set recursiveFileLookup to "true".
binary_files_df = (spark
.read
.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.option("recursiveFileLookup", "true")
.load(path))
binary_files_df.show(5)
+--------------------+-------------------+------+--------------------+
| path| modificationTime|length| content|
+--------------------+-------------------+------+--------------------+
|dbfs:/databricks-...|2020-01-02 20:42:21| 55037|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2020-01-02 20:42:31| 54634|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2020-01-02 20:42:21| 54624|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2020-01-02 20:42:22| 54505|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2020-01-02 20:42:22| 54475|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
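Note that binaryFile is a read-only source; Spark cannot write a DataFrame back out in this format. To materialize the raw bytes you would collect rows and write them with plain Python. An illustrative sketch for a single file (the local output path is hypothetical):
row = binary_files_df.select("path", "content").head()
with open("/tmp/first_image.jpg", "wb") as f:  # hypothetical local path
    f.write(row.content)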