2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Casbinの同期・非同期アダプターを使ったDB永続化の実践

Last updated at Posted at 2024-12-24

記事を書いた人

株式会社マーズフラッグの川嶋です。
今年もアドベントカレンダーにチャレンジ!

この記事について

PythonとCasbinを利用して、ポリシーデータをデータベースに永続化する方法を紹介します。CasbinのDB永続化には同期と非同期のライブラリがあり、それぞれの使い方を実践してみましょう。

象読者

  • 認可ライブラリであるCasbinをつかってみたい人

環境

  • Casbin
  • Python
  • Database (MySQL)

Casbinを使うメリット

  • RBACやABACなど、さまざまな認可手法に対応
  • 多数の言語に対応しており、認可ロジックを共通化しやすい

Python向けのCasbin DB永続化ライブラリ

  • Casbinのポリシーデータを保存する機能部をAdapterで呼びます。
  • 解発言語毎のCasbin Adapterの種類については、Casbin Adapters に掲載されています。
  • PythonのORMとしてはsqlalchemyを利用されている方も多いと思いますので、今回はそれを利用します。
  • 同期、非同期で接続URLが変わってきますので注意が必要です。
Casbin Adapter MySQLドライバ 接続URLの例
同期 casbin-sqlalchemy-adapter pymysql mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}
非同期 casbin-async-sqlalchemy-adapter asyncmy mysql+asyncmy://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}

サンプルコード

  • 今回は、認可判定後にトランザクション処理を考慮して、処理終了後(エラーで終わる判定を含む)に自動でコミットまたは、ロールバックするようPythonのcontextmanagerの機能を利用して実装してみましょう!

ポリシーデータを格納するcasbin_ruleテーブルの作成とサンプルデータ登録

  • テーブルの作成とデータ登録をするツールを以下に載せておきます。
  • ポリシーデータはcasbin_ruleテーブルに格納されます。
  • このテーブルのsqlalchemyモデルは同期、非同期データのそれぞれに既に格納されています。
  • どちらもほぼ同じですが、もし独自の属性する場合は自分でモデルを作成して利用することも可能です。
    同期のCasbinRuleモデル
    非同期のCasbinRuleモデル
"""casbin_ruleテーブルの作成とサンプルデータ登録するツール"""
from __future__ import annotations

import csv
import os
from pathlib import Path

from casbin_sqlalchemy_adapter import CasbinRule
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import Table

load_dotenv()

def import_casbin_rule(lines: list[str]) -> None:
        """casbinのルールをインポートする"""
        database_url: str = os.environ["SYNC_DATABASE_URL"]
        engine = create_engine(database_url)
        sm = sessionmaker(autocommit=False, autoflush=True, bind=engine)
        with sm() as session:

            # 毎回データを作り直し
            Table.drop(CasbinRule.__table__, bind=engine, checkfirst=True)
            Table.create(CasbinRule.__table__, bind=engine, checkfirst=True)

            for cols in lines:
                # 空白が入っている場合があるのでトリムしておく
                trimmed_cols: list[str] = [c.strip() for c in cols]
                if not trimmed_cols or trimmed_cols[0].startswith("#"):
                    # 空行やコメント行はスキップ
                    continue
                if trimmed_cols[0] == "p":
                    # p, alice, data1, read
                    session.add(CasbinRule(ptype=trimmed_cols[0], v0=trimmed_cols[1], v1=trimmed_cols[2], v2=trimmed_cols[3]))
                elif trimmed_cols[0] == "g":
                    # g, alice, admin
                    session.add(CasbinRule(ptype=trimmed_cols[0], v0=trimmed_cols[1], v1=trimmed_cols[2]))
                else:
                    continue
            session.commit()

def import_casbin_rule_from_file(path: Path) -> None:
    """ファイルからcasbinのルールをインポートする"""
    with open(path) as f:
        lines = csv.reader(f)
        import_casbin_rule(lines=lines)

def import_casbin_rule_from_str(content: str) -> None:
    """文字列からcasbinのルールをインポートする"""
    lines: list[str] = content.split("\n")
    import_casbin_rule(lines=lines)

同期処理

"""sqlalchemyのトランザクション管理を行うユーティリティクラス"""

from collections.abc import Generator
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.session import Session as BaseSession

engine = create_engine(
    url="mysql+pymysql://sample_user:sample_sample_password@localhost:53306/sample_db",
    echo=False,
)

Session = scoped_session(sessionmaker(autocommit=False, autoflush=True, bind=engine))


