5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[AWS][Bedrock] 動画を文字起こしして要約するまで

5
Last updated at Posted at 2025-12-22

はじめに

この記事ではAWSサービスを使って、動画ファイル(mp4)から文字起こしと要約をする機能を作成します。

この記事の対象者

  • AWSでLambdaを使ったことがある人
  • Bedrockを使ってみたい人
  • Transcribeを使ってみたい人
  • Step FunctionsでLambdaの連携処理を組んでみたい人

この記事を書いた理由

お仕事で文字起こしと要約を組み合わせる処理をしたいと希望をいただき、そのための検証を行いました。BedrockでいろいろなAIモデルが使えるということはなんとなく知ってはいたのですが、APIの場合どうやって使うか、利用した場合ログがどうなるかなど不明点を潰すために検証をしています。

私はAWSコンソールでBedrockとTranscribeは試したことがあり、Step Functionsを触ったのは今回初めてです。

準備物

  • AWSアカウント
  • aws cli/sam cliの利用環境
  • 文字起こしする動画(mp4)

動画ファイルは実際に文字起こしと要約内容を精度を確認したかったため、会社のインターン生をインタビューした際に録画したデータ(Google Meetの録画)を利用しました。

AWS利用サービス

()は利用用途

  • Amazon Bedrock (要約)
  • Amazon Transcribe (文字起こし)
  • API Gateway (API)
  • Lambda (Bedrock, Transcribeの実行)
  • Step Functions (Lambda実行管理)
  • S3 (動画、文字起こし、要約ファイルの管理)

実行フロー

今回は文字起こしから要約を検証するため、下記の実行フローで作成を行いました

  1. S3に文字起こしする動画配置
  2. APIでS3のパスをリクエスト
  3. Lambdaで文字起こしファイルを作成しS3に保存
  4. Lambdaで文字起こししたファイルから要約ファイルを作成しS3に保存

APIは検証用のため今回は認証を設定しないで作成します

全体図

architecture_1.drawio.png

実践

最初にS3にバケットを作成しサンプル動画をセットアップします

例: s3://sample-bucket/items/sample.mp4

文字起こし

文字起こし用のLambdaとして test-transcribe-functionをPython 3.14環境で作成します。

動画時間によりますが時間が多少かかるためタイムアウトを15分で設定しています。

Transcribeによる文字起こし時に言語、話者数を指定可能なため日本語と最大5人話者としてプログラムを作成します。

import boto3
import json
import os
import time
import urllib.request
import urllib.parse
from datetime import datetime

# クライアント初期化
transcribe = boto3.client('transcribe')
s3 = boto3.client('s3')

# 環境変数(デフォルト値設定)
OUTPUT_BUCKET = os.environ.get('OUTPUT_BUCKET', 'sample-bucket')
OUTPUT_PREFIX = os.environ.get('OUTPUT_PREFIX', 'transcripts/')

def lambda_handler(event, context):
    """
    S3イベントトリガーまたは手動実行
    動画をTranscribeで文字起こし→結果をS3に保存
    """
    try:
        # イベントの種類判定
        if 'Records' in event:
            # S3イベントトリガー
            bucket = event['Records'][0]['s3']['bucket']['name']
            key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
        elif 'bucket' in event and 'key' in event:
            # 手動実行(テスト用)
            bucket = event['bucket']
            key = event['key']
        
        print(f"処理開始: s3://{bucket}/{key}")
        
        # 1. Transcribeジョブ開始
        job_name = start_transcription_job(bucket, key)
        
        # 2. 完了待機
        transcript_uri = wait_for_transcription(job_name)
        
        # 3. 文字起こし結果取得
        transcript_json = get_transcript(transcript_uri)
        
        # 4. 話者情報付きに整形
        formatted_result = format_transcript_with_speakers(transcript_json)
        
        # 5. S3に保存
        output_key = f"{OUTPUT_PREFIX}{os.path.splitext(os.path.basename(key))[0]}_transcript.json"
        save_to_s3(formatted_result, OUTPUT_BUCKET, output_key)
        
        print(f"完了: s3://{OUTPUT_BUCKET}/{output_key}")
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': '文字起こし完了',
                'input': f"s3://{bucket}/{key}",
                'output': f"s3://{OUTPUT_BUCKET}/{output_key}",
                'transcriptKey': output_key,
                'bucket': OUTPUT_BUCKET,
                'job_name': job_name,
                'segments_count': len(formatted_result.get('segments', [])),
                'speakers': formatted_result.get('statistics', {}).get('speakers', [])
            }, ensure_ascii=False)
        }
        
    except Exception as e:
        print(f"エラー: {str(e)}")
        import traceback
        traceback.print_exc()
        
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e),
                'error_type': type(e).__name__
            }, ensure_ascii=False)
        }

