はじめに
とあるOSSのコードを眺めてて、良さそうなのがあったのでそれの紹介をば。
コード
とあるOSSというのは、Apache Airflow。
この中で、airflow/utils/session.py がいい感じでした。
簡単な説明
まずは、 session.py
から。
import contextlib
from functools import wraps
from airflow import settings
# contextlib.contextmanager を指定すると with を使って close を自動でやってくれる
@contextlib.contextmanager
def create_session():
"""
Contextmanager that will create and teardown a session.
"""
session = settings.Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# Sessionを使いたい function で使うといい感じに Session を補完してくれる
def provide_session(func):
"""
Function decorator that provides a session if it isn't provided.
If you want to reuse a session or run the function as part of a
database transaction, you pass it to the function, if not this wrapper
will create one and close it for you.
"""
@wraps(func)
def wrapper(*args, **kwargs):
arg_session = 'session'
func_params = func.__code__.co_varnames
session_in_args = arg_session in func_params and \
func_params.index(arg_session) < len(args)
session_in_kwargs = arg_session in kwargs
# function の引数で session があるならそれを使い、なかったら作るという処理
if session_in_kwargs or session_in_args:
return func(*args, **kwargs)
else:
with create_session() as session:
kwargs[arg_session] = session
return func(*args, **kwargs)
return wrapper
で、次に使う側のコード。
airflow/models/baseoperator.py の 940行目にある
@provide_session
def get_task_instances(self, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
session: Session = None) -> List[TaskInstance]:
"""
Get a set of task instance related to this task for a specific date
range.
"""
end_date = end_date or timezone.utcnow()
return session.query(TaskInstance)\
.filter(TaskInstance.dag_id == self.dag_id)\
.filter(TaskInstance.task_id == self.task_id)\
.filter(TaskInstance.execution_date >= start_date)\
.filter(TaskInstance.execution_date <= end_date)\
.order_by(TaskInstance.execution_date)\
.all()
のように session
を使う function でデコレータとして指定すると
呼び出し元で session
を設定していない場合は新規に作成され(且つ終了時にcloseされる)
session
を設定している場合はそれをそのまま使うといういい感じなことが実現できる。
終わりに
この方法はDB接続以外にも色々応用ができそうだなと思う。