On 2024/4/12, Apache Spark徹底入門 will be published by Shoeisha!

This is a walkthrough of the book's sample notebooks. This entry covers Python/Chapter04/4-2 Spark Data Sources.

The repository of translated notebooks is here.

The notebook itself is here.

Defining the paths

parquet_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet"
json_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
csv_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
orc_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
avro_file = "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
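
Incidentally, the DDL string above can also be expressed programmatically. A minimal sketch, assuming the same three columns:

# The same schema as the DDL string above, built with StructType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_struct = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", IntegerType(), True),
])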

Parquet data source

df = (spark
      .read
      .format("parquet")
      .option("path", parquet_file)
      .load())
display(df)


Here is another way to read the same data, using a variation of this API.

df2 = spark.read.parquet(parquet_file)
display(df2)
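
As a quick sanity check (not in the original notebook), a minimal sketch confirming that both variations read the same data:

# Both API variations should yield the same schema and row count
assert df.schema == df2.schema
assert df.count() == df2.count()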


Using SQL

This creates an unmanaged temporary view.

%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING parquet
    OPTIONS (
      path "/databricks-datasets/definitive-guide/data/flight-data/parquet/2010-summary.parquet"
    )
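
The same DDL can also be issued from Python instead of the %sql magic; a minimal sketch:

# Equivalent to the %sql cell above, executed via spark.sql()
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING parquet
OPTIONS (
  path "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet"
)
""")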

We use SQL to query the table. The result is the same as the data we read into the DataFrame above.

spark.sql("SELECT * FROM us_delay_flights_tbl").display()


JSON data source

df = spark.read.format("json").option("path", json_file).load()
display(df)


df2 = spark.read.json(json_file)
display(df2)
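
By default Spark expects one JSON object per line (JSON Lines). For files where a single record spans multiple lines, the multiLine option is needed; a minimal sketch with a hypothetical path:

# multiLine lets Spark parse records that span multiple lines
# ("/path/to/multiline.json" is a hypothetical placeholder)
df_ml = (spark.read
         .option("multiLine", "true")
         .json("/path/to/multiline.json"))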


Using SQL

%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING json
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
    )
spark.sql("SELECT * FROM us_delay_flights_tbl").display()


CSV data source

df = (spark
      .read
      .format("csv")
      .option("header", "true")
      .schema(schema)
      .option("mode", "FAILFAST")  # Exit if any errors occur
      .option("nullValue", "")     # Replace any null data with ""
      .option("path", csv_file)
      .load())
display(df)
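
FAILFAST aborts the read on the first malformed record. If you would rather keep going, PERMISSIVE mode (the default) nulls out bad fields instead; a minimal sketch:

# The same read in PERMISSIVE mode: malformed fields become nulls instead of errors
df_permissive = (spark.read
                 .format("csv")
                 .option("header", "true")
                 .schema(schema)
                 .option("mode", "PERMISSIVE")
                 .load(csv_file))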


Using SQL

%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING csv
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
      header "true",
      inferSchema "true",
      mode "FAILFAST"
    )
spark.sql("SELECT * FROM us_delay_flights_tbl").display()


ORC data source

df = (spark.read
      .format("orc")
      .option("path", orc_file)
      .load())
display(df)
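
ORC reads benefit from Spark's native vectorized reader, which is enabled by default in recent Spark versions; a sketch of the relevant configs:

# Both configs exist since Spark 2.3 and default to the values shown here
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")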


Using SQL

%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING orc
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
    )
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10, truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Avro data source

df = (spark.read
      .format("avro")
      .option("path", avro_file)
      .load())
display(df)
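
Note that on Databricks Runtime the Avro source is built in; on open-source Spark it ships as a separate module. A sketch of the usual way to attach it (the version shown is just an example, adjust to your cluster):

# OSS Spark: the Avro reader lives in the external spark-avro package, e.g.
#   pyspark --packages org.apache.spark:spark-avro_2.12:3.5.0
# Databricks Runtime: no extra package needed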


Using SQL

%sql
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING avro
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
    )
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10, truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Images

from pyspark.ml import image

image_dir = "/databricks-datasets/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()

images_df.select("image.height", "image.width", "image.nChannels", "image.mode", "label").show(5, truncate=False)
root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: integer (nullable = true)

+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |1    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
+------+-----+---------+----+-----+
only showing top 5 rows
display(images_df)
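
The label column is not produced by the image source itself; it comes from partition discovery, assuming the training images are laid out in Hive-style directories such as .../train_images/label=0/. A quick check of the discovered labels:

# List the distinct labels picked up by partition discovery
images_df.select("label").distinct().show()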


Binary files

path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark
                   .read
                   .format("binaryFile")
                   .option("pathGlobFilter", "*.jpg")
                   .load(path))
binary_files_df.show(5)
+--------------------+-------------------+------+--------------------+-----+
|                path|   modificationTime|length|             content|label|
+--------------------+-------------------+------+--------------------+-----+
|dbfs:/databricks-...|2020-01-02 20:42:21| 55037|[FF D8 FF E0 00 1...|    0|
|dbfs:/databricks-...|2020-01-02 20:42:31| 54634|[FF D8 FF E0 00 1...|    1|
|dbfs:/databricks-...|2020-01-02 20:42:21| 54624|[FF D8 FF E0 00 1...|    0|
|dbfs:/databricks-...|2020-01-02 20:42:22| 54505|[FF D8 FF E0 00 1...|    0|
|dbfs:/databricks-...|2020-01-02 20:42:22| 54475|[FF D8 FF E0 00 1...|    0|
+--------------------+-------------------+------+--------------------+-----+
only showing top 5 rows
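
pathGlobFilter is not specific to binaryFile; since Spark 3.0 it works with any file-based data source. A minimal sketch against a hypothetical Parquet directory:

# Glob-filter file names while reading any file-based source
# ("/path/to/parquet/dir" is a hypothetical placeholder)
df_pq = (spark.read
         .format("parquet")
         .option("pathGlobFilter", "*.parquet")
         .load("/path/to/parquet/dir"))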

To ignore partition data discovery in a directory, set recursiveFileLookup to "true". Note that the label column is absent from the result below, because it was a partition column produced by that discovery.

binary_files_df = (spark
                   .read
                   .format("binaryFile")
                   .option("pathGlobFilter", "*.jpg")
                   .option("recursiveFileLookup", "true")
                   .load(path))
binary_files_df.show(5)
+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|dbfs:/databricks-...|2020-01-02 20:42:21| 55037|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2020-01-02 20:42:31| 54634|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2020-01-02 20:42:21| 54624|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2020-01-02 20:42:22| 54505|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2020-01-02 20:42:22| 54475|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
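
Each row's content column holds the raw file bytes, so you can inspect files directly on the driver. A minimal sketch verifying the JPEG magic number seen in the output above:

# Pull one file to the driver; JPEG files start with the bytes FF D8
first = binary_files_df.select("path", "content").first()
print(first.path, bytes(first.content[:2]).hex())  # expect 'ffd8'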

Getting started with Databricks

Databricks free trial