@contextmanager
def session_scope() -> Generator[BaseSession, None, None]:
    session: BaseSession = Session()
    # これがないとセッションが閉じた後にORMモデルが期限切で参照できない
    session.expire_on_commit = False
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

"""同期処理にて認可判定する"""
from logging import getLogger

import casbin
from casbin_sqlalchemy_adapter import Adapter

from app import sync_session_utility

logger = getLogger(__name__)


# # Sampleデータのルールをクリアして登録
# 必要ならコメントアウトして実施してください。
# from pathlib import Path
# from app.utilitty import import_casbin_rule_from_file
# import_casbin_rule_from_file(
#     path=Path("./data/rbac_policy.csv"),
# )

def sync_auth_check() -> bool:
    """同期処理で認可チェックを行う。"""

    # トランザクション処理が不要ならこのWithは不要    
    with sync_session_utility.session_scope() as session:
        engine = session.bind
        adapter = Adapter(engine)
        e = casbin.Enforcer(
            model="./data/rbac_model.conf",
            adapter=adapter,
            enable_log=True,
        )

        # ポリシーのロード
        e.load_policy()
        # e.load_filtered_policy(filter={"v0": "bob"})

        # 認可チェック
        # e.enforceメソッドは同期、非同期ともに同期メソッドだが、get_named_implicit_permissions_for_userのようなRBACなどのそれぞれ違う。
        has_authz: bool = e.enforce("alice", "data1", "read")
        if has_authz is True:
            logger.info("権限あるよー")

            user_all_policies_including_role = e.get_named_implicit_permissions_for_user(ptype="p", user="alice")
            logger.info(user_all_policies_including_role)

            # ここにDBのトランザクション処理があれば記載する。

        else:
            logger.info("権限ないよー")

            # ここにDBのトランザクション処理があれば記載する。

        return has_authz


if __name__ == "__main__":
    sync_auth_check()
    

非同期処理

"""sqlalchemyのトランザクション管理を行うユーティリティクラス"""

import asyncio
import os
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)

load_dotenv()

engine = create_async_engine(
    url=os.environ["ASYNC_DATABASE_URL"],
    echo=False,
)

Session = async_scoped_session(
    async_sessionmaker(
        autocommit=False,
        autoflush=True,
        bind=engine,
        # これがないとセッションが閉じた後にORMモデルが期限切で参照できない
        expire_on_commit=False,
    ),
    scopefunc=asyncio.current_task,
)


@asynccontextmanager
async def session_scope() -> AsyncGenerator[AsyncSession, None]:
    session: AsyncSession = Session()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

"""非同期処理にて認可判定する"""

import asyncio
from logging import getLogger

import casbin
from casbin_async_sqlalchemy_adapter import Adapter

from app import async_session_utility

logger = getLogger(__name__)


# # Sampleデータのルールをクリアして登録
# 必要ならコメントアウトして実施してください。
# from pathlib import Path
# from app.utilitty import import_casbin_rule_from_file
# import_casbin_rule_from_file(
#     path=Path("./data/rbac_policy.csv"),
# )

async def async_auth_check() -> bool:
    """非同期処理で認可チェックを行う"""

    # トランザクション処理が不要ならこのWithは不要
    async with async_session_utility.session_scope() as session:
        engine = session.bind
        adapter = Adapter(engine)
        e: casbin.AsyncEnforcer = casbin.AsyncEnforcer(
            model="./data/rbac_model.conf",
            adapter=adapter,
            enable_log=True,
        )

        # ポリシーのロード
        await e.load_policy()
        # await e.load_filtered_policy(filter={"v0": "bob"}

        # 認可チェック
        # e.enforceメソッドは同期、非同期ともに同期メソッドだが、get_named_implicit_permissions_for_userのようなRBACなどのそれぞれ違う。
        has_authz: bool = e.enforce("alice", "data1", "read")
        if has_authz is True:
            logger.info("権限あるよー")

            user_all_policies_including_role = await e.get_named_implicit_permissions_for_user(ptype="p", user="alice")
            logger.info(user_all_policies_including_role)

            # ここにDBのトランザクション処理があれば記載する。

        else:
            logger.info("権限ないよー")

            # ここにDBのトランザクション処理があれば記載する。

        return has_authz

if __name__ == "__main__":
    asyncio.run(async_auth_check())

まとめ

非同期処理にてcotextmanagerを使ったコーディングに少々ハマったので今回よい勉強になりました。ASGIが利用可能なアプリケーションサーバーでは、非同期にて実装することが多いと思います。次回ばcasbin_ruleテーブルを独自に拡張する方法をやってみたいと思います。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?