記事を書いた人
株式会社マーズフラッグの川嶋です。
今年もアドベントカレンダーにチャレンジ!
この記事について
PythonとCasbinを利用して、ポリシーデータをデータベースに永続化する方法を紹介します。CasbinのDB永続化には同期と非同期のライブラリがあり、それぞれの使い方を実践してみましょう。
象読者
- 認可ライブラリであるCasbinをつかってみたい人
環境
- Casbin
- Python
- Database (MySQL)
Casbinを使うメリット
- RBACやABACなど、さまざまな認可手法に対応
- 多数の言語に対応しており、認可ロジックを共通化しやすい
Python向けのCasbin DB永続化ライブラリ
- Casbinのポリシーデータを保存する機能部をAdapterで呼びます。
- 解発言語毎のCasbin Adapterの種類については、Casbin Adapters に掲載されています。
- PythonのORMとしてはsqlalchemyを利用されている方も多いと思いますので、今回はそれを利用します。
- 同期、非同期で接続URLが変わってきますので注意が必要です。
Casbin Adapter | MySQLドライバ | 接続URLの例 | |
---|---|---|---|
同期 | casbin-sqlalchemy-adapter | pymysql | mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name} |
非同期 | casbin-async-sqlalchemy-adapter | asyncmy | mysql+asyncmy://{db_user}:{db_password}@{db_host}:{db_port}/{db_name} |
サンプルコード
- 今回は、認可判定後にトランザクション処理を考慮して、処理終了後(エラーで終わる判定を含む)に自動でコミットまたは、ロールバックするようPythonのcontextmanagerの機能を利用して実装してみましょう!
ポリシーデータを格納するcasbin_ruleテーブルの作成とサンプルデータ登録
- テーブルの作成とデータ登録をするツールを以下に載せておきます。
- ポリシーデータはcasbin_ruleテーブルに格納されます。
- このテーブルのsqlalchemyモデルは同期、非同期データのそれぞれに既に格納されています。
- どちらもほぼ同じですが、もし独自の属性する場合は自分でモデルを作成して利用することも可能です。
同期のCasbinRuleモデル
非同期のCasbinRuleモデル
"""casbin_ruleテーブルの作成とサンプルデータ登録するツール"""
from __future__ import annotations
import csv
import os
from pathlib import Path
from casbin_sqlalchemy_adapter import CasbinRule
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import Table
load_dotenv()
def import_casbin_rule(lines: list[str]) -> None:
"""casbinのルールをインポートする"""
database_url: str = os.environ["SYNC_DATABASE_URL"]
engine = create_engine(database_url)
sm = sessionmaker(autocommit=False, autoflush=True, bind=engine)
with sm() as session:
# 毎回データを作り直し
Table.drop(CasbinRule.__table__, bind=engine, checkfirst=True)
Table.create(CasbinRule.__table__, bind=engine, checkfirst=True)
for cols in lines:
# 空白が入っている場合があるのでトリムしておく
trimmed_cols: list[str] = [c.strip() for c in cols]
if not trimmed_cols or trimmed_cols[0].startswith("#"):
# 空行やコメント行はスキップ
continue
if trimmed_cols[0] == "p":
# p, alice, data1, read
session.add(CasbinRule(ptype=trimmed_cols[0], v0=trimmed_cols[1], v1=trimmed_cols[2], v2=trimmed_cols[3]))
elif trimmed_cols[0] == "g":
# g, alice, admin
session.add(CasbinRule(ptype=trimmed_cols[0], v0=trimmed_cols[1], v1=trimmed_cols[2]))
else:
continue
session.commit()
def import_casbin_rule_from_file(path: Path) -> None:
"""ファイルからcasbinのルールをインポートする"""
with open(path) as f:
lines = csv.reader(f)
import_casbin_rule(lines=lines)
def import_casbin_rule_from_str(content: str) -> None:
"""文字列からcasbinのルールをインポートする"""
lines: list[str] = content.split("\n")
import_casbin_rule(lines=lines)
同期処理
"""sqlalchemyのトランザクション管理を行うユーティリティクラス"""
from collections.abc import Generator
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.session import Session as BaseSession
engine = create_engine(
url="mysql+pymysql://sample_user:sample_sample_password@localhost:53306/sample_db",
echo=False,
)
Session = scoped_session(sessionmaker(autocommit=False, autoflush=True, bind=engine))
@contextmanager
def session_scope() -> Generator[BaseSession, None, None]:
session: BaseSession = Session()
# これがないとセッションが閉じた後にORMモデルが期限切で参照できない
session.expire_on_commit = False
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
"""同期処理にて認可判定する"""
from logging import getLogger
import casbin
from casbin_sqlalchemy_adapter import Adapter
from app import sync_session_utility
logger = getLogger(__name__)
# # Sampleデータのルールをクリアして登録
# 必要ならコメントアウトして実施してください。
# from pathlib import Path
# from app.utilitty import import_casbin_rule_from_file
# import_casbin_rule_from_file(
# path=Path("./data/rbac_policy.csv"),
# )
def sync_auth_check() -> bool:
"""同期処理で認可チェックを行う。"""
# トランザクション処理が不要ならこのWithは不要
with sync_session_utility.session_scope() as session:
engine = session.bind
adapter = Adapter(engine)
e = casbin.Enforcer(
model="./data/rbac_model.conf",
adapter=adapter,
enable_log=True,
)
# ポリシーのロード
e.load_policy()
# e.load_filtered_policy(filter={"v0": "bob"})
# 認可チェック
# e.enforceメソッドは同期、非同期ともに同期メソッドだが、get_named_implicit_permissions_for_userのようなRBACなどのそれぞれ違う。
has_authz: bool = e.enforce("alice", "data1", "read")
if has_authz is True:
logger.info("権限あるよー")
user_all_policies_including_role = e.get_named_implicit_permissions_for_user(ptype="p", user="alice")
logger.info(user_all_policies_including_role)
# ここにDBのトランザクション処理があれば記載する。
else:
logger.info("権限ないよー")
# ここにDBのトランザクション処理があれば記載する。
return has_authz
if __name__ == "__main__":
sync_auth_check()
非同期処理
"""sqlalchemyのトランザクション管理を行うユーティリティクラス"""
import asyncio
import os
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_scoped_session,
async_sessionmaker,
create_async_engine,
)
load_dotenv()
engine = create_async_engine(
url=os.environ["ASYNC_DATABASE_URL"],
echo=False,
)
Session = async_scoped_session(
async_sessionmaker(
autocommit=False,
autoflush=True,
bind=engine,
# これがないとセッションが閉じた後にORMモデルが期限切で参照できない
expire_on_commit=False,
),
scopefunc=asyncio.current_task,
)
@asynccontextmanager
async def session_scope() -> AsyncGenerator[AsyncSession, None]:
session: AsyncSession = Session()
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
"""非同期処理にて認可判定する"""
import asyncio
from logging import getLogger
import casbin
from casbin_async_sqlalchemy_adapter import Adapter
from app import async_session_utility
logger = getLogger(__name__)
# # Sampleデータのルールをクリアして登録
# 必要ならコメントアウトして実施してください。
# from pathlib import Path
# from app.utilitty import import_casbin_rule_from_file
# import_casbin_rule_from_file(
# path=Path("./data/rbac_policy.csv"),
# )
async def async_auth_check() -> bool:
"""非同期処理で認可チェックを行う"""
# トランザクション処理が不要ならこのWithは不要
async with async_session_utility.session_scope() as session:
engine = session.bind
adapter = Adapter(engine)
e: casbin.AsyncEnforcer = casbin.AsyncEnforcer(
model="./data/rbac_model.conf",
adapter=adapter,
enable_log=True,
)
# ポリシーのロード
await e.load_policy()
# await e.load_filtered_policy(filter={"v0": "bob"}
# 認可チェック
# e.enforceメソッドは同期、非同期ともに同期メソッドだが、get_named_implicit_permissions_for_userのようなRBACなどのそれぞれ違う。
has_authz: bool = e.enforce("alice", "data1", "read")
if has_authz is True:
logger.info("権限あるよー")
user_all_policies_including_role = await e.get_named_implicit_permissions_for_user(ptype="p", user="alice")
logger.info(user_all_policies_including_role)
# ここにDBのトランザクション処理があれば記載する。
else:
logger.info("権限ないよー")
# ここにDBのトランザクション処理があれば記載する。
return has_authz
if __name__ == "__main__":
asyncio.run(async_auth_check())
まとめ
非同期処理にてcotextmanagerを使ったコーディングに少々ハマったので今回よい勉強になりました。ASGIが利用可能なアプリケーションサーバーでは、非同期にて実装することが多いと思います。次回ばcasbin_ruleテーブルを独自に拡張する方法をやってみたいと思います。