新着メールを取得したかった
このお話は、現在筆者が作成中の自社システムを作る過程で、
メールサーバより定期的に送られてくるメールを取得し、補完するため奮闘記だ...
ほぼコピペなので、個人的には大した技術は使っておらず、
本人の技量はクソレベルである、悪しからずw
では、本題に移ろう...
仕様まとめ
- まずIPカメラより定期的に送られてくるメールを受信するサーバへアクセスする
- 続いて指定したアドレスの新着メールのみデータを受信し、内容別にデータベースへ補完する
- ここで補完するのは、最新のデータのみとする
- 更に、タイトルや本題から各種データを精査し、適正箇所に最適化して置くコト
と言う感じだ...今回は、仕様1-3までを満たしたソースコードを置いておくことにする。
もし必要に応じて、githubなどのサービスへあげることも可能だ。
欲しい方は、ぜひTwitterをフォローして知らせてくれると喜んで、お譲りしますw
いかがコードだ!スプラトゥーーーーーン!!イカだけにw
mailer/__init__.py
# !/bin/python3.7
# -*- coding: utf-8 -*-
import os, sys, tempfile, base64, email, poplib, ssl, sqlite3, urllib, io
from email.header import decode_header, make_header
from email.utils import parsedate_to_datetime
from importlib import import_module
from logging import getLogger
import logging.config
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
class mail_server:
def __init__(self,HOST="",NGCB=("ssl", 995)):
self.host = HOST
self.nego_combo = NGCB
if self.nego_combo[0] == "no-encrypt":
self.popclient = poplib.POP3(self.host, self.nego_combo[1], timeout=10)
elif self.nego_combo[0] == "starttls":
self.context = ssl.create_default_context()
self.popclient = poplib.POP3(self.host, self.nego_combo[1], timeout=10)
self.popclient.stls(self.context)
elif self.nego_combo[0] == "ssl":
self.context = ssl.create_default_context()
self.popclient = poplib.POP3_SSL(self.host, self.nego_combo[1], timeout=10, context=self.context)
self.db = sqlite3.connect('mail.db')
# self.popclient.set_debuglevel(2) # サーバとの問い合わせ内容確認用
def run(self):
content = self.popclient.retr(self.msg_num)[1]
msg = email.message_from_bytes(b'\r\n'.join(content))
from_ = self.get_header(msg, 'From')
date_hdr = self.get_header(msg, 'Date')
if date_hdr:
date = parsedate_to_datetime(date_hdr)
else:
date = None
subject = self.get_header(msg, 'Subject')
content = self.get_content(msg)
return (uidl, subject, content, from_, date)
def get_header(self, msg, name):
header = ''
if msg[name]:
for tup in decode_header(str(msg[name])):
if type(tup[0]) is bytes:
charset = tup[1]
if charset:
header += tup[0].decode(tup[1])
else:
header += tup[0].decode()
elif type(tup[0]) is str:
header += tup[0]
return header
def get_content(self, msg):
charset = msg.get_content_charset()
payload = msg.get_payload(decode=True)
try:
if payload:
if charset:
return payload.decode(charset)
else:
return payload.decode()
else:
return ""
except:
return payload
def update(self):
self.msg_num = self.popclient.stat()[0]
return self.msg_num
def fetchmail(self, msg_no):
try:
content = self.popclient.retr(msg_no)[1]
uidl = self.popclient.uidl(msg_no).decode().split(' ')[-1]
msg, cnt = "", ""
for i in range(len(content)):
cnt += b'\r\n'.join(content[i])
msg = email.message_from_bytes(cnt)
from_ = self.get_header(msg, 'From')
date_hdr = self.get_header(msg, 'Date')
if date_hdr:
date = parsedate_to_datetime(date_hdr)
else:
date = None
subject = self.get_header(msg, 'Subject')
content = self.get_content(msg)
return (uidl, subject, content, from_, date)
except:
print(sys.exc_info()[0],sys.exc_info()[1])
def receive_all(self):
newmail = self.find_newmail()
count = len(newmail)
c = self.db.cursor()
for mail in newmail:
try:
msg = self.fetchmail(mail[0])
c.execute("""
INSERT INTO mail (uidl, subject, content, sender, sent_at)
VALUES (?, ?, ?, ?, ?)
""", msg)
print('Date: %s, From: %s, Subject: %s' % (msg[4], msg[3], msg[1]))
except:
print(sys.exc_info()[0],sys.exc_info()[1])
continue
self.db.commit()
c.close()
def find_newmail(self):
uidl = self.popclient.uidl()[1]
remote_uidl = list(map(lambda elm: elm.decode().split(' '), self.popclient.uidl()[1]))
c = self.db.cursor()
res = c.execute('SELECT uidl FROM mail').fetchall()
local_uidl = map(lambda tup: tup[0], res)
new_uidl = set(map(lambda elm: elm[-1], remote_uidl)) - set(local_uidl)
return list(filter(lambda elm: elm[1] in new_uidl, remote_uidl))
def setup(self):
c = self.db.cursor()
c.execute('CREATE TABLE IF NOT EXISTS mail (uidl text, subject text, content text, sender text, sent_at timestamp, created_at default current_timestamp)')
self.db.commit()
c.close()
def login(self, UNAME="",PASSWD="",AUTH_METHOD=""):
self.username = f"{UNAME}"
self.password = f"{PASSWD}"
self.auth_method = f"{AUTH_METHOD}"
if self.auth_method == "user":
self.popclient.user(self.username)
self.popclient.pass_(self.password)
elif self.auth_method == "apop":
self.popclient.apop(self.username, self.password)
elif self.auth_method == "rpop":
self.popclient.rpop(self.username)
self.popclient.pass_(self.password)
def __enter__(self):
self.setup()
return self
def __exit__(self, exception_type, exception_value, traceback):
self.popclient.quit()
return True
if __name__ == '__main__':
# Test:
host = "メールサーバドメイン"
nego_combo = ("ssl", 995) # ("通信方式", port番号)
username = "アカウント名"
password = "パスワード"
auth_method = "user" # 認証方法
with mail_server(host, nego_combo) as ms:
try:
ms.login(username,password,auth_method)
print(ms.receive_all())
except:
pass
以上だ。
なお、withで呼び出し可能なクラスになっているから、普通に簡易なシステムへも簡単に組み込み可能だ!
ただ問題もある...画像や添付データには未対応なため、クソコードだと言うことだ!