5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

PythonでのDB接続管理の良さそうな方法の紹介

Last updated at Posted at 2020-03-19

はじめに

とあるOSSのコードを眺めてて、良さそうなのがあったのでそれの紹介をば。

コード

とあるOSSというのは、Apache Airflow
この中で、airflow/utils/session.py がいい感じでした。

簡単な説明

まずは、 session.py から。

import contextlib
from functools import wraps

from airflow import settings

# contextlib.contextmanager を指定すると with を使って close を自動でやってくれる
@contextlib.contextmanager
def create_session():
    """
    Contextmanager that will create and teardown a session.
    """
    session = settings.Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Sessionを使いたい function で使うといい感じに Session を補完してくれる
def provide_session(func):
    """
    Function decorator that provides a session if it isn't provided.
    If you want to reuse a session or run the function as part of a
    database transaction, you pass it to the function, if not this wrapper
    will create one and close it for you.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        arg_session = 'session'

        func_params = func.__code__.co_varnames
        session_in_args = arg_session in func_params and \
            func_params.index(arg_session) < len(args)
        session_in_kwargs = arg_session in kwargs
        # function の引数で session があるならそれを使い、なかったら作るという処理
        if session_in_kwargs or session_in_args:
            return func(*args, **kwargs)
        else:
            with create_session() as session:
                kwargs[arg_session] = session
                return func(*args, **kwargs)

    return wrapper

で、次に使う側のコード。
airflow/models/baseoperator.py940行目にある

@provide_session
def get_task_instances(self, start_date: Optional[datetime] = None,
                        end_date: Optional[datetime] = None,
                        session: Session = None) -> List[TaskInstance]:
    """
    Get a set of task instance related to this task for a specific date
    range.
    """
    end_date = end_date or timezone.utcnow()
    return session.query(TaskInstance)\
        .filter(TaskInstance.dag_id == self.dag_id)\
        .filter(TaskInstance.task_id == self.task_id)\
        .filter(TaskInstance.execution_date >= start_date)\
        .filter(TaskInstance.execution_date <= end_date)\
        .order_by(TaskInstance.execution_date)\
        .all()

のように session を使う function でデコレータとして指定すると
呼び出し元で session を設定していない場合は新規に作成され(且つ終了時にcloseされる)
session を設定している場合はそれをそのまま使うといういい感じなことが実現できる。

終わりに

この方法はDB接続以外にも色々応用ができそうだなと思う。

参考

5
4
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?