5
6

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 5 years have passed since the last update.

Python 3 で日本語のアドレス(CC含む)、件名、添付ファイルでメール送信するサンプル

Posted at

Pythonで日本語のメールを送信するまとまったサンプルが見つからなかったので作成。

from logging import getLogger
import logging
import re, os

import smtplib
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.utils import formatdate
from email import encoders
from email.header import Header
from mimetypes import guess_type

# Log record layout: timestamp, source file name (left-aligned, 20 wide),
# level name (8 wide), line number (5 wide), then the message.
LOG_FORMAT = '%(asctime)s %(filename)-20s %(levelname)8s %(lineno)5s %(message)s'

# Module-level logger named after this module; every function below logs through it.
logger = getLogger(__name__)

def decorate_function_simple(func):
    """Return a wrapper around *func* that logs entry and exit at DEBUG level.

    The wrapper forwards all positional and keyword arguments unchanged and
    returns whatever *func* returns. If *func* raises, the exception
    propagates and the "Exit function" message is not logged.

    Args:
        func: The callable to wrap.

    Returns:
        A callable with the same call signature and metadata
        (``__name__``, ``__doc__``, ``__wrapped__``) as *func*.
    """
    # Imported locally so the decorator does not rely on the file's import header.
    from functools import wraps

    # Without wraps() every decorated function would report itself as
    # "new_function", which makes tracebacks and introspection misleading.
    @wraps(func)
    def new_function(*args, **kwargs):
        logger.debug('Running function [' + func.__name__ + ']')
        result = func(*args, **kwargs)
        logger.debug('Exit function [' + func.__name__ + ']')
        return result

    return new_function

@decorate_function_simple
def write_file(path_write_file, contents, mode='w'):
    """Write *contents* to *path_write_file* and report success as a bool.

    Args:
        path_write_file: Path of the file to write (str).
        contents: Data passed to ``write()``; must match *mode*
            (str for text modes, bytes for binary modes).
        mode: Mode string handed to ``open()``; defaults to ``'w'``.

    Returns:
        True when the file was written, False when any exception occurred
        (the failure is logged at CRITICAL level instead of being raised).
    """
    logger.info('write file [' + path_write_file + ']')
    try:
        with open(path_write_file, mode) as fd:
            fd.write(contents)
    except Exception:
        # exc_info attaches the traceback so the actual cause (permissions,
        # missing directory, wrong data type, ...) is not silently discarded.
        logger.critical('Failed to write file', exc_info=True)
        return False

    return True


@decorate_function_simple
def read_file(path_read_file, mode='r'):
    """Read the whole of *path_read_file* and return it with a success flag.

    Args:
        path_read_file: Path of the file to read.
        mode: Mode string handed to ``open()``; defaults to ``'r'``.
            Use ``'rb'`` to get bytes.

    Returns:
        A ``(ok, contents)`` tuple. On success ``ok`` is True and ``contents``
        is what ``read()`` returned. On any exception ``ok`` is False and
        ``contents`` is the empty string ``''`` (also for binary modes); the
        failure is logged at CRITICAL level instead of being raised.
    """
    try:
        with open(path_read_file, mode) as stream:
            file_contents = stream.read()
    except Exception:
        # exc_info attaches the traceback so the actual cause of the failure
        # ends up in the log instead of being dropped.
        logger.critical('Failed to read file', exc_info=True)
        return False, ''

    return True, file_contents