def start_transcription_job(bucket_name, video_key):
    """Transcribeジョブを開始"""
    # ジョブ名生成(英数字とハイフンのみ)
    timestamp = str(int(time.time()))
    safe_key = video_key.replace('/', '-').replace('.', '-').replace('_', '-')
    job_name = f"transcribe-{safe_key}-{timestamp}"[:200]
    
    video_uri = f"s3://{bucket_name}/{video_key}"
    
    # ファイル拡張子から形式判定
    file_extension = video_key.split('.')[-1].lower()
    media_format_map = {
        'mp4': 'mp4',
    }
    media_format = media_format_map.get(file_extension, 'mp4')
    
    try:
        response = transcribe.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={'MediaFileUri': video_uri},
            MediaFormat=media_format,
            LanguageCode='ja-JP',
            Settings={
                'ShowSpeakerLabels': True,
                'MaxSpeakerLabels': 5
            }
        )
        
        print(f"Transcribeジョブ開始: {job_name}")
        print(f"入力: {video_uri}")
        print(f"フォーマット: {media_format}")
        
        return job_name
        
    except Exception as e:
        print(f"Transcribeジョブ開始エラー: {str(e)}")
        raise

def wait_for_transcription(job_name, max_wait_time=840):
    """
    Transcribeジョブの完了を待機
    Lambda実行時間制限: 900秒(15分)を考慮して840秒でタイムアウト
    """
    print("文字起こし処理中...")
    start_time = time.time()
    check_interval = 10  # 10秒ごとにチェック
    
    while True:
        elapsed_time = time.time() - start_time
        
        # タイムアウトチェック
        if elapsed_time > max_wait_time:
            raise Exception(f"タイムアウト: {max_wait_time}秒以内に完了しませんでした。ジョブ名: {job_name}")
        
        try:
            response = transcribe.get_transcription_job(
                TranscriptionJobName=job_name
            )
            
            status = response['TranscriptionJob']['TranscriptionJobStatus']
            print(f"ステータス: {status} (経過時間: {int(elapsed_time)}秒)")
            
            if status == 'COMPLETED':
                print("文字起こし完了!")
                transcript_uri = response['TranscriptionJob']['Transcript']['TranscriptFileUri']
                return transcript_uri
                
            elif status == 'FAILED':
                failure_reason = response['TranscriptionJob'].get('FailureReason', '不明')
                raise Exception(f"Transcribeジョブ失敗: {failure_reason}")
            
        except transcribe.exceptions.BadRequestException as e:
            raise Exception(f"Transcribeジョブが見つかりません: {job_name}")
        
        time.sleep(check_interval)

def get_transcript(transcript_uri):
    """文字起こし結果をHTTPで取得"""
    print(f"文字起こし結果取得: {transcript_uri}")
    
    try:
        with urllib.request.urlopen(transcript_uri) as response:
            transcript_json = json.loads(response.read())
        return transcript_json
        
    except Exception as e:
        print(f"文字起こし結果取得エラー: {str(e)}")
        raise

def format_transcript_with_speakers(transcript_json):
    """
    Transcribe結果を話者情報付きで整形
    """
    result = {
        'job_name': transcript_json.get('jobName'),
        'account_id': transcript_json.get('accountId'),
        'status': transcript_json.get('status'),
        'created_at': datetime.now().isoformat(),
        'full_transcript': '',
        'segments': []
    }
    
    # 全文テキスト
    transcripts = transcript_json.get('results', {}).get('transcripts', [])
    if transcripts:
        result['full_transcript'] = transcripts[0].get('transcript', '')
    
    # 話者情報付きセグメント
    segments = transcript_json['results'].get('speaker_labels', {}).get('segments', [])
    items = transcript_json['results'].get('items', [])
    
    for segment in segments:
        speaker = segment['speaker_label']
        start_time = float(segment['start_time'])
        end_time = float(segment['end_time'])
        
        # セグメント内のテキストを取得
        segment_items = []
        for item in items:
            if 'start_time' in item:
                item_start = float(item['start_time'])
                if start_time <= item_start <= end_time:
                    segment_items.append(item)
        
        # テキスト結合
        text = ' '.join([
            item['alternatives'][0]['content'] 
            for item in segment_items 
            if item.get('alternatives')
        ])
        
        result['segments'].append({
            'speaker': speaker,
            'start_time': start_time,
            'end_time': end_time,
            'start_time_formatted': format_time(start_time),
            'end_time_formatted': format_time(end_time),
            'text': text
        })
    
    # 統計情報
    result['statistics'] = {
        'total_segments': len(result['segments']),
        'total_duration': float(segments[-1]['end_time']) if segments else 0,
        'speakers': list(set([s['speaker'] for s in result['segments']]))
    }
    
    return result

