1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LINE BOTで受け取った画像をNASに保存する実装方法

Last updated at Posted at 2024-12-28

はじめに

LINE BOTで画像を受信した際、その画像をNAS上に保存する実装方法を解説します。GAS(Google Apps Script)でLINEからのWebhookを受け取り、NAS上で動作するFlaskサーバーで画像を保存する構成です。

システム構成

  1. LINE Bot: ユーザーからの画像受信
  2. GAS: Webhookの受信と転送
  3. NAS(Flask): 画像の保存処理

実装のポイント

  • GAS側は最小限の情報(画像ID等)のみを送信
  • 画像の取得・保存はNAS側で実行
  • マルチパートフォームデータとJSONの両方に対応
  • 重複リクエストの防止機能を実装

直面した課題

  1. HTTPSの認証エラー

    • LINE Messaging APIは、WebhookエンドポイントにHTTPSを要求
    • NASで自己証明書を使用したHTTPSを設定しても、LINE側で認証エラー
    • Let's Encryptなどの正規の証明書取得も検討したが、NASの環境制約により断念
  2. 解決策の検討

    • 当初は「LINE → NAS直接通信」を目指した
    • 認証の問題により断念
    • GASを中継点として使用する案を採用

GASを中継点として採用した理由

  1. メリット

    • HTTPSの認証問題を回避できる
    • GASはLINE Messaging APIと相性が良い
    • NASへの通信は必要最小限のデータのみ
    • システムの柔軟性が向上
  2. デメリット

    • 構成が若干複雑になる
    • 処理に若干の遅延が発生

しかし、これらのデメリットは許容範囲内であり、認証問題を解決できるメリットの方が大きいと判断しました。

採用した構成の利点

  1. GASがWebhookを受け取り、必要な情報のみをNASに転送
  2. NASは画像IDを使って必要なタイミングで画像を取得
  3. 処理の分散により、システムの安定性が向上

この構成により、当初の目的である「LINE BOTで受け取った画像をNASに保存する」という要件を、実用的な形で実現することができました。

システム構成図

コード実装

1. GAS側の実装

function doPost(e) {
  try {
    var json = JSON.parse(e.postData.contents);
    var event = json.events[0];
    var userId = event.source.userId;

    // データを準備
    var webhookData = {
      'user': userId,
      'timestamp': new Date().toISOString(),
      'type': event.message.type,
      'message': event.message.type === 'text' ? event.message.text : event.message.id
    };

    // NASにPOSTリクエスト送信
    var options = {
      'method': 'post',
      'contentType': 'application/json',
      'payload': JSON.stringify(webhookData)
    };

    var response = UrlFetchApp.fetch("http://your-nas-domain:8083/webhook", options);
    return ContentService.createTextOutput(JSON.stringify({status: 'ok'}))
      .setMimeType(ContentService.MimeType.JSON);

  } catch(error) {
    console.error("Error:", error);
    return ContentService.createTextOutput(JSON.stringify({
      status: 'error',
      message: error.toString()
    })).setMimeType(ContentService.MimeType.JSON);
  }
}

2. NAS側の実装(Flask)

from flask import Flask, request, jsonify
import requests
from datetime import datetime
import os
import logging
from dotenv import load_dotenv

# 環境変数のロード
load_dotenv()
LINE_TOKEN = os.getenv("LINE_CHANNEL_ACCESS_TOKEN")
SAVE_DIR = os.getenv("SAVE_DIR", './line_uploads')

# ログ設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# グローバル変数として過去のリクエストを保持
processed_requests = {}

app = Flask(__name__)

def is_duplicate_request(request_id, timeout_minutes=5):
    """重複リクエストをチェック"""
    current_time = datetime.now()
    
    # 古いリクエストを削除
    for rid, timestamp in list(processed_requests.items()):
        if (current_time - timestamp).total_seconds() > timeout_minutes * 60:
            del processed_requests[rid]
    
    # 重複チェック
    if request_id in processed_requests:
        return True
    
    # 新しいリクエストを記録
    processed_requests[request_id] = current_time
    return False

