備忘録✍️
redis clientを独立したファイルで定義して、どこでもimportで呼び出して使えるように。
redis_client.py
# redis_client.py
import os
from redis import Redis
from dotenv import load_dotenv
load_dotenv()
# redisの接続設定
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
環境変数
# redis
REDIS_HOST=redis
REDIS_PORT=6379
呼び出し先でやること(変数等は開発当時まま)
# redis clientのimport
from app.services.redis_client import redis_client
# ...他コード省略...
def get_daily_report(slack_user_id: str, start_date:date, end_date:date):
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
logger.debug(f"start_data: {start_date} => start: {start_datetime}")
logger.debug(f"end_date: {end_date} => end: {end_datetime}")
# キャッシュキーの生成
cache_key = f"daily_report:{slack_user_id}:{start_date}:{end_date}"
# キャッシュからデータを取得
cached_report = redis_client.get(cache_key)
if cached_report:
logger.debug("◆キャッシュから日報データを取得しました。")
return cached_report
# データベースから指定したユーザーの指定期間分の日報データを取得する
db = get_db()
try:
target_daily_report = db.query(DailyReport).filter(
and_(
DailyReport.slack_user_id == slack_user_id,
DailyReport.created_at >= start_datetime,
DailyReport.created_at <= end_datetime
)
).all()
logger.debug("◆DBから正常に日報データを取得できました。")
response = compile_daily_report_data(target_daily_report)
# キャッシュに保存(有効期限12時間)
redis_client.set(cache_key, response, ex=43200)
return response
except Exception as e:
logger.error(f"◆daily_reportの取得中にエラーが発生しました。: {e}")
return[]
finally:
db.close()