def format_time(seconds):
    """秒をHH:MM:SS形式に変換"""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    
    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    else:
        return f"{minutes:02d}:{secs:02d}"

def save_to_s3(data, bucket, key):
    """結果をS3に保存"""
    try:
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(data, ensure_ascii=False, indent=2),
            ContentType='application/json',
            ContentEncoding='utf-8'
        )
        print(f"S3保存完了: s3://{bucket}/{key}")
        
    except Exception as e:
        print(f"S3保存エラー: {str(e)}")
        raise

Lambdaの実行ロールにポリシーを設定します

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "s3:ListBucket"
          ],
          "Resource": "arn:aws:s3:::sample-bucket"
      },
      {
          "Effect": "Allow",
          "Action": [
              "s3:GetObject",
              "s3:PutObject"
          ],
          "Resource": "arn:aws:s3:::sample-bucket/*"
      },
      {
          "Effect": "Allow",
          "Action": [
              "transcribe:StartTranscriptionJob",
              "transcribe:GetTranscriptionJob"
          ],
          "Resource": "*"
      }
  ]
}

コマンドから実行テストします

実行のイベントファイルを作成

// s3-upload-event.json
{
  "bucket": "sample-bucket",
  "key": "items/sample.mp4"
}
# 実行
$ aws lambda invoke \
  --function-name test-transcribe-function \
  --payload file://s3-upload-event.json \
  --region ap-northeast-1 \
  response.json

s3://sample-bucket/transcripts フォルダに transcript.json が作成されていれば成功です。

中身の一部サンプルです

{
  "job_name": "transcribe-items-sample-mp4-1765190507",
  "account_id": "603514171564",
  "status": "COMPLETED",
  "created_at": "2025-12-08T10:43:38.967391",
  "full_transcript": "えっと、記事の方を見ていただいているということなので、全体の流れはだいたい、あの、把握されているかなと思います。はい。では最初に、えっと、略歴のところからお聞きしたいので、....",
  "segments": [
    {
      "speaker": "spk_0",
      "start_time": 1.009,
      "end_time": 12.489,
      "start_time_formatted": "00:01",
      "end_time_formatted": "00:12",
      "text": "えっ と、 記事 の 方 を 見 て いただい て いる と いう こと な の で、 全体 の 流れ は だいたい、 あの、 把握 さ れ て いる か な と 思い ます。 はい。 で は 最初 に、 えっ と、 略歴 の ところ から お 聞き し たい の で、 "
    },    {
      "speaker": "spk_1",
      "start_time": 20.879,
      "end_time": 23.0,
      "start_time_formatted": "00:20",
      "end_time_formatted": "00:23",
      "text": "はい。 と"
    },
    ...
  ],
  "statistics": {
    "total_segments": 195,
    "total_duration": 1444.9,
    "speakers": [
      "spk_0",
      "spk_1"
    ]
  }
}

要約

要約用のLambdaとして test-summarize-functionをPython 3.14環境で作成します。

動画時間によりますが時間が多少かかるためタイムアウトを15分で設定しています。

また今回は要約するAIモデルとして gpt-oss-120b を利用します。

import boto3
import json
import os
from datetime import datetime

# クライアント初期化
s3 = boto3.client('s3', region_name='ap-northeast-1')
bedrock = boto3.client('bedrock-runtime', region_name='ap-northeast-1')