def encode_mail_address(mail_addresses):
    """Encode the display names of mail addresses as UTF-8 header words.

    Each entry is expected to look like ``Display Name <user@host>``; the
    display name is encoded with ``email.header.Header`` using UTF-8 and
    the address part is kept as is. Surrounding whitespace (for example
    the blank left behind by splitting ``"a <a@x>, b <b@y>"`` on commas) is
    ignored, and a bare ``user@host`` entry without a display name is passed
    through unchanged.

    The input list is not modified; a new list is returned.

    Args:
        mail_addresses: Iterable of address strings.

    Returns:
        A new list of encoded address strings in the same order, or False if
        any entry cannot be parsed (the offending entry is logged at
        CRITICAL level). An empty input yields an empty list.
    """
    encoded_addresses = []
    for mail_address in mail_addresses:
        candidate = mail_address.strip()

        # "Display Name <addr>": encode only the display name.
        named = re.fullmatch(r'([^<]+?)\s*<([^<>]+)>', candidate)
        if named is not None:
            display_name = Header(named.group(1), 'utf-8').encode()
            encoded_addresses.append('{} <{}>'.format(display_name, named.group(2)))
            continue

        # Bare "user@host": nothing to encode.
        if re.fullmatch(r'[^<>\s]+@[^<>\s]+', candidate):
            encoded_addresses.append(candidate)
            continue

        logger.critical('Cannot parse mail address {}'.format(mail_address))
        return False

    return encoded_addresses


