バックオフィスが生成AIでがんばるシリーズです。今回はプライベートで利用しているakippaの予約情報取得からGoogleカレンダー転記の自動化についてやっていきます。
1回目はこちら
Lambdaにレイヤーを準備する
今回GoogleカレンダーAPIを使用するのにあたり、Google系のライブラリをレイヤーで使用します。
ちなみに私LambdaでPythonを使用する際にライブラリはレイヤー機能で使用するというのを初めて知りました。
Windows11でPowershellを使用してレイヤーに使用するzipファイルを作成していきます。
- 作業用ディレクトリを作成します
mkdir lambda_project cd lambda_project
- 仮想環境を作成して有効化する
python -m venv venv venv\Scripts\activate
- 必要なライブラリをインストールする
pip install google-auth google-auth-oauthlib google-auth-httplib2 google-api-python-client -t .
- pythonフォルダをzipに圧縮します。任意のファイル名でOKです
- Lambdaのメニューからレイヤーを選択します
- レイヤーの作成から、レイヤー名称は任意、zipファイルは先ほど4.で作成したファイルをアップロード。ランタイムはPythonを選択して作成ボタンを押下します。
- 作成中の関数に戻り、コードタブの最下段「レイヤー」で「レイヤーの追加」から先ほど作成したレイヤーを選択します。
これで、関数からGoogleの認証やカレンダーAPIが呼び出せるようになります。
コードを記述する
こちらはいつものとおりChatGPT 4o で作成してもらいました。毎度のことですが、紆余曲折あってすんなりとは動きません。が、完成したのが以下のコードとなります。
import json
import os
import csv
import urllib.request
import urllib.parse
from datetime import datetime, timedelta
from http.cookiejar import CookieJar
from google.oauth2 import service_account
from googleapiclient.discovery import build
import re
import uuid
def lambda_handler(event, context):
try:
print("Starting the lambda function")
# 環境変数からユーザー名とパスワード、カレンダーIDを取得
username = os.environ['username']
password = os.environ['pass']
calendar_id = os.environ['calendar_id']
# 環境変数からJSONキーを取得
credentials_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS_JSON']
print("Credentials JSON obtained")
# JSONキーのデコード
try:
credentials_dict = json.loads(credentials_json)
print("Credentials JSON decoded")
except Exception as decode_error:
return {
'statusCode': 500,
'body': f"JSONデコードエラー: {str(decode_error)}"
}
# 認証情報の作成
try:
credentials = service_account.Credentials.from_service_account_info(credentials_dict)
print("Credentials created")
except Exception as credentials_error:
return {
'statusCode': 500,
'body': f"認証情報作成エラー: {str(credentials_error)}"
}
# GoogleカレンダーAPIの呼び出し
try:
service = build('calendar', 'v3', credentials=credentials)
print("GoogleカレンダーAPI client created")
except Exception as service_error:
return {
'statusCode': 500,
'body': f"GoogleカレンダーAPI呼び出しエラー: {str(service_error)}"
}
# 日付の設定
from_date = datetime.now().strftime('%Y-%m-%d')
to_date = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d')
# ログイン情報とURL
login_url = 'https://owner.akippa.com/auth/login'
download_url = 'https://owner.akippa.com/reservation/download'
# クッキーを保持するためのオブジェクトを作成
cj = CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
# ログインページにアクセスしてCSRFトークンを取得
login_page_response = opener.open(login_url)
login_page_content = login_page_response.read().decode('utf-8')
csrf_token_match = re.search(r'name="csrf-token" content="(.+?)"', login_page_content)
if not csrf_token_match:
return {
'statusCode': 500,
'body': 'CSRFトークンの取得に失敗しました'
}
csrf_token = csrf_token_match.group(1)
print(f"CSRF token obtained: {csrf_token}")
# ログインデータをエンコード
login_data = urllib.parse.urlencode({
'email': username,
'password': password,
'_token': csrf_token
}).encode('utf-8')
# ログインリクエストを作成
login_request = urllib.request.Request(login_url, data=login_data)
login_response = opener.open(login_request)
# ログインが成功したかチェック
if login_response.getcode() == 200:
print("Logged in successfully")
# CSVデータをダウンロード
query_params = urllib.parse.urlencode({
'keyword': '',
'organization_path': '/86457/',
'parking_id': '',
'from': from_date,
'to': to_date
})
csv_url = f"{download_url}?{query_params}"
csv_request = urllib.request.Request(csv_url, headers={'Referer': login_url})
csv_response = opener.open(csv_request)
if csv_response.getcode() == 200:
print("CSV data downloaded")
# CSVデータのパースとフィルタリング
csv_content = csv_response.read().decode('shift_jis', errors='ignore') # Shift-JISを使用し、エラーを無視
csv_reader = csv.reader(csv_content.splitlines(), delimiter=',')
next(csv_reader) # ヘッダーをスキップ
events = []
for i, row in enumerate(csv_reader):
print(f"Processing row {i}: {row}")
if len(row) < 16: # カラム数をチェック
print(f"Row {i} is invalid: {row}")
continue
# キャンセルされた予約の処理
if row[3] != '-':
title_prefix = 'キャンセル:'
else:
title_prefix = ''
# 利用日時のパース
use_dates = row[2].split('〜')
if len(use_dates) != 2:
print(f"Row {i} has invalid date format: {row[2]}")
continue
start_date = parse_date(use_dates[0])
end_date = parse_date(use_dates[1])
print(f"Start date parsed: {start_date}")
print(f"End date parsed: {end_date}")
# ユニークなIDを生成(base32hexエンコード使用)
unique_id = uuid.uuid4().hex
print(f"Generated unique ID: {unique_id}")
events.append({
'summary': f"{title_prefix}{row[10]}", # 車種情報
'description': f"予約ID: {row[8]}, 開始日時: {start_date}, 車種: {row[9]}, 車種情報: {row[10]}, ナンバー: {row[11]}",
'start': {'dateTime': start_date, 'timeZone': 'Asia/Tokyo'},
'end': {'dateTime': end_date, 'timeZone': 'Asia/Tokyo'},
'id': unique_id # ユニークなIDを使用
})
# Googleカレンダーにイベントを追加
add_events_to_calendar(events, calendar_id, credentials_dict)
return {
'statusCode': 200,
'body': 'CSVファイルを処理し、Googleカレンダーにイベントを追加しました'
}
else:
return {
'statusCode': 500,
'body': 'CSVデータの取得に失敗しました'
}
else:
return {
'statusCode': 500,
'body': 'ログインに失敗しました'
}
except Exception as e:
print(f"Unknown error: {str(e)}")
return {
'statusCode': 500,
'body': str(e)
}
def parse_date(date_str):
# 日付フォーマットのパース
date_str = re.sub(r'(.*?)', ' ', date_str) # 曜日の部分を半角スペースに置き換え
date_str = re.sub(r'(\d{4}/\d{1,2}/\d{1,2})(\d{1,2}:\d{2})', r'\1 \2', date_str) # 日付と時間の間にスペースを追加
return datetime.strptime(date_str, '%Y/%m/%d %H:%M').isoformat() + '+09:00'
def add_events_to_calendar(events, calendar_id, credentials_dict):
# GoogleカレンダーAPIの認証
credentials = service_account.Credentials.from_service_account_info(credentials_dict, scopes=['https://www.googleapis.com/auth/calendar'])
service = build('calendar', 'v3', credentials=credentials)
for event in events:
try:
print(f"Processing event ID: {event['id']}")
# 予約IDを含むイベントを検索
query = f"予約ID: {event['description'].split(',')[0].split(': ')[1]}"
existing_events = service.events().list(calendarId=calendar_id, q=query).execute().get('items', [])
target_event = None
for existing_event in existing_events:
event_start_datetime = existing_event['start']['dateTime']
if datetime.fromisoformat(event_start_datetime) == datetime.fromisoformat(event['start']['dateTime']):
target_event = existing_event
break
if target_event:
print(f"Found existing event: {target_event}")
if 'キャンセル' in event['summary']:
# キャンセルを反映
target_event['summary'] = event['summary']
service.events().update(calendarId=calendar_id, eventId=target_event['id'], body=target_event).execute()
print(f"Updated event {target_event['id']} to cancelled.")
else:
# 新しいイベントを作成
service.events().insert(calendarId=calendar_id, body=event).execute()
print(f"Created new event {event['id']}")
except Exception as e:
print(f"Failed to process event {event['id']}: {str(e)}")
コードをコピペし、テストを準備しましょう
実際に予約があってCSVがダウンロードできる状態でないとテストは難しいかと思います。
akippaのCSVのおかしな仕様
さて、このプロジェクトを進めていくなかで、akippaの予約一覧CSVには理解しがたい仕様があることに気づきました。
複数予約を一度に取得した際には、各レコードの利用日時がおかしくなる
akippaでは一度に複数日の予約が可能で、例えばa日とb日とc日を1回の手続きで予約できます。
この場合レコードはa日、b日、c日の3レコードできるのですが、それぞれの利用日時の開始日時はa日の開始日時で、終了時刻がc日の終了時刻となるんです。簡単に示すと以下のような形になります。
利用日,利用日時
a日, a日開始時刻~c日終了時刻
b日, a日開始時刻~c日終了時刻
c日, a日開始時刻~c日終了時刻
利用日時のカラム、なんなんだこの仕様は…。利用日はわかるものの、開始時刻と終了時刻は全然意味のない情報になってます。本来であれば以下のようになっているべきです。
利用日,利用日時
a日, a日開始時刻~a日終了時刻
b日, b日開始時刻~b日終了時刻
c日, c日開始時刻~c日終了時刻
サポートに確認しましたが、これは仕様のようです。
ですので、今回のコードではこの仕様はそのまま処理され、a日開始時刻からc日終了時刻までの長い予定が作成されてしまいます。この点についてはCSVファイル内の情報だけでは解決ができず、Webでスクレイピングしないかぎりは把握が難しいため、このままとしています。
akippa側のCSV仕様がきちんと修正されることを願います。
予約通知メールでLambda関数を起動させる
さてこのLambda関数、何をトリガーに実行させるかですが、最初は5分ごとなどで定期実行していました。Lambdaは100万回まで無料ですし、それでも構わないのですが、予約メールが来たときだけ実行することもできます。
そちらは拙記事の「GmailトリガーでAWSのLambdaを起動する」をご覧ください。
現在私のほうではGASが5分ごとに定期実行され、予約メールを発見した場合にLambdaが実行されてGoogleカレンダーに転記されるようになっています。