3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

pythonで新着メールを取得したかったと言うお話

3
Posted at

新着メールを取得したかった

このお話は、現在筆者が作成中の自社システムを作る過程で、
メールサーバより定期的に送られてくるメールを取得し、補完するため奮闘記だ...

ほぼコピペなので、個人的には大した技術は使っておらず、
本人の技量はクソレベルである、悪しからずw

では、本題に移ろう...

仕様まとめ

  1. まずIPカメラより定期的に送られてくるメールを受信するサーバへアクセスする
  2. 続いて指定したアドレスの新着メールのみデータを受信し、内容別にデータベースへ補完する
  3. ここで補完するのは、最新のデータのみとする
  4. 更に、タイトルや本題から各種データを精査し、適正箇所に最適化して置くコト

と言う感じだ...今回は、仕様1-3までを満たしたソースコードを置いておくことにする。
もし必要に応じて、githubなどのサービスへあげることも可能だ。
欲しい方は、ぜひTwitterをフォローして知らせてくれると喜んで、お譲りしますw

いかがコードだ!スプラトゥーーーーーン!!イカだけにw

mailer/__init__.py
# !/bin/python3.7
# -*- coding: utf-8 -*-

import os, sys, tempfile, base64, email, poplib, ssl, sqlite3, urllib, io
from email.header import decode_header, make_header
from email.utils import parsedate_to_datetime
from importlib import import_module
from logging import getLogger
import logging.config

sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

class mail_server:
    def __init__(self,HOST="",NGCB=("ssl", 995)):
        self.host       = HOST
        self.nego_combo = NGCB

        if self.nego_combo[0] == "no-encrypt":
            self.popclient = poplib.POP3(self.host, self.nego_combo[1], timeout=10)
        elif self.nego_combo[0] == "starttls":
            self.context = ssl.create_default_context()
            self.popclient = poplib.POP3(self.host, self.nego_combo[1], timeout=10)
            self.popclient.stls(self.context)
        elif self.nego_combo[0] == "ssl":
            self.context = ssl.create_default_context()
            self.popclient = poplib.POP3_SSL(self.host, self.nego_combo[1], timeout=10, context=self.context)
        self.db = sqlite3.connect('mail.db')
        # self.popclient.set_debuglevel(2) # サーバとの問い合わせ内容確認用

    def run(self):
        content = self.popclient.retr(self.msg_num)[1]
        msg = email.message_from_bytes(b'\r\n'.join(content))
        from_ = self.get_header(msg, 'From')
        date_hdr = self.get_header(msg, 'Date')
        if date_hdr:
            date = parsedate_to_datetime(date_hdr)
        else:
            date = None
        subject = self.get_header(msg, 'Subject')
        content = self.get_content(msg)
        return (uidl, subject, content, from_, date)

    def get_header(self, msg, name):
        header = ''
        if msg[name]:
            for tup in decode_header(str(msg[name])):
                if type(tup[0]) is bytes:
                    charset = tup[1]
                    if charset:
                        header += tup[0].decode(tup[1])
                    else:
                        header += tup[0].decode()
                elif type(tup[0]) is str:
                    header += tup[0]
        return header

    def get_content(self, msg):
        charset = msg.get_content_charset()
        payload = msg.get_payload(decode=True)
        try:
            if payload:
                if charset:
                    return payload.decode(charset)
                else:
                    return payload.decode()
            else:
                return ""
        except:
            return payload

    def update(self):
        self.msg_num = self.popclient.stat()[0]
        return self.msg_num

    def fetchmail(self, msg_no):
        try:
            content  = self.popclient.retr(msg_no)[1]
            uidl     = self.popclient.uidl(msg_no).decode().split(' ')[-1]
            msg, cnt = "", ""
            for i in range(len(content)):
                cnt += b'\r\n'.join(content[i])
            msg  = email.message_from_bytes(cnt)
            from_    = self.get_header(msg, 'From')
            date_hdr = self.get_header(msg, 'Date')
            if date_hdr:
                date = parsedate_to_datetime(date_hdr)
            else:
                date = None
            subject = self.get_header(msg, 'Subject')
            content = self.get_content(msg)
            return (uidl, subject, content, from_, date)
        except:
            print(sys.exc_info()[0],sys.exc_info()[1])

    def receive_all(self):
        newmail = self.find_newmail()
        count = len(newmail)
        c = self.db.cursor()
        for mail in newmail:
            try:
                msg = self.fetchmail(mail[0])
                c.execute("""
                    INSERT INTO mail (uidl, subject, content, sender, sent_at)
                    VALUES (?, ?, ?, ?, ?)
                    """, msg)
                print('Date: %s, From: %s, Subject: %s' % (msg[4], msg[3], msg[1]))
            except:
                print(sys.exc_info()[0],sys.exc_info()[1])
                continue
            self.db.commit()
        c.close()

    def find_newmail(self):
        uidl = self.popclient.uidl()[1]
        remote_uidl = list(map(lambda elm: elm.decode().split(' '), self.popclient.uidl()[1]))
        c = self.db.cursor()
        res = c.execute('SELECT uidl FROM mail').fetchall()
        local_uidl = map(lambda tup: tup[0], res)
        new_uidl = set(map(lambda elm: elm[-1], remote_uidl)) - set(local_uidl)
        return list(filter(lambda elm: elm[1] in new_uidl, remote_uidl))

    def setup(self):
        c = self.db.cursor()
        c.execute('CREATE TABLE IF NOT EXISTS mail (uidl text, subject text, content text, sender text, sent_at timestamp, created_at default current_timestamp)')
        self.db.commit()
        c.close()

    def login(self, UNAME="",PASSWD="",AUTH_METHOD=""):
        self.username     = f"{UNAME}"
        self.password     = f"{PASSWD}"
        self.auth_method  = f"{AUTH_METHOD}"

        if self.auth_method == "user":
            self.popclient.user(self.username)
            self.popclient.pass_(self.password)
        elif self.auth_method == "apop":
            self.popclient.apop(self.username, self.password)
        elif self.auth_method == "rpop":
            self.popclient.rpop(self.username)
            self.popclient.pass_(self.password)
    def __enter__(self):
        self.setup()
        return self
    def __exit__(self, exception_type, exception_value, traceback):
        self.popclient.quit()
        return True

if __name__ == '__main__':

    # Test:
    host        = "メールサーバドメイン"
    nego_combo  = ("ssl", 995)       # ("通信方式", port番号)
    username    = "アカウント名"
    password    = "パスワード"
    auth_method = "user" # 認証方法
    with mail_server(host, nego_combo) as ms:
        try:
            ms.login(username,password,auth_method)
            print(ms.receive_all())
        except:
            pass

以上だ。
なお、withで呼び出し可能なクラスになっているから、普通に簡易なシステムへも簡単に組み込み可能だ!
ただ問題もある...画像や添付データには未対応なため、クソコードだと言うことだ!

参照先:Python の標準ライブラリで新着メールだけを受信してDBに保存する

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?