個人的な備忘録メモです。
プロジェクト内で、DBで取得したデータってメモリないにキャッシュしておきたいなと思うことありませんか?
でも、DBの記録されているデータって常時変わる可能性があるし、いちいちサービスを止めれない。
じゃ、redisか、いやlru_cacheをうまく活用できないかと上司との擦った揉んだ(私は聞いていた)の上、
以下のページでどうやらできるらしい。
一番参考になったのは、ほぼろさんの記事ですね。ほぼろさんの記事ではstackoverflowの内容を保管してくれていてすごく勉強になりました。
それらを踏まえて完成したのは以下のコード
import functools
from datetime import datetime, timedelta
from threading import Lock
def timed_lru_cache(seconds=300, maxsize=128, typed=False):
"""
時間制約付きlru_cache
デフォルト300秒後、再キャッシュする。
"""
def wrapper_cache(func):
func = functools.lru_cache(maxsize=maxsize, typed=typed)(func)
func.ttl = timedelta(seconds=seconds)
func.expiration = datetime.utcnow() + func.ttl
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
with Lock():
if datetime.utcnow() >= func.expiration:
func.cache_clear()
func.expiration = datetime.utcnow() + func.ttl
return func(*args, **kwargs)
wrapped_func.clear_cache = func.cache_clear
wrapped_func.cache_info = func.cache_info
return wrapped_func
return wrapper_cache
#以下使用例
@timed_lru_cache(secconds=10)
def hoge():
print('hoge')
return True
>>>_hoge = hoge()
>>>hoge
>>>_hoge = hoge()
>>>_hoge = hoge()
>>>_hoge = hoge()
>>>_hoge = hoge()
>>>hoge
こんな感じで使えるようです。
2年経ってるのにまだまだ未熟者だなと痛感しました。