LoginSignup
0
1

OCI Data Flow の SQL endpoints を使って Object Storage にあるデータに Query をかける

Last updated at Posted at 2024-02-20

はじめに

Data Flow の SQL endpoints を使用すると、SQLを利用してオブジェクト・ストレージ上のデータを分析することができます。
Data Flow なので Query エンジンは Apache Spark です。

SQL endpoint とは

SQL endpoint を使うと、データレイク (≒Object Storage) にあるデータを直接対話的に問い合せることができます。データの量と複雑さが増すにつれ、データレイク内のデータを変換または移動するコストが高くなるので、ネイティブ形式のまま調査および分析する手段に価値が出てきます。

以下が SQL endpoints の概要図です。

概要図

SQL endpoint は Data Catalog の Metastore に格納されている Managed Table 及び External Table のメタ情報を利用して動作します。
ということで、まず "Managed Table" と "External table" について解説しておきます。

  • Managed Table
    テーブル名だけ指定して DataFrame を永続化 (saveAsTable("<テーブル名>")) すると、Metastore が管理している永続化用バケットの中にデータを永続化します。このテーブルを DROP すると、永続化されたデータも削除されます。

  • External Table (あるいは Unmanaged Table)
    テーブル名に加えて保存先を指定して DataFrame を永続化すると、保存先のデータとテーブル名のリンクが Metastore で管理されます。このテーブルを DROP すると、リンクの情報は削除されますが、永続化されたデータそのものは削除されません。

    External Table を作成するもう一つの方法は、既存のファイルに対して "CREATE TABLE" SQL文を使う方法です。後程 JDBCドライバ経由でこの方法を試します。

    CREATE TABLE <テーブル名> USING <フォーマット> LOCATION '<oci://~>' [OPTIONS (...)]   
    

要するに Managed Table ではデータの実体が管理され、External Table ではデータへのリンクが管理されます。

Spark アプリケーションを Data Flow で作成・実行しなくても、SQL endpoint を使用すれば、JDBC/ODBC 経由で Object Storage 上の任意のオブジェクト(JSON、Parquet、CSV および Avro)に対して External Table を作成してデータ操作ができるというのがミソです。
もちろん Spark アプリケーションで作成・加工したデータを Managed/External Table に永続化して JDBC/ODBC 経由で外部のアプリケーションから利用することもできます。

尚、SQL endpoint がデータにアクセスできるようにするには、(1) SQL endpoint が、データの格納されている Object Storage に対するアクセス権 と、(2) データのメタデータが格納されている Metastore へのアクセス権 の両方を持っている必要があります。詳細はドキュメントを参照してください。

SQL endpoint を使ってみる (準備編)

準備 #1: Data Catalog の Metastore を作成する

最初に、Object Storage のバケット metastoremaanged_table external_table の2つのフォルダを作っておきます。

image.png

次に、コンソールから アナリティクスとAI > データ・カタログ > メタストア とメニューを進み「メタストアの作成」ボタンを押します。
「デフォルトの管理対照表の場所」「デフォルトの外部表の場所」に先ほど作成しバケットのフォルダを各々指定します。

image.png

ちなみに、Object Storage の Path は oci://<バケット名>@<ネームスペース>/[フォルダ] となります。

メタストアが作成されていることを確認して下さい。

image.png

準備 #2: SQL endpoint を作成する

コンソールから アナリティクスとAI > データ・フロー > SQL endpoints とメニューを進み「Create SQL endpoint」のボタンを押します。

とりあえずデフォルトのまま進んでいいですが、Metastore には先ほど作成した Metastore を指定します。
また、ネットワークについて、今回は SQL endpoint にインターネット経由でアクセスできる設定にします(接続時に IAM で認証されます)。

image.png

SQL endpoint が作成されていることを確認して下さい。

image.png

準備 #3: JDBC ドライバをダウンロードする

接続に必要な JDBC ドライバはコンソールからダウンロードします。
作成した SQL endpoint の詳細画面に移ると、下の方に「Drivers」のタブがあります。ここから JDBC ドライバ(SimbaSparkJDBC-xxxxxx.zip )をダウンロードして下さい。

image.png

さらに、「Show details」のリンクをクリックすると画面がポップアップします。

image.png

「JDBC URL」をコピーしてメモしておいてください。

準備 #4: JDBC Client (DBeaver アプリ) をセットアップする

SQL endpoint に接続するアプリケーションのセットアップをします。今回は、JDBC 接続でデータベース操作を行うツール DBeaver を使用します。ダウンロード・サイトからアプリケーションをダウンロードしてセットアップしてください。

次に、DBeaver を起動して、JDBCドライバの設定を行います。

image.png

「ライブラリ」のタブで、JDBCドライバの zip ファイルを展開したフォルダを指定します。

image.png

「設定」のタブでは、ドライバ名に "Data Flow"、クラス名に "com.simba.spark.jdbc.Driver" を設定します。

image.png

次に データベース > 新しい接続 メニューから SQL endpoint への接続設定を行います。
先ほど作成した "Data Flow" を指定し、JDBC URL にメモしておいたものを貼り付けます。

image.png

では、接続してみましょう。

