# 設定したアドレスに対して、PCのスクリーンキャプチャと、接続されているUSBカメラの撮影画像を、設定した時間間隔ごとに自動送信する。
import json
import os
import shutil
import smtplib
import tempfile
import threading
import time
import tkinter as tk
from datetime import datetime
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from tkinter import messagebox, ttk, filedialog

import cv2
import pyautogui
from PIL import Image
# Path of the JSON file that stores the form settings.
CONFIG_FILE = "config.json"

# Shared timer state: the run flag is polled by both worker threads, and the
# two thread handles are kept so they can be joined when the timer is stopped.
timer_running = False
timer_thread = countdown_thread = None
def capture_screen(save_folder=None):
    """Take a screenshot and write it to a temporary PNG file.

    Args:
        save_folder: Optional archive folder. When it is set and the
            "save images" checkbox (``save_var``) is on, a second copy of the
            screenshot is stored there under the same file name.

    Returns:
        Path of the temporary PNG, intended to be attached to the mail and
        deleted by the caller afterwards.
    """
    screenshot = pyautogui.screenshot()
    filename = f"screenshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"

    if save_folder and save_var.get():
        os.makedirs(save_folder, exist_ok=True)
        screenshot.save(os.path.join(save_folder, filename))

    # The throw-away copy goes to the system temp directory rather than the
    # current directory: if the archive folder were the current directory the
    # two paths would be identical and the caller's cleanup would delete the
    # archived image. It also avoids requiring a writable working directory.
    temp_path = os.path.join(tempfile.gettempdir(), filename)
    screenshot.save(temp_path)
    return temp_path
