ML基盤構築
Fastapiを使った機械学習システムを検討している。
前提としては、Pythonでバックエンドサーバーを構築する部分で、
基本はAPIを基本とした疎結合を考えている。
FastAPI関係
これまでの記事...
https://qiita.com/EasyCording/items/4256c33854ebfbbb85e8
https://qiita.com/EasyCording/items/3756358fafdfc184dbdb
今回の内容
ある程度、APIの使い方はマスター出来てきた。
但し、クエリパラメータやパスパラメータなど、これと言ったスタンダードが無い世界なので、なかなか苦労している。
少し自分の経験から、今後こうしたほうが良いなと言う気付きをメモする。
教訓1 フォルダで何とかする作戦はやめたほうが良い
-
Pytorchなどのリポジトリでは、フォルダにTRAIN,VAL,TESTに振り分けて、そこに入れればワンタッチなどと言うパターンが多い。
-
テキスト分類の類で言えば、livedoorコーパスのように、ラベル名ごとにテキストを分けて教師データにするケースも散見する。
-
もちろん使う側にとっても、プログラムを組む側にとっても、フォルダ構成を信じ切ってしまえば、それはかなり楽になるかもしれない。
→ しかしこれは、最終的なAIシステム構築には、支障になってしまうケースがある。 -
唯一の解決策は、やはりデータベースを使って、ファイルのURLパスは、どこかに置いておくべきだと感じる。またアクセストークンなどで、モデル固有のデータを管理しようとすれば、確実にデータベース上でURLと紐付けしなければならない。
-
やみ雲に長いパス名を入れるのは、避けたほうが良いと思うので、ユーザー、モデル、教師データについて、3階層ぐらいのカラムに分けて、それぞれ相対パスで保存したほうが方が良さそうである。
-
そもそもモデルと言う部分は、ユーザーとは切り離して考えた方が良さそうである。モデルに対してはTokenでユーザーにアクセス権を振るとかで良いだろう。教師データとモデルのファイル置き場所をどうするかが課題。
教訓2 APIのクエリパラメータをプルダウンに
- クエリのパラメータは、API設計者の親切心から、プルダウンで明示したほうが良い。
- パスパラメータ、クエリパラメータの事例を以下に示す。
パスパラメータの場合 → URLの部分 /{選択モデル}
クエリパラメータの場合 → URLの部分 /}
from enum import Enum
from fastapi import FastAPI,Query
import uvicorn
class Selector(str, Enum):
value1 = "選択1"
value2 = "選択2"
app = FastAPI()
@app.get("/v1/{selector}")
def get_url(selector: Selector):
if selector == Selector.value1:
return {"message" : Selector.value1}
if selector == Selector.value2:
return {"message" : Selector.value2}
return {"messege": "その他"}
@app.get("/v1/")
def get_query(selector: Selector = Selector.value1):
return {"message": selector}
if __name__ == "__main__":
uvicorn.run("test:app", host='0.0.0.0', port=8000, reload=True)
ここで、わざわざClassを定義しているが、命名規則をちゃんとしないと頭がこんがらがってくる。
つまり、それがクラス名なのか、パス(クエリ)パラメータなのか?
同様に、クラスのメンバ名と、そのメンバに定義している値についての命名規則も注意が必要である。
なぜ故に、こんなややこしい設定にするのか・・・ 単にユーザーへOPTION選択肢を明示するだけなのに
推奨
上記のような混乱を避けるため、調べ抜いて以下の結論になった。
@app.get("/v2/")
def get_query2(selector: str = Query("その他", enum=["選択1","選択2"])):
return {"message": selector}
これはクエリパラメータのみ有効で、パスパラメータには使えなかった。
→ つまりプルダウンしたい場合は、クエリパラメータの方が簡潔に書ける
Reference
https://fastapi.tiangolo.com/tutorial/path-params/#predefined-values
https://github.com/tiangolo/fastapi/issues/246
教訓3 Sqlalchemyとサヨナラする
Postgresでデータを触ろうとしていて、Update or INSERTの処理で悩みまくった。
- Sqlalchemyの場合 → めちゃめちゃ、ややこしい!
解決策
- sqlmodelでやっているソースを発見
→ 基本的には、条件分岐しているだけ。これならSqlalchemyでもできそう。
とりあえず、sqlmodelのソースをいただきます。
from sqlmodel import SQLModel
class Device(SQLModel, table=True): # type: ignore
"""The Device model.
It is used to model a device.
"""
@api.post("/devices/{device_uuid}", response_model=Device)
def upsert_device(device_uuid: str, device: Device) -> Device:
with Session(engine) as session:
# check if the device exists
statement = select(Device).where(Device.uuid == device_uuid)
result = session.exec(statement).first()
# if not, create it
if result is None:
result = device
# sync the data
for key, value in device.dict(exclude_unset=True).items():
setattr(result, key, value)
# persist the data to the database
session.add(result)
session.commit()
session.refresh(result)
return result
ごちそうさまでした。
reference
こんなのも参考にしました。
追記
最終的に、sqlalchemyでupsertを書くことができました。(SQLMODELは結局使わなかった。。。)
呼び出し側の処理
1)まずschemaをインスタンス化する → PYDANTICが型チェックする
2) add_forex()でデータベースへupsert → 個別にインスタンス化するのか、よくわからなかったので、モデルオブジェクトを引数で渡した。
def func()
obj =schema.Forex(
id = dt.strptime(time, "%Y.%m.%d %H:%M"),
open = open,
high= high,
low= low,
close= close,
volume= volume
)
if peristr == "forex_f1":
repo = Forex2_m5
elif peristr == "forex_f2":
repo = Forex2_m30
elif peristr == "forex_f3":
repo = Forex2_m240
else:
return {"error": "invalid peristr"}
try:
r = add_forex(
db=db,
schema = obj,
model = repo,
commit=True,
)
CRUD側の処理
1)受け取ったキーでモデルオブジェクトを取得する。
2)あれば重複なので、モデルを書き換えて、commitする
3)なければ、モデルをインスタンス化する。この時にスケマの要素を、一つづつ転記(邪魔くさっ・・・)
from models import forex_model as model_
from schemas import forex_schema as schema_
def select_forex_by_id(
db: Session,
id: datetime,
model:model_.Forex
) -> model_.Forex:
return db.query(model).filter(model.id == id).first()
def add_forex(
db: Session,
commit: bool = True,
schema:schema_.Forex = None,
model:model_.Forex = None
) -> model_.Forex:
exists = select_forex_by_id(
db=db,
id=schema.id,
model=model
)
#if exists is not None, data will be updated.
if exists:
exists.id = schema.id
exists.close = schema.close
exists.high = schema.high
exists.low = schema.low
exists.open = schema.open
exists.volume = schema.volume
db.commit()
return exists
#if exists is None, data will be added.
else:
data=model(
id=schema.id,
close= schema.close,
high= schema.high,
low= schema.low,
open= schema.open,
volume= schema.volume,
)
db.add(data)
if commit:
db.commit()
db.refresh(data)
return data