#はじめに
PythonとGmail APIを使用して、受信トレイに保存されている対象のメールの件名と本文を取得して、ファイルに保存していきます。
#前提条件
・Google Cloud Platformプロジェクトを作成し、Gmail APIを有効にする。
・Gmail APIを利用するための認証情報の取得。
以下の記事が参考になります。
https://qiita.com/muuuuuwa/items/822c6cffedb9b3c27e21
#実行環境
Python 3.9
必要なライブラリ
・google-api-python-client
・google-auth-httplib2
・google-auth-oauthlib
#ソースコード
今回使用したコードはGitHubよりダウンロードできます。
https://github.com/kirinnsan/backup-gmail
Dockerfileも併せてアップロードしているので、Dockerが使える場合は、pip install でライブラリをインストールする必要はありません。
##認証
作成した認証情報はclient_id.jsonというファイル名で同じディレクトリに配置します。
認証フローは、InstalledAppFlowクラスで実装されており、ユーザが指示された認証URLを開き、認証コードを取得してコンソールに貼り付けるrun_consoleと、ウェブサーバを用いて認証を行うrun_local_serverの2種類のメソッドが用意されています。
今回は、run_consoleメソッドを用いて認証を行っています。
初回の認証に成功すると、アクセストークンと更新トークンが格納されたtoken.pickleがディレクトリ内に作成されます。以降はこちらを使用して認証が行われます。
import pickle
import os.path
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.auth.exceptions import GoogleAuthError
def authenticate(scope):
creds = None
# The file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
try:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'client_id.json', scope)
creds = flow.run_console()
except GoogleAuthError as err:
print(f'action=authenticate error={err}')
raise
# Save the credentials for the next run
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
return creds
##Google APIを使用したメールの取得
Gmail APIを使用して、受信トレイのメールのリストを取得するメソッドと対象メールの件名と本文を取得するメソッドを実装しています。
メールのリストには、最大取得件数や検索条件を指定することができます。
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import util
class ApiClient(object):
def __init__(self, credential):
self.service = build('gmail', 'v1', credentials=credential)
def get_mail_list(self, limit, query):
# Call the Gmail API
try:
results = self.service.users().messages().list(
userId='me', maxResults=limit, q=query).execute()
except HttpError as err:
print(f'action=get_mail_list error={err}')
raise
messages = results.get('messages', [])
return messages
def get_subject_message(self, id):
# Call the Gmail API
try:
res = self.service.users().messages().get(userId='me', id=id).execute()
except HttpError as err:
print(f'action=get_message error={err}')
raise
result = {}
subject = [d.get('value') for d in res['payload']['headers'] if d.get('name') == 'Subject'][0]
result['subject'] = subject
# Such as text/plain
if 'data' in res['payload']['body']:
b64_message = res['payload']['body']['data']
# Such as text/html
elif res['payload']['parts'] is not None:
b64_message = res['payload']['parts'][0]['body']['data']
message = util.base64_decode(b64_message)
result['message'] = message
return result
以下は、base64 エンコーディングされた本文をデコードする処理と取得したメッセージを保存する処理になります。ファイルは、指定したディレクトリにメールの件名.txtの形で保存されます。
import base64
import os
def base64_decode(b64_message):
message = base64.urlsafe_b64decode(
b64_message + '=' * (-len(b64_message) % 4)).decode(encoding='utf-8')
return message
def save_file(base_dir, result):
os.makedirs(base_dir, exist_ok=True)
file_name = base_dir + '/' + result['subject'] + '.txt'
with open(file_name, mode='w') as f:
f.write(result['message'])
##メイン処理
以下が、実行部分のソースコードになります。処理の流れとしては、
- 認証
- 最大取得件数と検索条件をもとにメールのリストを取得
- 各メールのIDから対象メールの件名と本文を取得
-
件名.txtの形でファイルに保存
になります。
from __future__ import print_function
import auth
from client import ApiClient
import util
# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
# Number of emails retrieved
MAIL_COUNTS = 5
# Search criteria
SEARCH_CRITERIA = {
'from': "test@gmail.com",
'to': "",
'subject': "メールの件名"
}
BASE_DIR = 'mail_box'
def build_search_criteria(query_dict):
query_string = ''
for key, value in query_dict.items():
if value:
query_string += key + ':' + value + ' '
return query_string
def main():
creds = auth.authenticate(SCOPES)
query = build_search_criteria(SEARCH_CRITERIA)
client = ApiClient(creds)
messages = client.get_mail_list(MAIL_COUNTS, query)
if not messages:
print('No message list.')
else:
for message in messages:
message_id = message['id']
# get subject and message
result = client.get_subject_message(message_id)
# save file
util.save_file(BASE_DIR, result)
if __name__ == '__main__':
main()
今回は、最大取得件数を5件、受信者がtest@gmail.com、件名がメールの件名を検索条件にしています。受信者を指定する場合は、from:test@gmail.comやfrom:花子の形で設定します。件名の場合はsubject:件名の形で設定します。
以下の公式ページにGmailで使用可能な条件や使い方が記載されています。
https://support.google.com/mail/answer/7190
取得したメールは、mail_boxディレクトリ内に保存されるようにしています。
アプリを実行します。
python3 main.py
実行すると以下のように、コンソールから認証URLを開くよう指示あるのでURLを開きます。
URLを開くと、以下の画面になるので、詳細→安全でないページに移動をクリックします。
コードが表示されるので、コピーして、コンソールのEnter the authorization codeの部分に貼り付けます。
認証が成功すれば、後続処理が実行され、メールが保存されます。
#最後に
公式リファレンスに、Pythonを使用した場合のアプリのサンプルが載っているので、こちらも参考にするとよいです。
https://developers.google.com/gmail/api/quickstart/python
検索条件を指定して対象のメールを保存することができました。今回は、受信したメールのタイトルと本文を保存するだけでしたが、Gmail APIには、他にも様々なAPIが用意されているので、方法次第で色々できそうです。