この記事について
この記事は先日投稿した記事の続きです。
1. お仕事の備忘録(pymongo) その1. 基本的な操作編
2. お仕事の備忘録(pymongo) その2. 割と便利な操作(bulk_write)編
今回は個人的にあるRest APIで収集していたBitCointの価格変動のデータが多すぎて重くなってきたのでダウンサンプリングしてドキュメント数を減らした際のことを書いていきます。
aggregateとは
集計処理です。pymongoではaggregate関数にpipeline(条件文)を渡すことで集計処理が実行されます。
pilelineはstageとoperatorで構成されそれぞれstageがSQL分の"select","group by","where"などに対応し、operatorが"sum","max","min"などに対応するイメージです。
参考:Aggregate Pipeline(stage)
参考:Aggregate Pileline(operator)
経緯
Rest APIの取得結果をコレクションにそのまま突っ込んできた結果、
下記の感じのデータが無駄にたくさん(51240件)入れてしまいました。
(10分間隔で数日放置してたらいつの間にか溜まっていた...)
鬱陶しいのでダウンサンプリングしてデータ数を減らしました。
client = MongoClient()
db = client["BitCoin"]["document"]
pprint(db.count()) # コレクションないのドキュメント数を取得する関数
pprint(db.find_one())
"""
#出力結果
51240
{'_id': ObjectId('5f328ad85ae5ac59aee515cb'),
'best_ask': 1245419.0,
'best_ask_size': 0.02,
'best_bid': 1244658.0,
'best_bid_size': 0.05,
'ltp': 1245615.0,
'product_code': 'BTC_JPY',
'tick_id': 10956004,
'timestamp': 1597115465.0,
'total_ask_depth': 1364.44898005,
'total_bid_depth': 1637.4300907,
'volume': 126756.67774321,
'volume_by_product': 6571.45287901
}
"""
aggregate用のpipelineの設定
とりあえず、10分間隔のデータを1日ごとのデータにグループ化してそれぞれの値を平均してダウンサンプリングしました。
下記がpymongoで使ったaggregateのパイプラインです。
coin = "BTC_JPY"
interval = 60*60*24 # 24hour
pipeline = [
# match stage
{"$match": {"product_code": coin} },
# group stage
{"$group": {
"_id":
{"timestamp":
{"$subtract": ["$timestamp", { "$mod": ["$timestamp", interval]}]
}
,
"product_code": "$product_code"
},
"timestamp":{"$avg": "$timestamp"},
"ltp": {"$avg": "$ltp"},
"best_ask": {"$avg": "$best_ask"},
"best_ask_size": {"$avg": "$best_ask_size"},
"best_bid_size": {"$avg": "$best_bid_size"},
"total_ask_depth": {"$avg": "$total_ask_depth"},
"total_bid_depth": {"$avg": "$total_bid_depth"},
"volume": {"$avg": "$volume"},
"volume_by_product": {"$avg": "$volume_by_product"},
}},
# presentation stage
{"$project": {
"product_code": "$_id.product_code",
"_id": 0, "timestamp": 1,"ltp": 1,
"best_ask": 1,"best_ask_size": 1,
"best_bid_size": 1,
"total_ask_depth": 1,
"total_bid_depth": 1,
"volume": 1, "volume_by_product": 1,
}
}
]
パイプラインの説明をしていきます。
-
集計を行う対象を取得(
$match
)
今回はproduct_code
が一致するものを対象として取得した。
(findと同じような方法で指定できます。)
{"$match": {"product_code": coin} },
```
-
グループ化(
$group
)
product_codeとtimestampがunixtimeで1日間隔で一致するようにグループ化して、他の値が平均値を算出した。
ポイントとしては下記の二つが挙げられる。-
_id
にグループ化する対象を設定する -
_id
以降に平均、最大値など取得したいkeyと計算方法(operator)を指定する。
{"$group": { "_id": #ここにグループ化する対象を設定する {"timestamp": {"$subtract": ["$timestamp", { "$mod": ["$timestamp", interval]}] } , "product_code": "$product_code" }, "timestamp":{"$avg": "$timestamp"}, "ltp": {"$avg": "$ltp"},
-
-
表示するデータを指定(
$project
)
(find
でのprojectと同じ操作でできる){"$project": { "product_code": "$_id.product_code", "_id": 0, "timestamp": 1,"ltp": 1, "best_ask": 1,"best_ask_size": 1, "best_bid_size": 1, "total_ask_depth": 1, "total_bid_depth": 1, "volume": 1, "volume_by_product": 1, } }
ダウンサンプリングした結果
先ほどのpipelineでダウンサンプリングしたデータと元のデータを比較しました。
赤い点がダウンサンプリング前、青がダウンサンプリング後です。
いい感じにデータを間引けてるのがわかると思います。
import matplotlib.pyplot as plt
plt.figure()
for i in db.find( filter= {"product_code": coin
} ):
plt.scatter(i["timestamp"], i["ltp"], marker=".", color="r")
for i in db.aggregate(pipeline=pipeline):
plt.scatter(i["timestamp"], i["ltp"], marker=".", color="b")
plt.grid()
plt.xlabel("Data[unixtime]")
plt.ylabel(coin)
plt.savefig("test2.jpg")
plt.show()
まぁ、aggregateについてはまだまだいろいろあるのですが多すぎるので今回はここまでにします。
修正や質問、書いて欲しいことなどあれば追加していきます。