1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

お仕事の備忘録(pymongo) その3. 調べ直したくないからメモしとくやつ(aggregate)編

Last updated at Posted at 2020-10-24

この記事について

この記事は先日投稿した記事の続きです。
1. お仕事の備忘録(pymongo) その1. 基本的な操作編
2. お仕事の備忘録(pymongo) その2. 割と便利な操作(bulk_write)編

今回は個人的にあるRest APIで収集していたBitCointの価格変動のデータが多すぎて重くなってきたのでダウンサンプリングしてドキュメント数を減らした際のことを書いていきます。

aggregateとは

集計処理です。pymongoではaggregate関数にpipeline(条件文)を渡すことで集計処理が実行されます。
pilelineはstageとoperatorで構成されそれぞれstageがSQL分の"select","group by","where"などに対応し、operatorが"sum","max","min"などに対応するイメージです。

参考:Aggregate Pipeline(stage)
参考:Aggregate Pileline(operator)


経緯

Rest APIの取得結果をコレクションにそのまま突っ込んできた結果、
下記の感じのデータが無駄にたくさん(51240件)入れてしまいました。
(10分間隔で数日放置してたらいつの間にか溜まっていた...)
鬱陶しいのでダウンサンプリングしてデータ数を減らしました。

コレクションに格納されたドキュメント
client = MongoClient()
db = client["BitCoin"]["document"]
pprint(db.count()) # コレクションないのドキュメント数を取得する関数
pprint(db.find_one())

"""
#出力結果
51240
{'_id': ObjectId('5f328ad85ae5ac59aee515cb'),
 'best_ask': 1245419.0,
 'best_ask_size': 0.02,
 'best_bid': 1244658.0,
 'best_bid_size': 0.05,
 'ltp': 1245615.0,
 'product_code': 'BTC_JPY',
 'tick_id': 10956004,
 'timestamp': 1597115465.0,
 'total_ask_depth': 1364.44898005,
 'total_bid_depth': 1637.4300907,
 'volume': 126756.67774321,
 'volume_by_product': 6571.45287901
}
"""

グラフにするとこんな感じ...
点が多すぎて本当に鬱陶しい
test.jpg


aggregate用のpipelineの設定

とりあえず、10分間隔のデータを1日ごとのデータにグループ化してそれぞれの値を平均してダウンサンプリングしました。

下記がpymongoで使ったaggregateのパイプラインです。

pipeline
coin = "BTC_JPY"
interval = 60*60*24 # 24hour 
pipeline = [
    # match stage 
    {"$match": {"product_code": coin} },
    # group stage
    {"$group": {
        "_id":
        {"timestamp":
            {"$subtract":  ["$timestamp", { "$mod": ["$timestamp", interval]}]
             }
         ,
        "product_code": "$product_code"
        },
        "timestamp":{"$avg": "$timestamp"},
        "ltp": {"$avg": "$ltp"},
        "best_ask": {"$avg": "$best_ask"},
        "best_ask_size": {"$avg": "$best_ask_size"},
        "best_bid_size": {"$avg": "$best_bid_size"},
        "total_ask_depth": {"$avg": "$total_ask_depth"},
        "total_bid_depth": {"$avg": "$total_bid_depth"},
        "volume": {"$avg": "$volume"},
        "volume_by_product": {"$avg": "$volume_by_product"},
    }},
    # presentation stage
    {"$project": {
        "product_code": "$_id.product_code",
        "_id": 0, "timestamp": 1,"ltp": 1,
        "best_ask": 1,"best_ask_size":   1,
        "best_bid_size": 1,
        "total_ask_depth": 1,
        "total_bid_depth": 1,
        "volume": 1, "volume_by_product": 1,
        }
    }
]

パイプラインの説明をしていきます。

  1. 集計を行う対象を取得($match)
    今回はproduct_codeが一致するものを対象として取得した。
    (findと同じような方法で指定できます。)

{"$match": {"product_code": coin} },
```

  1. グループ化($group)
    product_codeとtimestampがunixtimeで1日間隔で一致するようにグループ化して、他の値が平均値を算出した。
    ポイントとしては下記の二つが挙げられる。

    1. _idにグループ化する対象を設定する
    2. _id以降に平均、最大値など取得したいkeyと計算方法(operator)を指定する。
    {"$group": {
        "_id": #ここにグループ化する対象を設定する
        {"timestamp":
            {"$subtract":  
                ["$timestamp", 
                    { "$mod": ["$timestamp", interval]}]
             }
         ,
        "product_code": "$product_code"
        },
        "timestamp":{"$avg": "$timestamp"},
        "ltp": {"$avg": "$ltp"},
    
    
  2. 表示するデータを指定($project)
    (findでのprojectと同じ操作でできる)

     {"$project": {
         "product_code": "$_id.product_code",
         "_id": 0, "timestamp": 1,"ltp": 1,
         "best_ask": 1,"best_ask_size":   1,
         "best_bid_size": 1,
         "total_ask_depth": 1,
         "total_bid_depth": 1,
         "volume": 1, "volume_by_product": 1,
         }
     }
    

ダウンサンプリングした結果

先ほどのpipelineでダウンサンプリングしたデータと元のデータを比較しました。
赤い点がダウンサンプリング前、青がダウンサンプリング後です。
いい感じにデータを間引けてるのがわかると思います。

import matplotlib.pyplot as plt

plt.figure()
for i in db.find( filter= {"product_code": coin
                          } ):
    plt.scatter(i["timestamp"], i["ltp"], marker=".", color="r")
for i in  db.aggregate(pipeline=pipeline):
    plt.scatter(i["timestamp"], i["ltp"], marker=".", color="b")
plt.grid()
plt.xlabel("Data[unixtime]")
plt.ylabel(coin)
plt.savefig("test2.jpg")
plt.show()

test2.jpg


まぁ、aggregateについてはまだまだいろいろあるのですが多すぎるので今回はここまでにします。
修正や質問、書いて欲しいことなどあれば追加していきます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?