はじめに
Apache Cassandraについて
Apache Cassandraとは、一言でいうなら、オープンソースの分散データベース管理システムです。
他の分散データベース管理システム同様、複数の汎用サーバーを用いて、ひとつのデータベースを構築します(開発などの目的のため、一つのサーバーのみで構成することも可能です)。
ここでは、詳しい説明は割愛し、興味のある方へのご紹介の役割は、公式サイトやWikipediaに譲ります。
https://cassandra.apache.org/_/index.html
https://en.wikipedia.org/wiki/Apache_Cassandra
Cassandraを用いたデータ分析へのアプローチ〜GPUとRAPIDSの活用
本稿では、下記のブログの内容を、日本語の読者向けに噛み砕きながら、紹介します。
並列処理システム、そしてGPUの登場
企業は、大量のデータを Apache CassandraなどのNoSQLデータ ストアに保存するようになると同時に、そのデータから洞察を得る、大量データを用いたデータ分析、というニーズが現れました。
そして、この大量のデータに対する、迅速な処理といったニーズのために、Apache Spark などの並列処理システムが登場しました。
一方、今日の企業におけるデータ分析においては、AI および ML 技術が急速に取り入れられています。
そして、データ サイエンス、特に機械学習は多くの並列計算を使用するため、CPU よりも数桁高速に「マルチタスク」を実行できる GPU での実行に適しています。
この投稿では、RAPIDS エコシステムのツールを使用して Cassandra SSTable を GPU デバイス メモリに直接解析することで、Cassandraのデータを処理する最先端のアプローチを紹介します。
これにより、ユーザーは少ない初期設定でより迅速に分析情報に到達できるようになり、Python で記述された既存の分析コードの移行も簡単になります。
この投稿では、RAPIDS プロジェクトについて簡単に説明し、Cassandra のデータを RAPIDS で分析できるようにするための一連のオプションを紹介します。
最終的に、C++ で SSTable ファイルを解析し、GPU に適した形式に変換して、データを GPU デバイス メモリにロードしやすくする現在のアプローチについて説明します。
RAPIDSとは何か?
RAPIDS は、 GPU 上で分析とデータ サイエンスをエンドツーエンドで実行するためのオープン ソース ライブラリスイートです。
RAPIDSは、開発者が GPU を活用できるようにするために Nvidia によって開発された開発者ツールキットであるCUDAから生まれました。RAPIDS は、 pandasやscikit-learnなどの一般的な AI / ML API を、GPU アクセラレーションに使用できるようにします。
以下は、GPUの活用がデータ分析の高速化にもたらす恩恵を示す画像です(以下で掲載する画像は、rapids.ai から引用)。
GPU 上のデータを cuDF (基本的にpandas DataFrame に相当する RAPIDSの形式) で取得したら、 pandasやscikit-learnなどの使い慣れた Python ライブラリとほぼ同じ API を使用してデータを操作できます。
以下は、機械学習の各フェーズに用いられる、ライブラリの関係です。この図では、CPUを使った場合として図示されています。
CPUでなく、GPUを使った場合が、以下の図です。
Apache Arrowの利用
RAPIDSでは、基礎となるメモリ形式としてApache Arrowが使用されています。
Arrow は行ではなく列をベースに設計されているため、より高速な分析クエリが可能になります。
また、プロセス間で Arrow レコード バッチ (いわばテーブル形式データに対応するもの) を転送するために使用されるプロセス間通信 (IPC) メカニズムも付属しています。IPC 形式はメモリ内形式と同一であるため、余分なコピーやシリアル化解除のコストが排除され、非常に高速なデータ アクセスが可能になります。
GPU で分析を実行する利点は明らかです。必要なのは適切なハードウェアだけです。Python データ サイエンス ライブラリのAPIを同等の RAPIDS APIに置き換えるだけで、既存のデータ サイエンス コードを GPU で実行するために移行することができます。
Cassandra
では、Cassandra データを PARIDSで利用する(GPU上で効果的に活用する)にはどうすればよいでしょうか?
ここで、5 つのアプローチが考えられます。
-
Cassandra ドライバーを使用してデータをフェッチし、それを pandas DataFrame に変換してから、cuDF に変換します。
-
Cassandra ドライバーを使用してデータをフェッチしするまでは同じですが、pandasのステップをスキップし、ドライバーからのデータを直接 Arrow テーブルに変換します。
-
Cassandraのサーバー内部で実行されている処理(レベルのコード)を使用してディスクから SSTableを読み取り、Arrow IPC ストリーム形式を使用してシリアル化し、クライアントに送信します。
-
アプローチ 3 と同じですが、Cassandra コードを使用する代わりに、C++ で独自の解析実装を使用します。
-
アプローチ 4 と同じですが、SSTable の解析中にCUDAによる GPU ベクトル化を使用します。
まず、これらの各アプローチの概要を説明し、最後に比較して次のステップについて説明します。
1. Cassandra ドライバーを使用してデータを取得する
あまりハッキングをしなくても既存のライブラリを使用できるため、このアプローチは非常に簡単です。ドライバーからデータを取得し、session.row_factory
をpandas_factory
関数に設定して、受信データをpandas.DataFrame
に変換する方法をドライバーに指示します。
そこまで行えば、次に、 cudf.DataFrame.from_pandas
関数を呼び出してデータを GPU にロードするのは簡単です。その後、RAPIDS ライブラリを使用して GPU アクセラレーション分析を実行できます。
まず、必要な Python ライブラリをインストールすることも必要です。以下は、Conda環境を利用した例です。
conda install -c blazingsql -c rapidsai -c nvidia -c conda-forge -c defaults blazingsql cudf pyarrow pandas numpy cassandra-driver
次にコード例を示します。
詳細については、DataStax Python Driver のドキュメントを参照してください。
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import pandas as pd
import pyarrow as pa
import cudf
from blazingsql import BlazingContext
import config
# connect to the Cassandra server in the cloud and configure the session settings
cloud_config= {
'secure_connect_bundle': '/path/to/secure/connect/bundle.zip'
}
auth_provider = PlainTextAuthProvider(user=’your_username_here’, password=’your_password_here’)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()
def pandas_factory(colnames, rows):
"""Read the data returned by the driver into a pandas DataFrame"""
return pd.DataFrame(rows, columns=colnames)
session.row_factory = pandas_factory
# run the CQL query and get the data
result_set = session.execute("select * from your_keyspace.your_table_name limit 100;")
df = result_set._current_rows # a pandas dataframe with the information
gpu_df = cudf.DataFrame.from_pandas(df) # transform it into memory on the GPU
# do GPU-accelerated operations, such as SQL queries with blazingsql
bc = BlazingContext()
bc.create_table("gpu_table", gpu_df)
bc.describe_table("gpu_table")
result = bc.sql("SELECT * FROM gpu_table")
print(result)
2. Cassandra ドライバーを使用してデータを Arrow に直接フェッチする
このアプローチの前半は、上のアプローチと同じですが、pandas_factory
をarrow_factory
に、切り替えます。
def get_col(col):
rtn = pa.array(col) # automatically detects the type of the array
# for a full implementation, we'd need to fully check which arrow types need
# to be manually casted for compatibility with cudf
if pa.types.is_decimal(rtn.type):
return rtn.cast('float32')
return rtn
def arrow_factory(colnames, rows):
# convert from the row format passed by
# CQL into the column format of arrow
cols = [get_col(col) for col in zip(*rows)]
table = pa.table({ colnames[i]: cols[i] for i in range(len(colnames)) })
return table
session.row_factory = arrow_factory
次に、同じ方法でデータをフェッチし、cuDF を作成します。
ただし、これら 2 つのアプローチには両方とも大きな欠点があります。
既存の Cassandra クラスターへのクエリに依存していますが、これは性能的に必ずしも望ましくありません。
読み取り負荷の高い分析ワークロードが、リアルタイム パフォーマンスが重要であるトランザクション運用ワークロードに影響を与える可能性があるためです。
リアルタイムデータの分析、という要件においては、事実上、データベースの分析ワークロードの影響だけを考えることはできません。
代わりに、データベースを経由せずにディスク上の SSTable ファイルから直接データを取得する方法を確認したいと思います。
3. Cassandra サーバー コードを使用してディスクから SSTable を読み取ります
おそらく、ディスク上の SSTable を読み取る最も簡単な方法は、既存の Cassandra サーバー テクノロジ、つまりSSTableLoaderを使用することです。SSTable からパーティションのリストを取得したら、Java オブジェクトのデータをテーブルの列に対応するArrowベクトルに手動で変換できます。次に、ベクトルのコレクションを Arrow IPC ストリーム形式にシリアル化し、この形式でソケット経由でストリーミングできます。
実は、このアプローチは、次に紹介するアプローチよりも開発が進んでいない、という状況があります。そこで、このアプローチについての詳細は割愛します。
またこのアプローチが避けられる要因として、このアプローチは Cassandra クラスターのプロセスとは別のプロセス(またはマシン)で実行することになりますが、SSTableLoaderを使用するには、まずクライアント プロセス内で、(組み込みレベルの)Cassandra を初期化する必要があり、コールド スタートでかなりの時間がかかること、があります。
4. カスタム SSTable パーサーを使用する
Cassandra の初期化を回避するために、バイナリ データ SSTable ファイルを解析するための独自のカスタム実装が C++ で開発されました。
このアプローチの詳細については、次のブログ投稿を参照してください。
このThe Last Pickle によるCassandra ストレージ エンジンのガイドは、データ形式を解読するときに非常に役立ちます。
最終的に CUDA を取り込むことを想定したパーサーの言語として、またバイナリ データを処理するための低レベルの制御にも C++ が使用されています。
5. CUDA を統合してテーブル読み取りを高速化する
カスタム解析の実装がより包括的になった後、このアプローチに取り組み始めることが予定されています。GPU ベクトル化を利用すると、読み取りと変換のプロセスが大幅に高速化されます。
性能比較
現段階では、主に SSTable ファイルの読み取りにかかる時間が焦点となっています。
アプローチ 1 と 2 については、1つ目のアプローチは追加のハードウェア (Cassandra クラスター) に依存し、2つ目は、Cassandra 自体内で複雑なキャッシュ効果が作用しているため、実際にはこの時間を公平に測定することはできません。ただし、アプローチ 3 と 4 では、簡単なイントロスペクションを実行して、プログラムが SSTable ファイルを最初から最後まで読み取るのにかかる時間を追跡できます。
NoSQLBenchによって生成された 1k、5k、10k、50k、100k、500k、および 1m 行のデータを含むデータセットに対する結果は次のとおりです。
グラフが示すように、マルチスレッドなどの追加の最適化を行わない場合でも、カスタム実装は既存の Cassandra 実装よりわずかに高速です。
結論
分析ユースケースのデータ アクセス パターンには通常、大規模なスキャンが含まれ、テーブル全体の読み取りが頻繁に行われることを考えると、このデータを取得する最も効率的な方法は、CQL を使用するのではなく、SSTable に直接取得することです。これを実行できる sstable パーサーを C++ で実装し、データを Apache Arrow に変換して、NVIDIA の GPU を利用した RAPIDS エコシステムを含む分析ライブラリで利用できるようにすることができました。作成されたオープン ソース (Apache 2 ライセンス) プロジェクトはsstable-to-arrowと呼ばれ、 GitHubで入手可能であり、アルファ リリースとしてDocker Hub経由でアクセスできます。
sstable-to-arrowを試すことに興味がある場合は、次のブログ投稿を参照してください。
最後に
この記事で紹介されている試みは、ともすれば、専門的すぎ、実際に活用することはハードルが高いと思われるかもしれません。
興味深いのは、ここで紹介されたような内部実装へのアプローチ、具体的にはSSTableというデータ保存形式の汎用化、という側面は、Cassandra Enhancement Requestという形で、Cassandra本体の実装への反映が検討されていることです。