#概要
imap4libを用いてGmailからpush通知でメッセージを引っ張ってくるものを作りました。
#環境
- python3.5.2
- Cent OS 7
#まずはGmailに接続してみる
最初にGmail側でIMAP接続を有効にしなくてはなりません。
Gmailのヘルプにドキュメントがありますので、こちらを参照してください。
https://support.google.com/mail/answer/7126229?hl=ja
準備が終わったら接続してみましょう
import imaplib
def connect_with_server():
imap_server = 'imap.gmail.com'
imap_port = '993'
imap_user = 'Username' #gmailのメールアドレス(e.g. なんとか@gmail.com)
imap_pass = 'Password' #gmailのログインパスワード
try:
imap = imaplib.IMAP4_SSL(imap_server,imap_port)
imap.login(imap_user,imap_pass)
except imaplib.IMAP4_SSL.error as e:
print(e)
return False
else:
return imap
これでconnect_with_server()
を呼び出すことで接続できるようになります。
補足
b'[ALERT] Please log in via your web browser: https://support.google.com/mail/accounts/answer/78754 (Failure)'
というエラーを吐く場合は以下を参照してみてください。
http://opendata.jp.net/?p=8498
#プッシュを受信する
プッシュは実際どうなっているのか
以下に簡単に流れを説明します。
1.プッシュはIDLE
コマンドをサーバーに送信するとスタートします。
2.IDLE
を受け取ったサーバーはクライアントに向けてメッセージを送ります。
3.Listenしているクライアントがメッセージに合わせて処理を行います。
4.DONE
を送信するとサーバーはIDLE
を終了します。
IDLE
後にサーバーがクライアントにメッセージを送信するタイミングは、何らかのアップデートがメールボックスに発生したときです。
つまり、新着メール到着時や他のクライアントによるEXPUNGE
コマンド時...etcとなります。
また、DONE
が送られるまでサーバーはクライアントからの要求に応答しません。
また、Listen中にタイムアウトになることがあるので、29分ごとに再度接続しなおすことが必要です。
プッシュ通信の例
以下は1番目の参考URLからの引用です。
Example: (前略)
C: A002 IDLE
S: + idling
...time passes; new mail arrives...
S: * 4 EXISTS
C: DONE
S: A002 OK IDLE terminated
...another client expunges message 2 now...
C: A003 FETCH 4 ALL
S: * 4 FETCH (...)
S: A003 OK FETCH completed
C: A004 IDLE
S: * 2 EXPUNGE
S: * 3 EXISTS
S: + idling
(以下略)
とりあえず
-
IDLE
を送信する - メッセージを待って、メッセージで条件分岐
- 有効なメッセージを受信したら
DONE
を送信してIDLE
を閉じる - その後にゴニョゴニョする
でよいのではないでしょうか
実装してみる
実際にプッシュを受信します。
imap = connect_with_server()
imap.select('inbox')
imap_idletag = imap._new_tag()
imap.send(b'%s IDLE\r\n'%(imap_idletag))
print('Waiting for a message...')
while True:
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('* BYE ') or (len(imap_line) == 0):
print('Jumping out of a loop.')
flag = False
break
if imap_line.endswith('EXISTS'):
print('You got a message.')
imap.send(b'DONE\r\n')
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('{} OK'.format(imap_idletag.decode('utf-8'))):
print('Terminating IDLE mode')
flag = True
else :
print('Failed to terminate')
flag = False
break
if flag == True:
#some codes...
else:
#some codes...
受信してみる
##厄介なエンコード関係
MIMEヘッダーから本文をデコードしてくれる関数を用意してみました。
Content-Type:text/plain
のみデコードし、後は無視します。
import email
import base64
import quopri
def get_info_from_header(headers):
mail_transfer_encoding = ''
mail_encoding = ''
for headinfo in headers:
if headinfo[0] == 'Content-Type':
charset = headinfo[1].split(';')[1]
if 'charset="' in charset:
mail_encoding = charset.split('charset="')[1]
mail_encoding = mail_encoding[:-1]
elif 'charset=' in charset:
mail_encoding = charset.split('charset=')[1]
else:
continue
elif headinfo[0] == 'Content-Transfer-Encoding':
mail_transfer_encoding = headinfo[1]
else:
continue
return mail_transfer_encoding , mail_encoding
def mail_to_txt(mail):
mail_transfer_encoding = ''
mail_encoding = 'ISO-2022-JP'
mail_body = ''
mail_data = email.message_from_string(mail)
if mail_data.is_multipart():
for payload in mail_data.get_payload():
if payload.get_content_type() == 'text/plain':
headers = payload._headers
mail_transfer_encoding,mail_encoding = get_info_from_header(headers)
mail_body = payload.get_payload()
else:
if mail_data.get_content_type() == 'text/plain':
mail_body = mail_data.get_payload()
headers = mail_data._headers
mail_transfer_encoding,mail_encoding = get_info_from_header(headers)
if mail_transfer_encoding == 'base64':
mail_body = base64.urlsafe_b64decode(mail_body.encode('ASCII'))
elif mail_transfer_encoding == 'quoted-printable':
mail_body = quopri.decodestring(mail_body.encode('ASCII'))
else:
mail_body = mail_body.encode('utf-8')
mail_body = mail_body.decode(mail_encoding)
return mail_body
あとはmail_to_txt()
にFetchで取得した後utf-8で変換されたデータを投げれば、おそらく読めるボディーが吐き出されるはずです。
#全体例
def main_routine():
text_list=[]
imap = connect_with_server()
imap.select('inbox')
imap_idletag = imap._new_tag()
imap.send(b'%s IDLE\r\n'%(imap_idletag))
print('Waiting for a message...')
while True:
imap_line = imap.readline().strip().decode('utf-8');
print(imap_line)
if imap_line.startswith('* BYE ') or (len(imap_line) == 0):
print('Jumping out of a loop.')
flag = False
break
if imap_line.endswith('EXISTS'):
print('You got a message.')
imap.send(b'DONE\r\n')
imap_line = imap.readline().strip().decode('utf-8');
if imap_line.startswith('{} OK'.format(imap_idletag.decode('utf-8'))):
print('Terminating IDLE mode')
flag = True
else :
print('Failed to terminate')
flag = False
break
if flag == True:
typ,data = imap.search(None,'UNSEEN')
if data[0] != '':
mail_ids = data[0].split()
for mail_id in mail_ids:
typ,data = imap.fetch(mail_id,'(RFC822)')
mail = data[0][1]
text_list.append(mail_to_txt(mail.decode('UTF-8')))
imap.store(mail_id,'+FLAGS','\\Seen')
else:
print('No unread e-mails')
print('Terminating FETCH')
else:
print('Jumped out')
imap.close()
imap.logout()
print('Logged out')
return text_list
if __name__ == '__main__':
while True:
mail_list = mail_routine()
for mail in mail_list:
print(mail)
実行結果は以下のようになります。
Waiting for a message...
+ idling
* 12 EXISTS
You got a message.
Terminating IDLE mode
Terminating FETCH
Logged out
テストテスト
Hello world!!
てすとてすと
#参考URL