はじめに
先日、以下の記事を発表しました。
新しいテクノロジーに触れる時はいつでも楽しいものです。
中でも新しいプログラミング言語(のパラダイム)を学ぶことは、特別に楽しいことです。
ということで、まだまだ勉強中ですが、公式ドキュメントの学習過程の記録として、以下の記事をまとめてみました。
本稿情報のソースとして、下記ドキュメントを参照ください。
JupyterノートブックによるKaskadaの「Hello World」
セットアップ
前提条件
Jupyter ノートブック内で Kaskada を使用するには、次のソフトウェアをインストールする必要があります。
- Python (バージョン 3.8 以降)
- Jupyter
Pythonクライアントのインストール
ノートブックで Kaskada を使用する最初のステップは、Kaskada Python クライアント パッケージをインストールすることです。OS でターミナルを開き、Kaskada Python クライアントをインストールします。
pip install kaskada
kaskada を初めてインストールする場合は、外部依存関係が構築されるまでに 10 ~ 15 分かかることがあります。その後のインストールとアップグレードは通常より高速になります。
Jupyterノートブック上での初期インストール
すべてがインストールされたら、ノートブックを起動して、Kaskada の残りのコンポーネントをインストールしましょう。
ターミナルを使用して、次のコマンドを使用して新しい Jupyter ノートブックを開始します。
Jupyter ノートブックを起動します。
jupyter notebook
jupyter コマンドによりブラウザがアクティブになり、新しいノートブックを開くことができます。新しいノートブックに新しいコード セルを作成し、コード セルに次のコードを入力します。
from kaskada.api.session import LocalBuilder
session = LocalBuilder().build()
ノートブック内でこのセルを実行すると、次のような出力が表示されるはずです。
図 1. Jupyter ノートブック内での Kaskada のインストールのサンプル出力
ここでは、LocalBuilder
をインポートし、このビルダーを使用してセッションを作成しています。このマシンでビルダーを実行するのはこれが初めてです。ビルダーは、(必要に応じて) Kaskada コンポーネントの最新リリースを GitHub からダウンロードし、これらのコンポーネントをローカル マシンで実行します。
自動回復
ローカル セッションには、障害や利用不能が発生した場合にローカル サービスのステータスを監視するヘルス チェック ウォッチャーが組み込まれています。サービスに障害が発生するか、サービスが利用できなくなった場合、ローカル セッションは障害から自動的に回復しようとします。この機能を無効にするには、以下のように、keep_alive(False)
を使用します。
from kaskada.api.session import LocalBuilder
session = LocalBuilder().keep_alive(False).build()
Kaskada マジック コマンド有効化
Kaskada のクライアントには、Fenl 言語でクエリを作成できるようにするノートブックのカスタマイズが含まれていますが、クエリの結果をノートブックで受け取ってレンダリングすることもできます。これらのカスタマイズを使用するには、まずこれらのカスタマイズを有効にする必要があります。
新しいコードセルに次のコマンドを入力し、このセルを実行します。
このノートブックで fenlmagic を有効にする
%load_ext fenlmagic
これで、ノートブックでコード セルを開始できます。
テーブル操作
それでは、小さなテーブルを作成し、簡単なクエリを作成して、すべてが設定で正しく動作していることを確認しましょう。
Kaskada はデータをテーブルに保存します。テーブルは複数の行で構成され、各行は同じ型の値です。Kaskada にクエリを実行すると、テーブルの内容は個別のタイムラインとして解釈されます。各イベントに関連付けられた値は、タイムライン内の値に対応します。
テーブルの作成
すべてのテーブルは、テーブル内の各イベントの構造を定義するスキーマに関連付けられています。
スキーマはテーブルにロードしたデータから推論されますが、Kaskada のデータ モデルでは、いくつかの種類の必ず含まれていなければならない列があります。すべてのテーブルには、各行に関連付けられた時刻とエンティティを識別する列が含まれている必要があります。
テーブルを作成するときは、どの列に各行の時間とエンティティが含まれるかを Kaskada に指示する必要があります。
時刻列はtime_column_name
パラメータを使用して指定します。このパラメータは、時間値を含むテーブルのデータ内の列名を識別する必要があります。時刻はイベントが発生した時刻を指す必要があります。
エンティティキーはentity_key_column_name
パラメータを使用して指定します。このパラメータは、エンティティ キー値を含むテーブル データ内の列名を識別する必要があります。エンティティ キーは、各イベントが関連付けられている世界の物体を識別する必要があります。「正しい」値を選択することについてあまり心配する必要はありません。with_key()
関数を使用するとエンティティを簡単に変更できます。
from kaskada import table
from kaskada.api.session import LocalBuilder
session = LocalBuilder().build()
table.create_table(
# The table's name
table_name = "Purchase",
# The name of a column in your data that contains the time associated with each row
time_column_name = "purchase_time",
# The name of a column in your data that contains the entity key associated with each row
entity_key_column_name = "customer_id",
)
テーブル作成結果確認
create_table
からの応答は、次のような内容を持つtable
オブジェクトになります。
table {
table_id: "76b***2e5"
table_name: "Purchase"
time_column_name: "purchase_time"
entity_key_column_name: "customer_id"
subsort_column_name: "subsort_id"
create_time {
seconds: 1634250064
nanos: 422017488
}
update_time {
seconds: 1634250064
nanos: 422017488
}
}
request_details {
request_id: "fe6bed41fa29cea6ca85fe20bea6ef4a"
}
Purchase
という名前のテーブルが作成されました。
このテーブルにロードされるデータには、 purchase_time
という名前のタイムスタンプ フィールド、 customer_id
という名前のフィールド 、および subsort_id
という名前のフィールドが必要です。
Kaskadaにおける命名の慣用
テーブルの名前付けには キャメルケースを使用することが好まれます。その理由は、変換された値や関数名からデータ ソースを区別するのに役立つからです。
データのロード
テーブルを作成したので、そこにデータをロードする準備が整いました(データをロードする前に、テーブルを作成する必要があります)。
データロード実行
データは複数の方法でテーブルにロードできます。この例では、Parquet ファイルの内容をテーブルにロードします。
from kaskada import table
from kaskada.api.session import LocalBuilder
session = LocalBuilder().build()
# A sample Parquet file provided by Kaskada for testing
# Available at https://drive.google.com/uc?export=download&id=1SLdIw9uc0RGHY-eKzS30UBhN0NJtslkk
purchases_path = "/absolute/path/to/purchases.parquet"
# Upload the files's contents to the Purchase table (which was created in the previous step)
table.load(table_name = "Purchase", file = purchases_path)
ロード結果表示
実行結果はloadですdata_token_id
。データ トークン ID は、システムに現在保存されているデータへの一意の参照です。データ トークンにより反復可能なクエリが可能になります。同じデータ トークンに対して実行されるクエリは、常に同じ入力データに対して実行されます。
data_token_id: "aa2***a6b9"
request_details {
request_id: "fe6bed41fa29cea6ca85fe20bea6ef4b"
}
ファイルの内容がテーブルに追加されます。
データへのクエリ
Kaskada マジックコマンド
Kaskada のクライアントには、Fenl 言語でクエリを作成できるようにするノートブックのカスタマイズが含まれており、クエリの結果をノートブックで受け取ってレンダリングすることができます。
これらのカスタマイズを使用するには、まずこれらのカスタマイズを有効にする必要があります。
実行していない場合には、以下のコマンドを実行して、このノートブックで fenlmagic を有効にします。
%load_ext fenlmagic
クエリの作成
クエリ ブロックの先頭に%%fenl
を付けることで、Fenl クエリを作成できます。クエリ結果は計算され、Pandas データフレームとして返されます。クエリの内容は次の行から始まり、コード ブロックの残りの内容が含まれます。
まずはフィルターなしでPurchase
テーブルを見てみましょう。このクエリはテーブルに含まれるすべての列と行を返します。
%%fenl
Purchase
このクエリは、テーブルに含まれるすべての列と行を返します。結果を 1 つのエンティティに限定すると役立つ場合があります。これにより、単一のエンティティが時間の経過とともにどのように変化するかを確認しやすくなります。
%%fenl
Purchase | when(Purchase.customer_id == "patrick")
この例では、|
キャラクターを使用して関数のパイプラインを構築します。Purchase
テーブルによって生成されたタイムラインから始めて、when()
フィルターをかけ、購入者の顧客IDが"patrick"であるデータセットを抽出します。
Kaskada のクエリ言語は、時間について推論するための豊富な操作セットを提供します。以下は、Kaskada クエリの多くのユニークな機能に触れた、より洗練された例です。
%%fenl
# How many big purchases happen each hour and where?
let cadence = hourly()
# Anything can be named and re-used
let hourly_big_purchases = Purchase
| when(Purchase.amount > 10)
# Filter anywhere
| count(window=since(cadence))
# Aggregate anything
| when(cadence)
# Shift timelines relative to each other
let purchases_now = count(Purchase)
let purchases_yesterday =
purchases_now | shift_by(days(1))
# Records are just another type
in { hourly_big_purchases, purchases_in_last_day: purchases_now - purchases_yesterday }
| extend({
# …modify them sequentially
last_visit_region: last(Pageview.region)
})
クエリ実行の構成
特定のクエリはさまざまな方法で計算できます。%%fenl
ブロックにフラグを指定することで、クエリの実行方法を構成できます。
結果のタイムラインの出力方法を変更する
クエリを作成すると、結果として得られるタイムラインは、履歴またはスナップショットの 2 つの方法のいずれかで解釈されます。
- タイムライン履歴はタイムラインが変更されるたびに値を生成し、各行は異なるエンティティおよび時点に関連付けられます。
- タイムラインスナップショットは、同じ時点で各エンティティの値を生成します。各行は異なるエンティティに関連付けられますが、すべての行は同じ時間に関連付けられます。
デフォルトではタイムラインが履歴として出力されます。fenlmagic 引数--result-behavior
をfinal-results
に設定すると、タイムラインをスナップショットとして出力できます。
%%fenl --result-behavior final-results
Purchase | when(Purchase.customer_id == "patrick")
返される行数を制限する
クエリから返される行数を制限できます。
%%fenl --preview-rows 10
Purchase | when(Purchase.customer_id == "patrick")
結果を変数に代入する
クエリの結果を取得して変数に割り当てるには、次のようにします--var
引数に変数名(ここではquery_result
)を指定します。
%%fenl --var query_result
Purchase | when(Purchase.customer_id == "patrick")
これで、結果のデータフレームまたは元のクエリ文字列を検査できるようになります。
# The result dataframe
query_result.dataframe
# The original query expression
query_result.expression
クリーンアップ
一覧の作業が完了したら、リソースを解放するために作成したテーブルを削除できます。
これにより、テーブルにロードされたすべてのデータも削除されることに注意してください。
from kaskada import table
from kaskada.api.session import LocalBuilder
session = LocalBuilder().build()
table.delete_table(table_name = "Purchase")