def send_mail(host_mail_server,
              password,
              from_address,
              to_address,
              cc_address,
              body,
              subject,
              attaches,
              directory_attaches,
              server_port=465,
              dump_mail_source=False,
              file_path_emls='',
              ):
    """Build a UTF-8 mail, optionally dump it to an .eml file, and send it via SMTP over SSL.

    Args:
        host_mail_server: Host name of the SMTP server.
        password: Password used to log in to the SMTP server.
        from_address: Sender, e.g. ``'Name <user@example.com>'``. The part
            before ``@`` is used as the SMTP login name.
        to_address: Comma-separated recipients in the same format.
        cc_address: Comma-separated CC recipients, or a falsy value for none.
        body: Plain-text body of the mail.
        subject: Subject line.
        attaches: List of attachment file names, or a falsy value for none.
        directory_attaches: Directory that contains the attachment files.
        server_port: Port of the SMTP server (default 465).
        dump_mail_source: When True, write the mail source to an .eml file
            before sending.
        file_path_emls: Directory for the dumped .eml file; created if missing.

    Returns:
        True when the mail was handed to the server, False on any failure
        (address parsing, attachment reading, dumping, or SMTP errors). The
        reason is logged at CRITICAL level.
    """
    # The SMTP login name is the local part of the sender address.
    user_id_match = re.search(r'([^< ]+)@', from_address)
    if user_id_match is None:
        logger.critical('Cannot parse mail address {}'.format(from_address))
        return False
    user_id = user_id_match.group(1)

    if attaches:
        logger.debug('making multi part mail')
        mime = MIMEMultipart()
        mime.attach(MIMEText(_text=body, _subtype='plain', _charset='utf-8'))
    else:
        logger.debug('making single part mail')
        mime = MIMEText(_text=body, _subtype='plain', _charset='utf-8')

    mime['Subject'] = Header(subject, 'utf-8')

    # encode_mail_address returns False on failure, so check before indexing.
    from_addresses_encoded = encode_mail_address([from_address])
    if not from_addresses_encoded:
        return False
    from_address_encoded = from_addresses_encoded[0]
    mime['From'] = from_address_encoded

    # Strip each entry so the blank after a comma does not end up in the
    # encoded display name; drop empty entries left by stray commas.
    to_addresses = [entry.strip() for entry in to_address.split(',') if entry.strip()]
    to_addresses_encoded = encode_mail_address(to_addresses)
    if not to_addresses_encoded:
        return False
    mime['To'] = ', '.join(to_addresses_encoded)

    cc_addresses = []
    if cc_address:
        cc_addresses = [entry.strip() for entry in cc_address.split(',') if entry.strip()]
    if cc_addresses:
        logger.info('this mail has cc addresses {}'.format(cc_address))
        cc_addresses_encoded = encode_mail_address(cc_addresses)
        if not cc_addresses_encoded:
            return False
        mime['Cc'] = ', '.join(cc_addresses_encoded)
        # CC recipients must also be part of the envelope recipient list.
        to_addresses_encoded = to_addresses_encoded + cc_addresses_encoded

    logger.info('from address {}'.format(from_address_encoded))
    logger.info('to   address {}'.format(to_addresses_encoded))
    mime['Date'] = formatdate(localtime=True)

    if attaches:
        logger.info('this mail has attachments')
        for attach in attaches:
            logger.info('attach file name {}'.format(attach))

            # Derive the MIME type from the file name; fall back to a generic binary type.
            mime_type, _encoding = guess_type(attach)
            if mime_type is None:
                logger.info('mime type guess failed, use application/octet-stream')
                part = MIMEBase('application', "octet-stream")
            else:
                logger.info('mime type {}'.format(mime_type))
                maintype, subtype = mime_type.split('/', 1)
                part = MIMEBase(maintype, subtype)

            # os.path.join works whether or not the directory ends with a separator.
            file_path_attach = os.path.join(directory_attaches, attach)
            result, file_contents = read_file(file_path_attach, 'rb')
            if not result:
                logger.critical('Failed to read attach file')
                return False

            part.set_payload(file_contents)
            encoders.encode_base64(part)

            # The file name is sent as an encoded header word in both the
            # Content-Type "name" and the Content-Disposition "filename".
            attach_filename = Header(attach, 'utf-8').encode()
            part.set_param('name', attach_filename)
            logger.debug('attach file name {}'.format(attach_filename))
            part.add_header("Content-Disposition", "attachment", filename=attach_filename)

            mime.attach(part)
    else:
        logger.info('this mail has no attachments')

    if dump_mail_source:
        logger.info('dumping mail source')
        logger.info('mkdir {}'.format(file_path_emls))
        try:
            os.makedirs(file_path_emls, exist_ok=True)
        except Exception:
            logger.critical('Failed to mkdir output directory', exc_info=True)
            return False

        # Spaces and colons in the subject are replaced for the file name.
        title = subject.replace(' ', '_').replace(':', '_')
        file_name = 'cyder_{}_{}.eml'.format(to_address, title)
        # A path separator inside the recipients or subject would otherwise
        # be interpreted as a sub-directory.
        for separator in filter(None, (os.sep, os.altsep)):
            file_name = file_name.replace(separator, '_')

        file_path_eml = os.path.join(file_path_emls, file_name)
        result = write_file(file_path_eml, mime.as_string())
        if not result:
            logger.critical('Failed to dump mail source.')
            return False

    mail_server = None
    try:
        logger.debug('now trying to log in to the mail server {}:{} {}'.format(host_mail_server, server_port, user_id))
        mail_server = smtplib.SMTP_SSL(host_mail_server, server_port)
        mail_server.login(user_id, password)
        logger.debug('OK, logged in, trying to send mail')
        mail_server.sendmail(from_address_encoded, to_addresses_encoded, mime.as_string())
        logger.debug('OK, sent')
    except Exception as e:
        # Every smtplib error derives from SMTPException (an OSError), so a
        # single handler covers them as well as connection and encoding errors.
        logger.critical('Failed to send mail [{msg}]'.format(msg=str(e)))
        return False
    finally:
        if mail_server is not None:
            mail_server.close()

    return True

def main():
    """Turn on DEBUG logging and send one sample mail using placeholder settings."""
    logging.basicConfig(format=LOG_FORMAT, level=logging.DEBUG)

    # Placeholder values; replace them with a real server, account and files.
    sample_mail = dict(
        host_mail_server='mailserver.yourdomain.jp',
        password='password',
        from_address='ふろむ <user@.yourdomain.jp>',
        to_address='ふう <foo@gmail.com>, ばあ <bar@yahoo.co.jp>',
        cc_address='ありす <alice@gmail.com>, ぼぶ <bob@yahoo.co.jp>',
        body='テストメール 本文',
        subject='テストメール 件名',
        attaches=['添付ファイル1.xlsx', '添付ファイル2.pdf'],
        directory_attaches='/Users/youracccount/mailtest/',
    )
    send_mail(**sample_mail)


if __name__ == '__main__':
    main()
5
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
5
6

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?