# 環境変数
INPUT_BUCKET = os.environ.get('INPUT_BUCKET', 'sample-bucket')
OUTPUT_BUCKET = os.environ.get('OUTPUT_BUCKET', 'sample-bucket')
BEDROCK_MODEL_ID = os.environ.get('BEDROCK_MODEL_ID', 'openai.gpt-oss-120b-1:0')

def lambda_handler(event, context):
    """
    S3のTranscribe結果を読み込んでBedrockで要約
    """
    try:
        # イベントからファイル情報取得
        if 'Records' in event:
            bucket = event['Records'][0]['s3']['bucket']['name']
            key = event['Records'][0]['s3']['object']['key']
        elif 'bucket' in event and 'key' in event:
            bucket = event['bucket']
            key = event['key']
        
        print(f"処理開始: s3://{bucket}/{key}")
        
        # 1. Transcribe結果をS3から取得
        transcript_data = get_transcript_from_s3(bucket, key)
        
        # 2. Bedrockで要約
        summary = summarize_with_bedrock(transcript_data)
        
        # 3. 結果をS3に保存
        base_name = os.path.splitext(os.path.basename(key))[0]
        output_key = f"summaries/{base_name}_summary.json"
        
        result = {
            'source': {
                'bucket': bucket,
                'key': key
            },
            'processed_at': datetime.now().isoformat(),
            'model': BEDROCK_MODEL_ID,
            'summary': summary
        }
        
        save_to_s3(result, OUTPUT_BUCKET, output_key)
        
        print(f"完了: s3://{OUTPUT_BUCKET}/{output_key}")
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': '要約完了',
                'input': f"s3://{bucket}/{key}",
                'output': f"s3://{OUTPUT_BUCKET}/{output_key}",
                'model': BEDROCK_MODEL_ID
            }, ensure_ascii=False)
        }
        
    except Exception as e:
        print(f"エラー: {str(e)}")
        import traceback
        traceback.print_exc()
        
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e),
                'error_type': type(e).__name__
            }, ensure_ascii=False)
        }

def get_transcript_from_s3(bucket, key):
    """S3からTranscribe結果を取得"""
    print(f"Transcribe結果取得: s3://{bucket}/{key}")
    
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        transcript_json = json.loads(response['Body'].read().decode('utf-8'))
        return transcript_json
        
    except Exception as e:
        print(f"S3読み込みエラー: {str(e)}")
        raise

