2
0

メール送信プログラム

Posted at 2024-03-23

変更履歴

  • Subjectをutf-8でエンコード
  • 添付ファイル名をiso-2022-jpでエンコード
  • Content-Disposition, Content-Typeヘッダ両方に添付ファイル情報を付与
  • 添付ファイルのパスからファイル名だけを抜き出して使用
  • Content-Typeの値をファイルのMIMEタイプに応じて動的に設定するように修正
send_mail.py
#!/bin/python3
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import argparse
import threading
from queue import Queue
from email.utils import formatdate
from email.header import Header
import os
import mimetypes

# Command-line argument handling.
parser = argparse.ArgumentParser(description='Send emails via SMTP on port 25 with optional TLS.')
parser.add_argument('smtp_server', nargs='?', default='localhost:25', help='SMTP server in the format server[:port]. The port defaults to 25 when omitted. Default is localhost:25.')
parser.add_argument('--no-tls', action='store_false', dest='tls', default=True, help='Disable TLS. Default is enabled.')
parser.add_argument('-n', '--num-emails', type=int, default=1, help='Total number of emails to send. Default is 1.')
parser.add_argument('-c', '--concurrency', type=int, help='Number of concurrent connections.')
parser.add_argument('--from', dest='from_addr', default='from@hoge.co.jp', help='Sender email address. Default is from@hoge.co.jp.')
parser.add_argument('--to', dest='to_addr', default='ec2-user@localhost', help='Receiver email address. Default is ec2-user@localhost.')
parser.add_argument('--subject', default='Test Email', help='Email subject. Default is "Test Email".')
parser.add_argument('--body', default='This is a test email.', help='Email body. Default is "This is a test email.".')
parser.add_argument('--attachment', help='Path to the attachment file.')
args = parser.parse_args()

# Derive the SMTP host and port from the positional argument.
# rpartition splits on the LAST colon, so a value without any colon
# (e.g. "mail.example.com") no longer fails tuple unpacking; it falls back
# to port 25 instead.
_host_part, _colon, _port_part = args.smtp_server.rpartition(':')
if _colon:
    smtp_server = _host_part
    try:
        port = int(_port_part)  # smtplib expects the port as an integer
    except ValueError:
        # Report a bad port as a usage error rather than a raw traceback.
        parser.error(f"invalid port in smtp_server argument: {_port_part!r}")
else:
    smtp_server = args.smtp_server
    port = 25

def send_email(queue):
    """Worker loop: send one email for every index taken from *queue*.

    Reads sender, recipient, subject, body, attachment path and the TLS flag
    from the module-level ``args``, and the connection target from the
    module-level ``smtp_server`` / ``port``. For each queued index a fresh
    multipart message is built and delivered over a new SMTP connection.
    Failures are printed, not raised; ``queue.task_done()`` is called once
    for every item taken.

    Args:
        queue: A ``queue.Queue`` holding the sequence numbers of the emails
            to send. The function returns once the queue has no more items.
    """
    # Imported locally because the file's top-level imports only bring in
    # ``Queue`` from the ``queue`` module.
    from queue import Empty

    while True:
        # Fetch without blocking. Testing ``queue.empty()`` and then calling
        # the blocking ``queue.get()`` is a check-then-act race: another
        # worker can take the last item in between, leaving this thread
        # blocked forever and the caller's ``join()`` hanging.
        try:
            index = queue.get_nowait()
        except Empty:
            return

        from_addr = args.from_addr
        to_addr = args.to_addr
        subject = args.subject
        body = args.body

        message = MIMEMultipart()
        message['From'] = from_addr
        message['To'] = to_addr
        message['Subject'] = Header(subject, 'utf-8').encode()
        message['Date'] = formatdate(localtime=True)  # add a Date header

        # Attach the body as UTF-8 plain text.
        text = MIMEText(body, 'plain', 'utf-8')
        message.attach(text)

        if args.attachment:
            filename = os.path.basename(args.attachment)
            mime_type, _ = mimetypes.guess_type(args.attachment)
            if mime_type is None:
                mime_type = 'application/octet-stream'  # fallback MIME type

            maintype, subtype = mime_type.split('/', 1)

            try:
                with open(args.attachment, 'rb') as attachment:
                    part = MIMEBase(maintype, subtype)
                    part.set_payload(attachment.read())
                    encoders.encode_base64(part)

                    # Encode the file name as ISO-2022-JP and put it in both
                    # the Content-Disposition and Content-Type headers.
                    filename_encoded = Header(filename, 'iso-2022-jp').encode()
                    part.add_header('Content-Disposition', f'attachment; filename="{filename_encoded}"')
                    part.replace_header('Content-Type', f'{mime_type}; name="{filename_encoded}"')

                    message.attach(part)
            except Exception as e:
                # The attachment could not be prepared: report it, mark the
                # item as processed and skip sending this email.
                print(f"Error attaching file {filename}: {e}")
                queue.task_done()
                continue

        try:
            with smtplib.SMTP(smtp_server, port) as server:
                server.ehlo()  # greet the server
                if args.tls:
                    server.starttls()  # upgrade the connection with STARTTLS
                    server.ehlo()  # EHLO again after the TLS upgrade
                server.sendmail(from_addr, to_addr, message.as_string())
                print(f"Email {index} sent successfully")
        except Exception as e:
            print(f"Error sending email {index}: {e}")
        finally:
            queue.task_done()

def main():
    """Queue the email sequence numbers and run the sender threads.

    Numbers 1..``args.num_emails`` are placed on a queue. One thread per
    requested connection (``args.concurrency``, or ``args.num_emails`` when
    that is unset or zero) runs ``send_email`` on the shared queue, and the
    function waits for all of them to finish.
    """
    email_queue = Queue()
    for number in range(1, args.num_emails + 1):
        email_queue.put(number)

    worker_count = args.concurrency or args.num_emails

    # Create every worker first, then start them, then wait for them.
    workers = []
    for _ in range(worker_count):
        workers.append(threading.Thread(target=send_email, args=(email_queue,)))

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

# Call main() only when this file is executed as a script.
if __name__ == "__main__":
    main()
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use the dark theme
What you can do by signing up
2
0