今年一年は、大きなネガティブサプライズがたくさん発生して、どんどん仮想通貨市場が冷えていった一年でしたね。こんな厳冬のさなかですが、皆さんは次のバブルに向けて研究開発は順調に進んでいますでしょうか?僕はスプラ3がとても捗っています。
やる気の出ない値動きに流動性の薄さに嫌気がさしている方も多いと思いますが、そんな冬だからこそ値動きに邪魔されずにBUIDLする時間がたくさんあります。今回の記事では、研究開発を始めるにあたっての基礎の基礎、データ収集周りのトピックについてです。データ収集、管理、活用の仕組みってつくるのが非常に面倒ですよね?これらの問題をどのようにして解決してデータレイクを実装しているかについて書いていこうと思います。
この記事を読めば、ほぼS3の保存コストだけという格安で運用できるデータレイクを、誰でも簡単に構築できるようになるはずです。
問題提起
データレイクを作るにあったって大きな壁が主に3点あります。
- 冗長性/可用性を担保して低コストで保存するのはどうすべき?
- 継続的にデータを更新したいけど、仕組みづくりが面倒
- データを活用するにあたって、低コストでクエリを打ちたい。できれば共通化したインターフェイスで。
これを実現するのはなかなかに面倒です。RDSを使うと金額/運用面のコストが重いですし、いちいちテーブル追加してDBスキーマ追加するのも大変です。フラットファイルでローカル管理にしてもディスク死んだらどうする?そもそもフラットファイルは更新処理の実装が面倒だし...そもそもデータの規模が大きすぎて全件手元のマシンで処理するなんて無理だし別途処理レイヤ実装するのか...といろいろと考えないといけないです。
解決策
このペインを解決するために、ライブラリを作りました。awswranglerを経由してAWS S3 + AWS Glue + AWS Athenaを操作して継続的なデータ更新、クエリの発行を簡単、低コストで実現します。非稼働時はS3のストレージコストのみ、クエリ発行時のみ追加コストを支払えばいい構成となり、とても経済的です。私の環境ではAthenaはハードにクエリ発行しても月20ドル程度、平常時のクエリのは2ドル程度で、それなりなDBを常時動かし続けるよりも非常にお安く運用できてます。
僕はこの方法で50+のデータソースから毎日データを同期して、研究開発のためのデータ基盤を整えています。
上記に上げたペインに関してですが、
- AWS S3という恐らく最も堅牢なデータバックエンド。そしてストレージコストも最安。
- テーブルスキーマはデータフレームから推論でglueのカタログを自動管理。データを追加するたびにスキーマ管理する地獄から解放されました。
- AWS Athenaを使えばS3においてあるファイルから自由に欲しいデータだけクエリしてこれます。データのリサンプリングもクラウドの計算リソース活用でお手の物。
どうでしょうか?なんだかいい感じに思えてきませんか?
利用例 - アップロード
ライブラリathena-timeseriesを使った実例を紹介します。
基本的には、データフレームを用意してuploadを呼ぶだけで、OK。これだけのコードで一発でデータの取り込みが完了します。
import athena_timeseries
import pandas as pd
import numpy
import boto3
boto3_session = boto3.Session(region_name="ap-northeast-1")
tsdb = athena_timeseries.AthenaTimeSeries(
boto3_session=boto3_session,
glue_db_name='example_db',
s3_path='s3://my_bucket/example_db_directory',
)
# Prepare example data, your data need to have 3 columns named symbol, dt, partition_dt
df = pd.DataFrame(np.random.randn(5000, 4))
df.columns = ['open', 'high', 'low', 'close']
# symbol represent a group of data for given data columns
df['symbol'] = 'BTCUSDT'
# timestamp should be UTC timezone but without tz info
df['dt'] = pd.date_range('2022-01-01', '2022-05-01', freq='15Min')[:5000]
# partition_dt must be date, data will be updated partition by partition with use of this column.
# Every time, you have to upload all the data for a given partition_dt, otherwise older will be gone.
df['partition_dt'] = df['dt'].dt.date.map(lambda x: x.replace(day=1))
tsdb.upload(tabel_name='example_table', df=df)
上記例では、example_dbというテーブルにOHLCのデータをアップロードしています。簡単ですね。
S3を使って、継続的な更新をする目的のため、ある程度の量データごとにファイルを分けて保存する必要があります。このファイルの分割をpartition_dtのカラムを使って実現しており、(symbol, partition_dt)のペア事にS3にファイルが生成、保存されます。基本的にこのファイルは新規データが入ってきた場合上書き保存されるので、データの更新の際にはパーティション内の全データをアップロードしないといけないことに注意が必要です。僕は上記のように日次のパーティションキーを用いて、毎日直近2,3日分のデータをアップロードするタスクを走らせてます。
利用例 - クエリ
データのクエリ、特にリサンプルの処理をSQLで実装するのは結構面倒ですが、このライブラリでは生データクエリとリサンプリングクエリの2つを実装して用意しておきましたので、一般的なユースケースでは心配は必要ありません。
# Query for raw data.
raw_close = tsdb.query(
table_name='example_table',
field='close',
start_dt='2022-02-01 00:00:00', # yyyy-mm-dd HH:MM:SS, inclusive
end_dt='2022-02-05 23:59:59', # yyyy-mm-dd HH:MM:SS, inclusive
symbols=['BTCUSDT'],
)
# Query for raw data with resampling
resampeld_daily_close = tsdb.resample_query(
table_name='example_table',
field='close',
start_dt='2022-01-01 00:00:00', # yyyy-mm-dd HH:MM:SS, inclusive
end_dt='2022-01-31 23:59:59', # yyyy-mm-dd HH:MM:SS, inclusive
symbols=['BTCUSDT'],
interval='day', # month | week | day | hour | {1,2,3,4,6,8,12}hour | minute | {5,15,30}minute
op='last', # last | first | min | max | sum
)
どうでしょうか、簡単に見えてきましたか?
最後に
この記事を読んだあなたは、意外とコストを掛けずに簡単に実装することができるんだなーと体感できたかとおもいます。無数のソースからデータをかき集めて自分だけのアルファを見つける旅に出る準備は整いましたね。過去データの手に入らないデータが有ったりしますし、つぶさにデータを集めている人は、そうじゃない人よりも断然優位です。データは力、力はパワーです。
こんな冬の時代だからこそ、基盤を強化して自身の研究開発環境を改善するのはいかがでしょうか?
参考になった方はチャンネル登録と高評価、twitterのリプやDMでのエッジや使えるデータソースの暴露をよろしくお願いいたします。
免責
怠惰なので、SQLインジェクション回避してないです。信頼しないリクエストを受けると、意図しない結果をもたらします。自分自身で管理運用する目的でのみ使ってくださいね。