def summarize_with_bedrock(transcript_data):
    """Bedrock (GPT-OSS-120B) で要約"""
    
    # Transcriptデータから要約用テキストを作成
    segments = transcript_data.get('segments', [])
    
    # セグメント情報を整形
    formatted_segments = []
    for segment in segments:
        formatted_segments.append(
            f"[{segment['start_time_formatted']}] "
            f"{segment['speaker']}: {segment['text']}"
        )
    
    segments_text = "\n".join(formatted_segments)
    
    # プロンプト作成(reasoning出力を抑制)
    prompt = f"""以下は動画の文字起こしテキストです。話者と時間情報が含まれています。
この内容を日本語で要約してください。

文字起こし:
{segments_text}

以下のJSON形式で出力してください。JSONのみを出力し、それ以外の説明や思考過程は出力しないでください:
{{
  "summary": "全体の要約(3-5文程度)",
  "key_points": [
    {{
      "timestamp": "MM:SS",
      "speaker": "話者",
      "point": "重要なポイント"
    }}
  ],
  "topics": ["トピック1", "トピック2", "..."],
  "action_items": ["アクションアイテム1", "アクションアイテム2", "..."]
}}"""
    
    print(f"Bedrock API呼び出し: {BEDROCK_MODEL_ID}")
    
    # OpenAI互換モデル用のリクエスト形式
    request_body = {
        "messages": [
            {
                "role": "system",
                "content": "あなたは会議や動画の文字起こしを要約する専門家です。JSON形式で正確に出力してください。思考過程や説明は不要で、JSONのみを出力してください。"
            },
            {
                "role": "user",
                "content": prompt
            }
        ],
        "max_tokens": 4000,
        "temperature": 0.3,
        "top_p": 0.9
    }
    
    try:
        import time
        import re
        start_time = time.time()
        
        response = bedrock.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps(request_body)
        )
        
        elapsed_time = time.time() - start_time
        response_body = json.loads(response['body'].read())
        
        print(f"Bedrock呼び出し完了: {elapsed_time:.2f}")
        print(f"レスポンス構造: {list(response_body.keys())}")
        
        summary_text = ""
        
        if 'choices' in response_body:
            # OpenAI形式
            summary_text = response_body['choices'][0]['message']['content']
        elif 'completion' in response_body:
            summary_text = response_body['completion']
        elif 'text' in response_body:
            summary_text = response_body['text']
        elif 'output' in response_body:
            summary_text = response_body['output']
        else:
            print(f"予期しないレスポンス形式: {json.dumps(response_body, ensure_ascii=False, indent=2)}")
            raise Exception(f"未知のレスポンス形式: {list(response_body.keys())}")
        
        print(f"レスポンステキスト長: {len(summary_text)} 文字")
        
        # レスポンスのクリーニング
        clean_text = summary_text.strip()
        
        # <reasoning>タグを削除
        if '<reasoning>' in clean_text:
            # <reasoning>...</reasoning>の部分を削除
            clean_text = re.sub(r'<reasoning>.*?</reasoning>', '', clean_text, flags=re.DOTALL)
            clean_text = clean_text.strip()
            print("reasoning部分を削除しました")
        
        # マークダウンコードブロックを削除
        if clean_text.startswith('```json'):
            clean_text = clean_text[7:]
        if clean_text.startswith('```'):
            clean_text = clean_text[3:]
        if clean_text.endswith('```'):
            clean_text = clean_text[:-3]
        clean_text = clean_text.strip()
        
        print(f"クリーニング後のテキスト長: {len(clean_text)} 文字")
        print(f"クリーニング後の最初の200文字: {clean_text[:200]}...")
        
        # JSONとしてパース
        try:
            summary_json = json.loads(clean_text)
            
            # 正しくパースできたか確認
            if isinstance(summary_json, dict) and 'summary' in summary_json:
                print("JSON解析成功")
                return summary_json
            else:
                print(f"JSONの構造が不正: {list(summary_json.keys())}")
                # フォールバック
                return {
                    'summary': str(summary_json),
                    'key_points': summary_json.get('key_points', []),
                    'topics': summary_json.get('topics', []),
                    'action_items': summary_json.get('action_items', [])
                }
                
        except json.JSONDecodeError as e:
            print(f"JSON解析失敗: {e}")
            print(f"解析対象テキスト: {clean_text[:500]}...")
            
            # 再度reasoningタグの確認と削除
            if '<reasoning>' in clean_text or '</reasoning>' in clean_text:
                clean_text = re.sub(r'</?reasoning>', '', clean_text)
                clean_text = clean_text.strip()
                
                # JSON部分のみを抽出({ で始まり } で終わる最初の完全なJSON)
                json_match = re.search(r'\{.*\}', clean_text, re.DOTALL)
                if json_match:
                    clean_text = json_match.group(0)
                    print("JSON部分を抽出しました")
                    
                    try:
                        summary_json = json.loads(clean_text)
                        print("再試行でJSON解析成功")
                        return summary_json
                    except:
                        pass
            
            # それでも失敗したらフォールバック
            return {
                'summary': clean_text[:500],  # 最初の500文字のみ
                'key_points': [],
                'topics': [],
                'action_items': [],
                'raw_response': clean_text,
                'parse_error': str(e)
            }
            
    except Exception as e:
        print(f"Bedrock API呼び出しエラー: {str(e)}")
        print(f"リクエストボディ: {json.dumps(request_body, ensure_ascii=False, indent=2)}")
        raise

def save_to_s3(data, bucket, key):
    """結果をS3に保存"""
    try:
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(data, ensure_ascii=False, indent=2),
            ContentType='application/json',
            ContentEncoding='utf-8'
        )
        print(f"S3保存完了: s3://{bucket}/{key}")
        
    except Exception as e:
        print(f"S3保存エラー: {str(e)}")
        raise

Lambdaの実行ロールにポリシーを設定します

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "s3:ListBucket"
          ],
          "Resource": "arn:aws:s3:::sample-bucket"
      },
      {
          "Effect": "Allow",
          "Action": [
              "s3:GetObject",
              "s3:PutObject"
          ],
          "Resource": "arn:aws:s3:::sample-bucket/*"
      },
      {
          "Effect": "Allow",
          "Action": [
              "bedrock:InvokeModel"
          ],
          "Resource": [
              "arn:aws:bedrock:ap-northeast-1::foundation-model/openai.gpt-oss-120b-1:0"
          ]
      }
  ]
}

