0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

bedrock API 2

Last updated at Posted at 2025-01-27
import boto3
import json
import base64
import pandas as pd
import numpy as np
from pathlib import Path
import os
import tempfile
import logging
from concurrent.futures import ThreadPoolExecutor
from openpyxl import load_workbook
from openpyxl.drawing.image import Image as XLImage
from PIL import Image
import io
from pdf2image import convert_from_bytes
import matplotlib.pyplot as plt

# ログの設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class ExcelImageProcessor:
    def __init__(self, s3_bucket, input_folder, output_folder, region_name='us-east-1'):
        """
        SageMaker用の初期化
        """
        self.s3_bucket = s3_bucket
        self.input_folder = input_folder
        self.output_folder = output_folder
        self.s3_client = boto3.client('s3', region_name=region_name)
        self.bedrock = boto3.client('bedrock-runtime', region_name=region_name)
        self.temp_dir = '/tmp'  # SageMakerの一時ディレクトリ

    def list_excel_files(self):
        """
        S3から対象のExcelファイルを検索
        """
        try:
            paginator = self.s3_client.get_paginator('list_objects_v2')
            files = []
            
            for page in paginator.paginate(Bucket=self.s3_bucket, Prefix=self.input_folder):
                if 'Contents' in page:
                    for obj in page['Contents']:
                        if obj['Key'].endswith(('.xlsx', '.xls')) and '業務上災害報告書 兼 災害分析表' in obj['Key']:
                            files.append(obj['Key'])
            
            return files
        except Exception as e:
            logger.error(f"Excelファイルの検索中にエラーが発生しました: {str(e)}")
            raise

    def download_excel(self, s3_key):
        """
        S3からExcelファイルをダウンロード
        """
        try:
            response = self.s3_client.get_object(Bucket=self.s3_bucket, Key=s3_key)
            return io.BytesIO(response['Body'].read())
        except Exception as e:
            logger.error(f"ファイルのダウンロード中にエラーが発生しました - {s3_key}: {str(e)}")
            raise

    def convert_excel_to_image(self, excel_content):
        """
        Excelファイルを画像に変換(SageMaker環境用)
        """
        try:
            # ExcelをPDFとして保存
            wb = load_workbook(excel_content)
            ws = wb.active
            
            # ワークシートの範囲を取得
            max_row = ws.max_row
            max_col = ws.max_column
            
            # Pillowを使用して画像を作成
            dpi = 300
            width_inches = 11.7  # A4サイズ
            height_inches = 8.3
            
            # Matplotlibを使用してワークシートを画像として描画
            fig, ax = plt.subplots(figsize=(width_inches, height_inches))
            ax.axis('off')
            
            # セルの内容を描画
            for row in range(1, max_row + 1):
                for col in range(1, max_col + 1):
                    cell = ws.cell(row=row, column=col)
                    if cell.value:
                        x = (col - 1) / max_col
                        y = 1 - (row / max_row)
                        ax.text(x, y, str(cell.value), transform=ax.transAxes)
            
            # 画像として保存
            img_buffer = io.BytesIO()
            plt.savefig(img_buffer, format='png', dpi=dpi, bbox_inches='tight')
            plt.close()
            
            return img_buffer.getvalue()
            
        except Exception as e:
            logger.error(f"Excel画像変換中にエラーが発生しました: {str(e)}")
            raise

    def process_with_bedrock(self, image_data):
        """
        Bedrock Claude-3で画像を処理
        """
        try:
            base64_image = base64.b64encode(image_data).decode('utf-8')

            prompt = """
            この画像は災害報告書のエクセルファイルです。
            画像内で丸が付けられている全ての選択項目を抽出し、
            カテゴリーと選択項目のペアとしてCSVフォーマットで出力してください。
            出力形式は以下の通りです:
            カテゴリー,選択項目

            回答は必ずCSVフォーマットのみを出力し、余分な説明は含めないでください。
            """

            body = json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1000,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/png",
                                    "data": base64_image
                                }
                            },
                            {
                                "type": "text",
                                "text": prompt
                            }
                        ]
                    }
                ]
            })

            response = self.bedrock.invoke_model(
                modelId='anthropic.claude-3-5-sonnet-20240620-v1:0',
                body=body
            )
            
            response_body = json.loads(response['body'].read())
            return response_body['messages'][0]['content'][0]['text']
            
        except Exception as e:
            logger.error(f"Bedrock処理中にエラーが発生しました: {str(e)}")
            raise

    def save_to_s3(self, csv_content, original_file_name):
        """
        CSVをS3に保存
        """
        try:
            # CSVデータの作成
            df = pd.DataFrame([line.split(',') for line in csv_content.strip().split('\n')[1:]], 
                            columns=csv_content.strip().split('\n')[0].split(','))
            
            # CSV形式に変換
            csv_buffer = df.to_csv(index=False, encoding='utf-8-sig').encode()
            
            # S3のパスを生成
            s3_path = f"{self.output_folder}/{os.path.splitext(os.path.basename(original_file_name))[0]}.csv"
            
            # S3にアップロード
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=s3_path,
                Body=csv_buffer
            )
            
            logger.info(f"CSVファイルを保存しました: {s3_path}")
            
        except Exception as e:
            logger.error(f"CSV保存中にエラーが発生しました - {original_file_name}: {str(e)}")
            raise

    def process_single_file(self, excel_file):
        """
        1つのファイルを処理
        """
        try:
            # Excelファイルのダウンロード
            excel_content = self.download_excel(excel_file)
            
            # 画像に変換
            image_data = self.convert_excel_to_image(excel_content)
            
            # Bedrockで処理
            csv_content = self.process_with_bedrock(image_data)
            
            # 結果を保存
            self.save_to_s3(csv_content, excel_file)
            
        except Exception as e:
            logger.error(f"ファイル処理中にエラーが発生しました - {excel_file}: {str(e)}")

    def process_all_files(self):
        """
        全ファイルの処理を実行
        """
        try:
            # 対象ファイルの取得
            excel_files = self.list_excel_files()
            logger.info(f"{len(excel_files)}件のファイルを処理します")
            
            # ThreadPoolExecutorを使用して並列処理
            with ThreadPoolExecutor(max_workers=5) as executor:
                executor.map(self.process_single_file, excel_files)
            
            logger.info("全ての処理が完了しました")
            
        except Exception as e:
            logger.error(f"処理中にエラーが発生しました: {str(e)}")

def extract_items_enclosed_from_s3(s3_bucket, input_folder, output_folder, region_name='us-east-1'):
    """
    SageMaker用のメインエントリーポイント
    """
    try:
        processor = ExcelImageProcessor(s3_bucket, input_folder, output_folder, region_name)
        processor.process_all_files()
        return {
            'status': 'success',
            'message': '処理が完了しました'
        }
    except Exception as e:
        logger.error(f"処理でエラーが発生しました: {str(e)}")
        return {
            'status': 'error',
            'message': f'エラーが発生しました: {str(e)}'
        }

if __name__ == "__main__":
    # SageMakerでのテスト用
    config = {
        's3_bucket': 'your-bucket-name',
        'input_folder': '災害報告書',
        'output_folder': '災害報告書/processed_csv',
        'region_name': 'ap-northeast-1'
    }
    extract_items_enclosed_from_s3(**config)
0
0
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?