@app.route('/webhook', methods=['POST'])
def webhook():
    try:
        logging.info(f"Headers: {dict(request.headers)}")
        content_type = request.headers.get('Content-Type', '')

        # マルチパートフォームデータの場合の処理
        if content_type.startswith('multipart/form-data'):
            logging.info("Reading raw data from request")
            raw_data = request.get_data()
            raw_str = raw_data.decode('utf-8', errors='ignore')
            logging.info(f"Raw data size: {len(raw_data)} bytes")

            # マルチパートデータから情報を抽出
            try:
                # データの各部分を分割
                parts = raw_str.split('\r\n')
                user_id = None
                timestamp = None
                image_data = None
                
                # 各部分を解析
                for i, part in enumerate(parts):
                    if 'name="user"' in part:
                        user_id = parts[i + 2]
                    elif 'name="timestamp"' in part:
                        timestamp = parts[i + 2]
                    elif 'name="image"' in part and 'filename="content.jpg"' in part:
                        # 画像データの開始位置を特定
                        image_start = raw_str.find('\r\n\r\n', raw_str.find('Content-type: image/jpeg')) + 4
                        image_data = raw_data[image_start:]

                logging.info(f"Extracted data - User: {user_id}, Timestamp: {timestamp}")

                if user_id and timestamp:
                    # 重複チェック
                    request_id = f"{timestamp}_{user_id}"
                    if is_duplicate_request(request_id):
                        logging.info(f"Duplicate request detected: {request_id}")
                        return jsonify({'status': 'ok', 'message': 'Duplicate request ignored'})

                if user_id and timestamp and image_data:
                    date_str = datetime.fromisoformat(timestamp.replace('Z', '+00:00')).strftime('%Y%m%d_%H%M%S')
                    save_image_data(user_id, date_str, image_data)
                    return jsonify({'status': 'ok', 'message': 'Image saved'})
                
            except Exception as e:
                logging.error(f"Error processing multipart data: {e}")
                return jsonify({'status': 'error', 'message': str(e)}), 400

            # 処理できなかった場合
            return jsonify({
                'status': 'error',
                'message': 'Could not process multipart data'
            }), 400

        # JSON形式の場合の処理
        elif request.is_json:
            data = request.json
            logging.info(f"Received JSON data: {data}")

            if not all(k in data for k in ['user', 'timestamp', 'type', 'message']):
                raise ValueError("Missing required fields in JSON data")

            # 重複チェック
            request_id = f"{data['timestamp']}_{data['user']}"
            if is_duplicate_request(request_id):
                logging.info(f"Duplicate request detected: {request_id}")
                return jsonify({'status': 'ok', 'message': 'Duplicate request ignored'})

            timestamp = datetime.fromisoformat(data['timestamp'].replace('Z', '+00:00'))
            date_str = timestamp.strftime('%Y%m%d_%H%M%S')

            if data['type'] == 'text':
                save_text(data['user'], date_str, data['message'])
            elif data['type'] == 'image':
                image_data = get_line_image(data['message'])
                save_image_data(data['user'], date_str, image_data)

            return jsonify({'status': 'ok'})

        else:
            logging.warning(f"Unsupported Content-Type: {content_type}")
            return jsonify({'status': 'error', 'message': 'Unsupported Content-Type'}), 415

    except Exception as e:
        logging.error(f"Error in webhook: {str(e)}", exc_info=True)
        return jsonify({'status': 'error', 'message': str(e)}), 500

# 残りの関数は変更なし

def get_line_image(message_id):
    """LINEから画像を取得"""
    url = f'https://api-data.line.me/v2/bot/message/{message_id}/content'
    headers = {'Authorization': f'Bearer {LINE_TOKEN}'}
    
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Failed to get image from LINE: {response.status_code}")
    
    return response.content

def save_text(user_id, timestamp, text):
    """テキストメッセージを保存"""
    user_dir = os.path.join(SAVE_DIR, user_id)
    os.makedirs(user_dir, exist_ok=True)
    
    filename = f"{timestamp}.txt"
    filepath = os.path.join(user_dir, filename)
    
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(text)
    logging.info(f"Text saved: {filepath}")

def save_image_data(user_id, timestamp, image_data):
    """画像データを保存"""
    try:
        user_dir = os.path.join(SAVE_DIR, user_id)
        os.makedirs(user_dir, exist_ok=True)
        
        filename = f"{timestamp}.jpg"
        filepath = os.path.join(user_dir, filename)
        
        # 画像データのバリデーション
        if not image_data.startswith(b'\xff\xd8'):  # JPEGシグネチャチェック
            raise ValueError("Invalid JPEG data")
            
        with open(filepath, 'wb') as f:
            f.write(image_data)
        logging.info(f"Image saved: {filepath}")
        
        # ファイルサイズの確認
        file_size = os.path.getsize(filepath)
        logging.info(f"Saved file size: {file_size} bytes")
        
    except Exception as e:
        logging.error(f"Error saving image: {e}")
        raise

if __name__ == "__main__":
    if not LINE_TOKEN:
        raise ValueError("LINE_CHANNEL_ACCESS_TOKEN is not set in environment variables")
    
    os.makedirs(SAVE_DIR, exist_ok=True)
    logging.info(f"Save directory: {SAVE_DIR}")
    
    app.run(host='0.0.0.0', port=8083)

主要な機能

1. 重複リクエスト検出

  • タイムスタンプとユーザーIDの組み合わせでリクエストを識別
  • 5分以内の重複リクエストを検出して処理をスキップ

2. 画像データの検証

  • JPEGシグネチャのチェック
  • ファイルサイズの確認
  • 保存結果の検証

3. ディレクトリ構造

line_uploads/
  └── [USER_ID]/
      ├── YYYYMMDD_HHMMSS.jpg  # 画像ファイル
      └── YYYYMMDD_HHMMSS.txt  # テキストファイル

セットアップ方法

  1. 必要な環境変数の設定
LINE_CHANNEL_ACCESS_TOKEN="your_line_channel_token"
SAVE_DIR="./line_uploads"  # 保存先ディレクトリ
  1. 必要なパッケージのインストール
pip install flask requests python-dotenv
  1. Flaskサーバーの起動
python app.py

セキュリティ考慮事項

  • LINE Channel Access Tokenの適切な管理
  • 保存ディレクトリのアクセス権限設定
  • リクエストの検証
  • エラーハンドリングの実装

運用上の注意点

  • 開発サーバーではなく、本番環境ではgunicornなどのWSGIサーバーを使用
  • ログローテーションの設定
  • ディスク使用量の監視
  • 定期的なメンテナンス(古いファイルの削除など)

まとめ

この実装により、LINE BOTで受信した画像を安全かつ効率的にNASに保存することができます。GAS側での処理を最小限に抑え、主要な処理をNAS側で行うことで、システムの安定性と拡張性を確保しています。

参考文献

次のステップ

とりあえず、つぶやき入力部分はLINE-BOTで完成した。
何とか正月休み明けまでに、残り部分を実装して、完全アフェリエイト自動化を完成させたいと思う。

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?