image.png

default というデータベース(スキーマ)がありますが、Managed/External Table はこのデータベースの中に作成されます。

準備 #5: Object Storage にサンプルデータを格納する

今回お題として使用するデータは航空会社のIATAコードと会社名からなるシンプルなCSVファイルです。コンソールからアップロードしてもいいし、OCI CLIを使ってもいいですが、oci://dataflow@NAMESPACE/data/airlines.csv という場所に置きます。

# cat したかっただけで 別に hadoop 使わなくていいです...
$ hadoop fs -cat oci://dataflow@NAMESPACE/data/airlines.csv
IATA_CODE,AIRLINE
UA,United Air Lines Inc.
AA,American Airlines Inc.
US,US Airways Inc.
F9,Frontier Airlines Inc.
B6,JetBlue Airways
OO,Skywest Airlines Inc.
AS,Alaska Airlines Inc.
NK,Spirit Air Lines
WN,Southwest Airlines Co.
DL,Delta Air Lines Inc.
EV,Atlantic Southeast Airlines
HA,Hawaiian Airlines Inc.
MQ,American Eagle Airlines Inc.
VX,Virgin America

↑ [参考] Object Storage を Hadoop で操作する方法は こちら で解説しています。

SQL endpoint を使ってみる (実践編)

実践 #1: JDBC経由で接続し External Table を作成して Query をかける

DBeaver から 以下の SQL を実行し、External Table を作成します。

CREATE TABLE airlines
  USING CSV 
  LOCATION 'oci://dataflow@NAMESPACE/data/airlines.csv'
  OPTIONS ( 'header'='true' )

SQLエディタ を開いて SQK 文を実行します。

image.png

Data Flow を更新すると、airlines テーブルが作成されていることが確認できます。

image.png

このテーブルに Query をかけてみます。

SELECT * FROM default.airlines

image.png

実践 #2: Spark アプリケーションから Managed & External Table を作成して JDBC 経由で Query をかける

まず DataFrame を Table に保存する Spark アプリケーションを作成します。DataFrameWriter に対して saveAsTable("<テーブル名>") すると Managed Table に保存されますが、その前に option("path", "<保存先>") を指定すると External Table に保存されます。

"""
airlines.py
"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Airlines").getOrCreate()

# airlines.csv を読み込んで DataFrame にする
airlines = spark.read.format("csv").option("header", "true") \
    .option("inferSchema","true").load("oci://dataflow@NAMESPACE/data/airlines.csv")

# managed table に永続化する
airlines.write.format("parquet").mode('overwrite').saveAsTable("airlines_managed")

# データを1行追加した DataFrame (airlines_ex) を作成し、external table に永続化する
# -> option で 保存先の path を指定すれば external table となる
df = spark.createDataFrame([("JL", "Japan Airlines")])
airlines_ex = airlines.union(df)
airlines_ex.write.format("parquet").mode('overwrite') \
    .option('path', "oci://dataflow@NAMESPACE/data/airlines_external") \
    .saveAsTable("airlines_external")

spark.stop()

これを Data Flow のアプリケーションとして実行しますが、アプリケーションの作成時に MetaStore を指定することを忘れずに。

image.png

(Data Flow によるアプリケーションの作成と実行の手順を大幅に省略していますが...)実行が成功したら、DBeaver で確認します。

image.png

新たに二つのテーブル airlines_managed airlines_external が作成されています。Query も問題なく実行できました。

Object Storage も確認しておきましょう。Exteranl Table は指定した場所に永続化されている筈なので確認は省いて、Managed Table が所定の場所に永続化されているかを確認します。

image.png

metastore バケットに parquet フォーマットで保存されているのを確認できました。

SQL endpoint を削除する

SQL endpoint の詳細画面から「削除」できます。
そして「削除」はできますが「停止」ができません!
SQL endpoint を作成するときにドライバ&エグゼキュータのシェイプを指定しましたが、削除しない限りこのコンピュートは稼働し続け課金が発生しますので注意して下さい。
また、SQL endpoint を再度作成すると JDBC URL も更新されるので注意して下さい。
将来「停止」できるようになるようです。

おまけ: spark.catalog を使ったテーブル作成

とりあえずテーブルだけ作りたければ、以下の方法で可能です。

from pyspark.sql.types import StructType, StructField, StringType

struct = StructType([
        StructField("IATA_CODE", StringType(), True),
        StructField("AIRLINE", StringType(), True)
    ])

spark.catalog.createTable("managed_table", schema=struct)
spark.catalog.createTable("external_table", schema=struct, \
    path="oci://dataflow@orasejapan/data/airlines_external")

まとめ

今回は、検証目的で DBeaver を使用して SQL endpoint に接続して シンプルに Query をかけてみましたが、本来 SQL endpoint は、データ・ビジュアライゼーション/分析ツールと組み合わせたり、データ連携ソリューションで他のアプリケーションとの間でデータ連携を行ったりすることで、もっとバリューが発揮できるものです。直接 Object Storage に Query をかけるのもよし、Data Flow アプリケーションで前処理されたデータを分析に用いるのもよし、色々と応用が考えられますね。

参照

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1