def capture_camera(camera_id, save_folder=None):
    """Grab one frame from a camera and write it to a temporary PNG file.

    Args:
        camera_id: Device index passed to ``cv2.VideoCapture``.
        save_folder: Optional archive folder. When it is set and the
            "save images" checkbox (``save_var``) is on, a second copy of the
            frame is stored there under the same file name.

    Returns:
        Path of the temporary PNG, or ``None`` when the camera cannot be
        opened, no frame can be read, or the temporary file cannot be written.
    """
    cap = cv2.VideoCapture(camera_id)
    try:
        if not cap.isOpened():
            return None
        ret, frame = cap.read()
        if not ret:
            return None

        filename = (
            f"camera_{camera_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        )
        if save_folder and save_var.get():
            os.makedirs(save_folder, exist_ok=True)
            cv2.imwrite(os.path.join(save_folder, filename), frame)

        # Temp copy lives in the system temp directory so that the caller's
        # cleanup can never delete the archived copy (same file name).
        temp_path = os.path.join(tempfile.gettempdir(), filename)
        # imwrite reports failure through its return value instead of raising;
        # returning the path anyway would break the later attachment step.
        if not cv2.imwrite(temp_path, frame):
            return None
        return temp_path
    finally:
        # Release the device on every path, including errors.
        cap.release()
def send_email(to_emails, bcc_emails, screen_file, cam_files):
    """Build the capture report mail and send it through the configured SMTP server.

    Args:
        to_emails: Addresses placed in the ``To`` header.
        bcc_emails: Addresses placed in the ``Bcc`` header.
        screen_file: Path of the screenshot PNG, or a falsy value when the
            screenshot failed.
        cam_files: Sequence of camera image paths; the position in the
            sequence is reported as the camera ID, and falsy entries are
            reported as "no camera connected".

    Raises:
        ValueError: If the configured SMTP port is not an integer.
        OSError: If an image file cannot be read or the connection fails.
        smtplib.SMTPException: If the SMTP conversation fails.
    """
    # One timestamp for both subject and body so they cannot disagree.
    now = datetime.now()

    msg = MIMEMultipart()
    msg["From"] = smtp_user_var.get()
    # Only emit address headers that actually have recipients; an empty
    # "To:" / "Bcc:" header is not a valid address list.
    if to_emails:
        msg["To"] = ", ".join(to_emails)
    if bcc_emails:
        # smtplib's send_message() is documented to use Bcc for routing while
        # leaving it out of the transmitted message.
        msg["Bcc"] = ", ".join(bcc_emails)
    msg["Subject"] = f"Capture Report {now.strftime('%Y-%m-%d %H:%M:%S')}"

    body_parts = [
        f"以下は{now.month}月{now.day}日{now.hour}時{now.minute}分のキャプチャ結果です:\n\n"
    ]
    if screen_file:
        body_parts.append("画面キャプチャ: 取得済み\n")
    else:
        body_parts.append("画面キャプチャ: 取得失敗\n")
    # Iterate over what was actually passed instead of assuming three cameras.
    for camera_id, cam_file in enumerate(cam_files):
        if cam_file:
            body_parts.append(f"カメラ ID {camera_id}: 取得済み\n")
        else:
            body_parts.append(f"カメラ ID {camera_id}: カメラ接続なし\n")
    msg.attach(MIMEText("".join(body_parts), "plain"))

    # Attach the screenshot first, then every available camera image.
    for image_path in [screen_file, *cam_files]:
        if not image_path:
            continue
        with open(image_path, "rb") as f:
            img = MIMEImage(f.read())
        img.add_header(
            "Content-Disposition",
            "attachment",
            filename=os.path.basename(image_path),
        )
        msg.attach(img)

    smtp_server = smtp_server_var.get()
    smtp_port = int(smtp_port_var.get())
    smtp_user = smtp_user_var.get()
    smtp_pass = smtp_pass_var.get()

    # The context manager closes the connection even when STARTTLS, login or
    # sending raises, so a failed attempt does not leak the socket.
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(smtp_user, smtp_pass)
        server.send_message(msg)
def update_countdown(start_time, interval_seconds):
    """Refresh the countdown label about once per second while the timer runs.

    Args:
        start_time: ``time.time()`` value the current interval is measured from.
        interval_seconds: Length of one send interval in seconds.

    The loop ends once ``timer_running`` is false. When the remaining time has
    reached zero, the reference time is reset to "now" so the display starts
    counting down the next interval.
    """
    global timer_running
    cycle_start = start_time
    while True:
        if not timer_running:
            break
        seconds_left = max(0, interval_seconds - (time.time() - cycle_start))
        mins, secs = divmod(int(seconds_left), 60)
        label_options = {"text": f"次送信まで: {mins:02d}:{secs:02d}"}
        # Hand the widget update to Tk's event queue via root.after.
        root.after(0, countdown_label.config, label_options)
        time.sleep(1)
        if seconds_left <= 0:
            cycle_start = time.time()
def _try_capture(capture, *args):
    """Run one capture callable, returning None instead of raising."""
    try:
        return capture(*args)
    except Exception:
        # Deliberate best effort: a failed capture is reported in the mail
        # body as "not captured" rather than aborting the whole cycle.
        return None


def process_capture_and_send(to_emails, bcc_emails, save_folder=None):
    """Run one capture-and-mail cycle and clean up the temporary images.

    Args:
        to_emails: Addresses for the ``To`` header.
        bcc_emails: Addresses for the ``Bcc`` header.
        save_folder: Optional archive folder forwarded to the capture functions.

    Returns:
        ``time.time()`` taken after the cycle has finished.

    The outcome (success time or error text) is shown in ``status_label``.
    Capture and send failures are reported, never raised, so that the timer
    thread calling this function keeps running.
    """
    screen_file = _try_capture(capture_screen, save_folder)
    cam_files = [_try_capture(capture_camera, i, save_folder) for i in range(3)]
    try:
        send_email(to_emails, bcc_emails, screen_file, cam_files)
        root.after(
            0,
            status_label.config,
            {"text": f"送信完了: {datetime.now().strftime('%H:%M:%S')}"},
        )
    except Exception as e:
        root.after(0, status_label.config, {"text": f"送信エラー: {str(e)}"})
    finally:
        # Temporary attachments are removed whatever happened above.
        for temp_file in [screen_file, *cam_files]:
            if temp_file and os.path.exists(temp_file):
                try:
                    os.remove(temp_file)
                except OSError:
                    # A file that cannot be deleted (e.g. still locked) must
                    # not stop the timer; it is simply left behind.
                    pass
    return time.time()
def _abort_timer_start(message):
    """Cancel a timer start: clear the run flag, restore the buttons, show *message*."""
    global timer_running
    timer_running = False
    # Widget changes and the dialog are queued with root.after instead of
    # being executed directly from the worker thread.
    root.after(0, start_button.config, {"state": "normal"})
    root.after(0, stop_button.config, {"state": "disabled"})
    root.after(0, status_label.config, {"text": "タイマー停止"})
    root.after(0, messagebox.showerror, "エラー", message)


def _sleep_while_running(seconds, poll=0.5):
    """Wait up to *seconds*, waking early when the timer is stopped.

    Returns True when the full wait elapsed with the timer still running,
    False when ``timer_running`` was cleared in the meantime.
    """
    deadline = time.monotonic() + seconds
    while timer_running:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return True
        time.sleep(min(poll, remaining))
    return False


def timer_task():
    """Worker-thread body: send immediately, then once per configured interval.

    Reads the interval (minutes) and the recipient rows from the form. If the
    interval is not a positive integer or no row contains an address with an
    "@", the start is cancelled with an error dialog and the start/stop
    buttons are put back. Otherwise one capture/send cycle runs right away,
    the countdown thread is started, and further cycles follow until
    ``timer_running`` is cleared.
    """
    global countdown_thread

    try:
        interval_seconds = int(interval_var.get()) * 60
    except ValueError:
        interval_seconds = 0
    if interval_seconds <= 0:
        _abort_timer_start("タイマー時間は正の数値を入力してください")
        return

    to_emails = []
    bcc_emails = []
    for email_var, method_var in zip(email_vars, method_vars):
        email = email_var.get().strip()
        if not email or "@" not in email:
            continue
        if method_var.get() == "TO":
            to_emails.append(email)
        else:
            bcc_emails.append(email)
    if not (to_emails or bcc_emails):
        _abort_timer_start("少なくとも1つの有効なメールアドレスを入力してください")
        return

    save_folder = save_folder_var.get() if save_var.get() else None

    # process_capture_and_send already reports success or failure in the
    # status label. No extra "first send done" status is written here, because
    # writing it unconditionally would overwrite a send error.
    start_time = process_capture_and_send(to_emails, bcc_emails, save_folder)

    countdown_thread = threading.Thread(
        target=update_countdown, args=(start_time, interval_seconds)
    )
    countdown_thread.daemon = True
    countdown_thread.start()

    # Wait in short slices so that clearing the flag ends this thread promptly
    # instead of after a full interval.
    while timer_running:
        if not _sleep_while_running(interval_seconds):
            break
        process_capture_and_send(to_emails, bcc_emails, save_folder)
def start_timer():
    """Validate the SMTP/save settings and launch the timer thread.

    Does nothing when the timer is already running. Each failed check shows
    an error dialog and returns without starting anything. On success the
    run flag is set, a daemon thread running ``timer_task`` is started, and
    the start/stop buttons and the status label are switched over.
    """
    global timer_running, timer_thread

    if timer_running:
        return

    # Sender address must be non-empty and contain "@".
    sender = smtp_user_var.get()
    if not sender or "@" not in sender:
        messagebox.showerror(
            "エラー", "有効なSMTPユーザー(メールアドレス)を入力してください"
        )
        return

    # Server and password must both be filled in.
    if not (smtp_server_var.get() and smtp_pass_var.get()):
        messagebox.showerror("エラー", "SMTPサーバーとパスワードを入力してください")
        return

    # Port must parse as an integer.
    try:
        int(smtp_port_var.get())
    except ValueError:
        messagebox.showerror("エラー", "SMTPポートは数値を入力してください")
        return

    # Saving images requires a destination folder.
    if save_var.get() and not save_folder_var.get():
        messagebox.showerror("エラー", "保存先フォルダーを指定してください")
        return

    timer_running = True
    timer_thread = threading.Thread(target=timer_task, daemon=True)
    timer_thread.start()

    start_button.config(state="disabled")
    stop_button.config(state="normal")
    status_label.config(text="タイマー開始")
def save_config():
    """Write the current form values to ``CONFIG_FILE`` as UTF-8 JSON.

    NOTE(review): the SMTP password is written to the file together with the
    other settings.
    """
    addresses = []
    send_methods = []
    for slot in range(10):
        addresses.append(email_vars[slot].get())
        send_methods.append(method_vars[slot].get())

    settings = {}
    settings["emails"] = addresses
    settings["methods"] = send_methods
    settings["interval"] = interval_var.get()
    settings["smtp_server"] = smtp_server_var.get()
    settings["smtp_port"] = smtp_port_var.get()
    settings["smtp_user"] = smtp_user_var.get()
    settings["smtp_pass"] = smtp_pass_var.get()
    settings["save_images"] = save_var.get()
    settings["save_folder"] = save_folder_var.get()

    with open(CONFIG_FILE, "w", encoding="utf-8") as out:
        json.dump(settings, out, ensure_ascii=False, indent=4)
def _padded(values, size, fill):
    """Return *values* as a list of exactly *size* items, padded with *fill*."""
    items = list(values) if isinstance(values, list) else []
    return (items + [fill] * size)[:size]


def load_config():
    """Populate the form variables from ``CONFIG_FILE`` if it exists.

    Missing keys fall back to the built-in defaults. A file that cannot be
    read, is not valid JSON, or does not contain a JSON object is ignored,
    leaving the current values untouched. Address and method lists that are
    shorter than the number of form rows are padded ("" and "TO") instead of
    raising ``IndexError``.
    """
    if not os.path.exists(CONFIG_FILE):
        return
    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            config = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and undecodable bytes; a
        # damaged settings file must not prevent the program from starting.
        return
    if not isinstance(config, dict):
        return

    emails = _padded(config.get("emails"), len(email_vars), "")
    methods = _padded(config.get("methods"), len(method_vars), "TO")
    for email_var, email in zip(email_vars, emails):
        email_var.set(email)
    for method_var, method in zip(method_vars, methods):
        method_var.set(method)

    interval_var.set(config.get("interval", "5"))
    smtp_server_var.set(config.get("smtp_server", "smtp.gmail.com"))
    smtp_port_var.set(config.get("smtp_port", "587"))
    smtp_user_var.set(config.get("smtp_user", "your_email@example.com"))
    smtp_pass_var.set(config.get("smtp_pass", "your_app_password"))
    save_var.set(config.get("save_images", False))
    save_folder_var.set(config.get("save_folder", ""))
def stop_timer():
    """Stop the timer, reset the controls and save the settings.

    Clears the run flag, waits up to two seconds for each worker thread that
    is still alive, drops both thread handles, restores the buttons and
    labels to their idle state, and finally writes the configuration file.
    """
    global timer_running, timer_thread, countdown_thread

    timer_running = False

    # Give each worker a bounded chance to notice the cleared flag.
    for worker in (timer_thread, countdown_thread):
        if worker and worker.is_alive():
            worker.join(timeout=2.0)
    timer_thread = None
    countdown_thread = None

    start_button.config(state="normal")
    stop_button.config(state="disabled")
    status_label.config(text="タイマー停止")
    countdown_label.config(text="次送信まで: --:--")

    save_config()
def select_save_folder():
    """Ask for a directory and store it as the image save folder.

    The current value is left unchanged when the dialog returns an empty
    result.
    """
    chosen = filedialog.askdirectory()
    if not chosen:
        return
    save_folder_var.set(chosen)
# --- Main window -----------------------------------------------------------
root = tk.Tk()
root.title("キャプチャ&メール送信ツール")
root.geometry("500x750")  # tall enough for all form rows

# --- Tk variables backing the form fields ----------------------------------
email_vars = [tk.StringVar() for _ in range(10)]
method_vars = [tk.StringVar(value="TO") for _ in range(10)]
interval_var = tk.StringVar(value="5")
smtp_server_var = tk.StringVar(value="smtp.gmail.com")
smtp_port_var = tk.StringVar(value="587")
smtp_user_var = tk.StringVar(value="your_email@example.com")
smtp_pass_var = tk.StringVar(value="your_app_password")
save_var = tk.BooleanVar(value=False)  # image saving on/off
save_folder_var = tk.StringVar(value="")

# Overwrite the defaults above with any saved settings.
load_config()

# --- Recipient rows: address entry plus TO/BCC selector (10 rows) ----------
tk.Label(root, text="送信先メールアドレス (最大10個):").pack(pady=5)
for address_var, method_var in zip(email_vars, method_vars):
    frame = tk.Frame(root)
    frame.pack(pady=2)
    address_entry = tk.Entry(frame, textvariable=address_var, width=30)
    address_entry.pack(side=tk.LEFT)
    method_box = ttk.Combobox(
        frame,
        textvariable=method_var,
        values=["TO", "BCC"],
        state="readonly",
        width=5,
    )
    method_box.pack(side=tk.LEFT, padx=5)

# --- Interval and SMTP settings: one caption + one entry per row -----------
labeled_entries = (
    ("タイマー間隔(分):", interval_var, {"width": 10}),
    ("SMTPサーバー:", smtp_server_var, {"width": 40}),
    ("SMTPポート:", smtp_port_var, {"width": 10}),
    ("SMTPユーザー(メールアドレス):", smtp_user_var, {"width": 40}),
    ("SMTPパスワード:", smtp_pass_var, {"width": 40, "show": "*"}),
)
for caption, variable, entry_options in labeled_entries:
    tk.Label(root, text=caption).pack(pady=5)
    tk.Entry(root, textvariable=variable, **entry_options).pack()

# --- Image saving options --------------------------------------------------
tk.Label(root, text="画像保存設定:").pack(pady=5)
tk.Checkbutton(root, text="画像を保存する", variable=save_var).pack()
frame_save = tk.Frame(root)
frame_save.pack(pady=2)
tk.Entry(frame_save, textvariable=save_folder_var, width=40).pack(side=tk.LEFT)
browse_button = tk.Button(frame_save, text="選択", command=select_save_folder)
browse_button.pack(side=tk.LEFT, padx=5)

# --- Start/stop controls and status display --------------------------------
start_button = tk.Button(root, text="開始", command=start_timer)
start_button.pack(pady=10)
stop_button = tk.Button(root, text="停止", command=stop_timer, state="disabled")
stop_button.pack(pady=5)
status_label = tk.Label(root, text="待機中")
status_label.pack(pady=5)
countdown_label = tk.Label(root, text="次送信まで: --:--")
countdown_label.pack(pady=5)
def _on_close():
    """Stop the timer (which also saves the settings), then close the window."""
    try:
        stop_timer()
    finally:
        # Registering a WM_DELETE_WINDOW handler replaces Tk's default close
        # behaviour, so the window has to be destroyed explicitly; otherwise
        # the close button would stop the timer but never close the program.
        root.destroy()


root.protocol("WM_DELETE_WINDOW", _on_close)
root.mainloop()