3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

μmongo (umongo) を使ってmongodb documentデータをcsv へコンバートしてみる

Last updated at Posted at 2019-05-10

概要・目的

最近、会社でBI系の業務が増え、redash系のツールを使うことが増えてきた。ただ、スマートフォンApp用のAPIのバックエンドDBサーバーにmongodbを使っているのと、プロトタイピングレベルでサービスをローンチした時期に生成されたデータの構造にばらつきがあったりするので、その生データの取り扱う時に、ODM(object document mapper)を利用することにした。基本、Python使っているので、Pythonで開発することにした。

μmongoの採用と理由

今回は、マイクロモンゴ(μmong)を採用することにした。

  • レガシーシステムがpymongoを使っていて、そのサポートもしている
  • Asynchronous対応のmotorを現在よく使っているので、Asynchronous対応がいい
  • motorengineも検討したが、まだベータで、asyncio.get_event_loopサポートが厳しい感じだった
  • Python2系対応していないので、Python3ベースで開発されている
  • ソースはざっとしか見ていないが、悪くはない感じだった
  • 他、選択肢もあまりないのと名前もいい感じ(結構重要)
  • Doumentモデルをクラス単位で定義と継承でき、基本的な機能が備わっている
  • marshmallowベースのデータモデルの管理なので、問題はなさげ
  • instance.register が使いやすい

感想

使ってみた感じ、使い方はそんなに難しくなさそう。また、依存しているパッケージも少ない(tornadoとか入って来ない)ので、管理も結構しやすい。Python3から始める場合は、採用はありかと思う。パフォーマスンスの検証はできていないので、どなたか情報待ってます。(自分でやるしかないかなぁ)

ちなみに、最近流行りのモジュールは、 aiohttp + motor/μmongo + mongodb になりつつある。 以下、サンプルコード載せました。

サンプルコード

motorを使ったDBコネクト

engine.py
import asyncio

import motor.motor_asyncio
from umongo import Instance

#  asyncioのイベントループにわたすcoroutine関数
@asyncio.coroutine
def setup_db(host, port, db_name):
    #  motorドライバーベースのクライアントを生成
    client = motor.motor_asyncio.AsyncIOMotorClient(host, port)

    #  DBにアクセス
    db = client[db_name]
    return db

# DB接続パラメータの設定
host = 'localhost'
port = 27017
dbname = 'testdb'

#  asyncio non-blocking thread イベントループの取得。
loop = asyncio.get_event_loop()  

#  setup_db(asyncio.coroutine関数)を渡し、その処理を実行
#  NOTE: https://umongo.readthedocs.io/en/latest/apireference.html#instance
db = loop.run_until_complete(setup_db(host, port, dbname))

# DBのインスタンスを生成
instance = Instance(db)

Documentクラス継承

activity.py
from umongo import Document, fields

from engine import db, instance

#  importしたinstanceを使って、クラスをレジストする
@instance.register
class Activity(Document):
    class Meta:
        #  このクラスに対応するコレクションを設定
        collection_name = "activity"
        

    #  コレクションのドキュメントのスキーマ定義
    id = fields.ObjectIdField(required=True, attribute='_id')
    user_id = fields.ObjectIdField(required=True, attribute='u')
    target = fields.ObjectIdField(default=None, attribute='t')
    action = fields.StrField(required=True, attribute='a')
    at = fields.DateTimeField(required=True)

csv bufferを生成

to_csv.py
from io import StringIO
import pandas as pd


# データセットをcsvに変換して、bufferに乗せる
def buffer(dataset, sep=",", index=False):
    #  Pandasのdfを生成
    df = pd.DataFrame(dataset)

    #  出力用のデータを格納するbufferを準備
    csv_buffer = StringIO()

    #  dfを使ってセットされたデータをbufferに書き込む
    df.to_csv(csv_buffer, sep=sep, index=index)

    #  bufferデータを戻す
    return csv_buffer

データセットを生成

to_dataset.py
import asyncio

#  motorの結果のdb cursorからdatasetを生成する
@asyncio.coroutine
async def from_cursor(cur, projection):
    #  データセットのデータ項目を準備
    dataset = {}
    for x in projection:
        dataset.update({x: []})

    #  datasetを生成するループ
    for x in await cur.to_list():
        for k in dataset:
            dataset[k].append(x[k])

    return dataset

メインの処理

main.py
import asyncio
from datetime import (datetime,
                      timedelta)

from bson.objectid import ObjectId

from engine import db
import to_dataset
import to_csv
import activity.Activity as activity


#  メインの処理
#  @classmethodと@asyncio.coroutineのデコレータの記述順の備忘録として、クラスを使う
class ActivityHandler:
    @classmethod
    @asyncio.coroutine
    async def like(self, start, end):
        #  query条件
        cond = {
            'a': 'like',
            'at': {
                '$gt': start,
                '$lte':  end
            }
        }

        #  結果を取得
        cursor = activity.find(cond, limit=10, skip=0)

        #  出力項目を定義
        projection = ['action', 'user_id', 'target', 'at']

        #  結果を持っているカーソルからdatasetを生成する
        dataset = await to_dataset.from_cursor(cursor, projection)

        #  csvを生成する
        buf = to_csv.buffer(dataset=dataset)

        #  csvを出力する
        print(buf.getvalue())


if __name__ == '__main__':
    end = datetime.utcnow()
    start = end - timedelta(hours=1)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(ActivityHandler.like(start, end))

実行結果サンプル

> python main.py
action,user_id,target,at
like,5622105e0acd0b58a218ab0d,5cd499612c2c0b06c2ce6156,2019-05-10 01:42:07.700
like,554472764b1a45200ef30279,5cd16ecf12949c06b89a3cb1,2019-05-10 01:42:07.285
like,5668488181cfcf0644993aa1,5cd4a9740b760c06c8cbe9d1,2019-05-10 01:42:07.034
like,5622105e0acd0b58a218ab0d,5cd499570fa05006be5f54b7,2019-05-10 01:42:06.812
like,554472764b1a45200ef30279,5cd202184b738506b0fde6e0,2019-05-10 01:42:06.695
like,5622105e0acd0b58a218ab0d,5cd4994e0b760c06c70f8797,2019-05-10 01:42:05.704
like,5622105e0acd0b58a218ab0d,5cd499692c2c0b06c2ce6159,2019-05-10 01:42:04.175
like,554472764b1a45200ef30279,5cd2d476e4266d06d8db148a,2019-05-10 01:42:03.961
like,554472764b1a45200ef30279,5cd3540e0f38c30674c309d6,2019-05-10 01:42:03.348
like,5622105e0acd0b58a218ab0d,5cd499be0fa05006bd48be35,2019-05-10 01:42:03.244
3
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?