コマンドから実行テストします

実行のイベントファイルを作成

// transcribe-event.json
{
    "bucket": "sample-bucket",
    "key": "transcripts/sample_transcript.json"
}
# 実行
$ aws lambda invoke \
  --function-name test-summarize-function \
  --payload file://transcribe-event.json \
  --region ap-northeast-1 \
  response.json

s3://sample-bucket/summaries フォルダに summary.json が作成されていれば成功です。

要約サンプル

{
  "source": {
    "bucket": "sample-bucket",
    "key": "transcripts/sample_transcript.json"
  },
  "processed_at": "2025-12-08T10:43:52.469401",
  "model": "openai.gpt-oss-120b-1:0",
  "summary": {
    "summary": "インタビューでは、大学2年生の学生が高校卒業から大学入学までの経歴、IoT・A=....",
    "key_points": [
      {
        "timestamp": "00:24",
        "speaker": "spk_1",
        "point": "高校卒業後、大学に入学し現在2年生であることを報告。"
      },
      ...
=    ],
    "topics": [
      "学歴と現在の在籍",
      ...
    ],
    "action_items": [
      "IoTセンサー網の設置とデータ公開システムを完成させる。",
      "インターンでのコード修正・テスト業務を継続し、実装実績を増やす。",
      ...
    ]
  }
}

APIエンドポイント

API GatewayとStep Functionsを使って2つのLambdaを続けて実行できるようにします。

Step Functions

Step Functionsで実行フローの定義を行いましょう

Step Functions用のIAM Role StepFunctionsExecutionRole を作成して許可設定します

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "arn:aws:lambda:ap-northeast-1:*:function:test-transcribe-function",
        "arn:aws:lambda:ap-northeast-1:*:function:test-summarize-function"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}

また、信頼関係にAssumeRoleを追加しておきましょう

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "states.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

ステートマシーンを定義します。パラメータとしてbucketとkeyを受け取りLambdaを連続して実行するように定義しています。

// state-machine.json
{
    "Comment": "動画文字起こし→要約パイプライン",
    "StartAt": "ParseInput",
    "States": {
      "ParseInput": {
        "Type": "Pass",
        "Parameters": {
          "bucket.$": "$.bucket",
          "key.$": "$.key"
        },
        "Next": "Transcribe"
      },
      "Transcribe": {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {
          "FunctionName": "test-transcribe-function",
          "Payload": {
            "bucket.$": "$.bucket",
            "key.$": "$.key"
          }
        },
        "ResultSelector": {
          "statusCode.$": "$.Payload.statusCode",
          "body.$": "$.Payload.body"
        },
        "ResultPath": "$.transcribeResult",
        "Next": "CheckTranscribeSuccess",
        "TimeoutSeconds": 900
      },
      "CheckTranscribeSuccess": {
        "Type": "Choice",
        "Choices": [
          {
            "Variable": "$.transcribeResult.statusCode",
            "NumericEquals": 200,
            "Next": "ParseTranscribeResult"
          }
        ],
        "Default": "TranscribeFailed"
      },
      "ParseTranscribeResult": {
        "Type": "Pass",
        "Parameters": {
          "transcribeBody.$": "States.StringToJson($.transcribeResult.body)"
        },
        "Next": "ExtractTranscriptInfo"
      },
      "ExtractTranscriptInfo": {
        "Type": "Pass",
        "Parameters": {
          "bucket.$": "$.transcribeBody.bucket",
          "key.$": "$.transcribeBody.transcriptKey"
        },
        "Next": "Summarize"
      },
      "Summarize": {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {
          "FunctionName": "test-summarize-function",
          "Payload": {
            "bucket.$": "$.bucket",
            "key.$": "$.key"
          }
        },
        "ResultSelector": {
          "statusCode.$": "$.Payload.statusCode",
          "body.$": "$.Payload.body"
        },
        "ResultPath": "$.summarizeResult",
        "Next": "CheckSummarizeSuccess",
        "TimeoutSeconds": 300
      },
      "CheckSummarizeSuccess": {
        "Type": "Choice",
        "Choices": [
          {
            "Variable": "$.summarizeResult.statusCode",
            "NumericEquals": 200,
            "Next": "Success"
          }
        ],
        "Default": "SummarizeFailed"
      },
      "Success": {
        "Type": "Succeed",
        "OutputPath": "$.summarizeResult"
      },
      "TranscribeFailed": {
        "Type": "Fail",
        "Error": "TranscribeError",
        "Cause": "文字起こし処理に失敗しました"
      },
      "SummarizeFailed": {
        "Type": "Fail",
        "Error": "SummarizeError",
        "Cause": "要約処理に失敗しました"
      }
    }
  }

デプロイします

$ aws stepfunctions create-state-machine
    --name "video-transcribe-summarize" \
    --definition file://state-machine.json \
    --role-arn arn:aws:iam::{{アカウントID}}:role/StepFunctionsExecutionRole

API Gateway

API Gatewayをyamlで定義し、Step Functionsを起動するようにします。

パスは POST /prod/transcribe の形式で作成します

# api-gateway.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Parameters:
  StateMachineArn:
    Type: String
    Description: Step Functions State Machine ARN

Resources:
  TranscribeApi:
    Type: AWS::Serverless::Api
    Properties:
      StageName: prod
      Cors:
        AllowMethods: "'POST,OPTIONS'"
        AllowHeaders: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key'"
        AllowOrigin: "'*'"
      DefinitionBody:
        openapi: 3.0.0
        info:
          title: Video Transcribe API
          version: 1.0.0
        paths:
          /transcribe:
            post:
              summary: 動画の文字起こしと要約を実行
              requestBody:
                required: true
                content:
                  application/json:
                    schema:
                      type: object
                      required:
                        - bucket
                        - key
                      properties:
                        bucket:
                          type: string
                          example: sample-bucket
                        key:
                          type: string
                          example: items/video.mp4
              responses:
                '200':
                  description: 処理開始
              x-amazon-apigateway-integration:
                type: aws
                uri: !Sub 'arn:aws:apigateway:${AWS::Region}:states:action/StartExecution'
                credentials: !GetAtt ApiGatewayStepFunctionsRole.Arn
                httpMethod: POST
                requestTemplates:
                  application/json: !Sub |
                    {
                      "input": "$util.escapeJavaScript($input.json('$'))",
                      "stateMachineArn": "${StateMachineArn}"
                    }
                responses:
                  default:
                    statusCode: 200
                    responseTemplates:
                      application/json: |
                        {
                          "executionArn": "$input.path('$.executionArn')",
                          "startDate": "$input.path('$.startDate')"
                        }

  ApiGatewayStepFunctionsRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: apigateway.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: InvokeStepFunctions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - states:StartExecution
                Resource: !Ref StateMachineArn

Outputs:
  ApiUrl:
    Description: API Gateway URL
    Value: !Sub 'https://${TranscribeApi}.execute-api.${AWS::Region}.amazonaws.com/prod/transcribe'

デプロイをします

$ sam deploy \
  --template-file "api-gateway.yaml" \
  --stack-name video-transcribe-api \
  --parameter-overrides "StateMachineArn=arn:aws:states:ap-northeast-1:{{アカウントID}}:stateMachine:video-transcribe-summarize" \
  --capabilities CAPABILITY_IAM \
  --region ap-northeast-1 \
  --no-confirm-changeset \
  --no-fail-on-empty-changeset

これでAPI Gatewayを定義したので実際にリクエストをしましょう。

API Endpointを取得します

API_URL=$(aws cloudformation describe-stacks \
  --stack-name video-transcribe-api \
  --query 'Stacks[0].Outputs[?OutputKey==`ApiUrl`].OutputValue' \
  --output text \
  --region ap-northeast-1)

リクエストします

$ curl -X POST $API_URL \
  -H 'Content-Type: application/json' \
  -d '{"bucket":"sample-bucket","key":"items/sample.mp4"}'

StepFunctionですべて実行できれば完了です

CleanShot 2025-12-12 at 08.38.25@2x.png

まとめ

初めてStep Functionsを利用した処理を構築したため、連携エラー時のデバッグに苦労しました。Step Functionsで値受け渡しのステップが作成できますが、あまり難しいことはせずにLambda側に処理を寄せた方が良さそうです。

基本的な実行フローは作成できたのでそもそもの要約や認証などを今後は詰めていきたいです。

お知らせ

技術ブログを週1〜2本更新中、ソーイをフォローして最新記事をチェック!
https://qiita.com/organizations/sewii

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?