2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OCI の Autonomous Database (ADB) に搭載された Select AI を Python で REST API を公開

Last updated at Posted at 2025-03-18

はじめに

この記事では、Oracle Cloud Infrastructure (OCI) の Autonomous Database (ADB) に搭載された Select AI 機能を、Python で REST API を公開する方法について説明します。Flask を使用して REST API を作成し、Oracle Database に接続して Select AI クエリを実行します。

必要なパッケージのインストール

まず、必要な Python パッケージをインストールします。以下のコマンドを実行してください。

pip install Flask oracledb werkzeug python-dotenv

アプリケーションの作成

次に、app.py ファイルを作成します。このファイルは、Flask アプリケーションの設定やデータベース接続、エンドポイントの定義などを含みます。

ファイル構造

project/
├── app.py
├── .env
└── app.log

.env ファイルの作成

環境変数を管理するために .env ファイルを作成します。以下の内容を記述してください。

ORACLE_23AI_CONNECTION_STRING=your_oracle_user/your_oracle_password@your_adb_high
ORACLE_CLIENT_LIB_DIR=/u01/aipoc/instantclient_23_5
HOST=0.0.0.0
PORT=9999
DEBUG=False

app.py の内容

以下は、app.py の内容です。このファイルは、Flask アプリケーションの設定やデータベース接続、エンドポイントの定義などを含みます。

from flask import Flask, request, jsonify
import oracledb
import os
import platform
from dotenv import load_dotenv
from functools import wraps
import logging
import time
from werkzeug.middleware.proxy_fix import ProxyFix
from typing import List, Dict, Any, Callable, Optional, Union

# Initialize Flask application
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(), logging.FileHandler('app.log')]
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Initialize Oracle client (required for Linux systems)
if platform.system() == 'Linux':
    try:
        oracledb.init_oracle_client(lib_dir=os.getenv("ORACLE_CLIENT_LIB_DIR"))
        logger.info("Oracle client initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize Oracle client: {str(e)}")
        raise

# Create database connection pool
pool = None
try:
    pool = oracledb.create_pool(
        dsn=os.getenv("ORACLE_23AI_CONNECTION_STRING"),
        min=2,
        max=10,
        increment=1,
        timeout=30,
        getmode=oracledb.POOL_GETMODE_WAIT,  # Wait for available connection
        wait_timeout=10000,  # Wait timeout in milliseconds
        max_lifetime_session=3600,  # Max lifetime of a connection in seconds
    )
    logger.info("Successfully created Oracle connection pool")
except Exception as e:
    logger.error(f"Failed to create Oracle connection pool: {str(e)}")
    raise


def validate_request_data(required_fields: List[str]) -> Callable:
    """Decorator to validate request data"""

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            try:
                data = request.get_json()
                if not data:
                    return jsonify(error="Missing request body"), 400

                missing_fields = [field for field in required_fields if field not in data or not data[field]]
                if missing_fields:
                    return jsonify(error=f"Missing required fields: {', '.join(missing_fields)}"), 400

                # Sanitize inputs
                for field in data:
                    if isinstance(data[field], str):
                        data[field] = data[field].strip()

                kwargs['data'] = data
                return f(*args, **kwargs)
            except Exception as e:
                logger.error(f"Error validating request data: {str(e)}")
                return jsonify(error="Invalid request format"), 400

        return decorated_function

    return decorator


def db_connection(f):
    """Decorator to handle database connection with retry logic"""

    @wraps(f)
    def wrapper(*args, **kwargs):
        conn = None
        retries = 3
        retry_delay = 0.5  # seconds

        for attempt in range(retries):
            try:
                conn = pool.acquire()
                kwargs['conn'] = conn
                return f(*args, **kwargs)
            except oracledb.DatabaseError as e:
                error_obj, = e.args
                # Only retry on connection errors, not on SQL errors
                if error_obj.code in (-3113, -3114, -1034, -1012):  # Connection-related error codes
                    logger.warning(f"Database connection error (attempt {attempt + 1}/{retries}): {str(e)}")
                    if attempt < retries - 1:
                        time.sleep(retry_delay)
                        retry_delay *= 2  # Exponential backoff
                        continue
                logger.error(f"Database error: {str(e)}")
                return jsonify(error="Database operation failed"), 500
            except Exception as e:
                logger.error(f"Unexpected error: {str(e)}")
                return jsonify(error="Internal server error"), 500
            finally:
                if conn:
                    try:
                        pool.release(conn)
                    except Exception as e:
                        logger.error(f"Error releasing connection: {str(e)}")

        return jsonify(error="Failed to connect to database after multiple attempts"), 503

    return wrapper


def process_oracle_result(item: Any) -> Any:
    """Process Oracle result items to make them JSON serializable"""
    if hasattr(item, 'read'):  # Handle LOB objects
        return item.read()
    return item


def log_request_info(action: str, query: str) -> None:
    """Log request information with proper truncation"""
    max_length = 100
    truncated_query = query[:max_length] + '...' if len(query) > max_length else query
    logger.info(f"Processing {action} request: {truncated_query}")


@app.route('/selectai/<action>', methods=['POST'])
@validate_request_data(['query'])
@db_connection
def ai_query(action: str, conn: oracledb.Connection, data: Dict[str, Any]) -> tuple:
    """Handle AI query requests"""
    try:
        query = data['query']
        log_request_info(action, query)

        with conn.cursor() as cursor:
            if action == 'showsql':
                cursor.execute("""
                    SELECT DBMS_CLOUD_AI.GENERATE(
                        prompt => :1,
                        profile_name => 'OCI_GENAI',
                        action => 'showsql'
                    ) FROM dual
                """, [query])

                result = cursor.fetchone()
                if result and result[0]:
                    generated_sql = process_oracle_result(result[0])
                    return jsonify(sql=generated_sql)
                return jsonify(error="No SQL generated"), 404

            elif action == 'runsql':
                sql = query.replace('\\"', '"')

                # Execute the query with proper error handling
                try:
                    cursor.execute(sql)

                    # Process results
                    columns = [col[0] for col in cursor.description]
                    rows = [[process_oracle_result(item) for item in row] for row in cursor.fetchall()]

                    row_count = len(rows)
                    logger.info(f"Query returned {row_count} rows")

                    return jsonify(
                        columns=columns,
                        data=rows,
                        row_count=row_count
                    )
                except oracledb.DatabaseError as e:
                    error_obj, = e.args
                    logger.error(f"SQL execution error: {error_obj.message}")
                    return jsonify(error=f"SQL execution error: {error_obj.message}"), 400

            else:
                return jsonify(error=f"Invalid action: {action}"), 400

    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        return jsonify(error=str(e)), 500


@app.route('/health', methods=['GET'])
def health_check() -> tuple:
    """Health check endpoint with detailed status information"""
    start_time = time.time()
    try:
        status = {
            'status': 'ok',
            'database': False,
            'pool_stats': {
                'busy': pool.busy if pool else 0,
                'opened': pool.opened if pool else 0,
                'max': pool.max if pool else 0
            },
            'timestamp': int(start_time)
        }

        if pool:
            try:
                # Test database connection
                conn = pool.acquire()
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1 FROM dual")
                    cursor.fetchone()
                pool.release(conn)
                status['database'] = True
                status['response_time_ms'] = int((time.time() - start_time) * 1000)
            except Exception as e:
                logger.warning(f"Database health check failed: {str(e)}")
                status['status'] = 'degraded'
                status['error'] = str(e)
        else:
            status['status'] = 'critical'

        return jsonify(status), 200 if status['status'] == 'ok' else 503

    except Exception as e:
        logger.error(f"Health check failed: {str(e)}")
        response_time = int((time.time() - start_time) * 1000)
        return jsonify(
            status='error',
            error=str(e),
            response_time_ms=response_time
        ), 500


if __name__ == '__main__':
    app.run(
        host=os.getenv('HOST', '0.0.0.0'),
        port=int(os.getenv('PORT', 9999)),
        debug=os.getenv('DEBUG', 'False').lower() == 'true'
    )

アプリケーションの実行

アプリケーションを実行するために、以下のコマンドを実行します。

python app.py

テスト

以下の curl コマンドを使用して、API の動作を確認します。

showsql エンドポイントのテスト

curl --location --request POST 'http://127.0.0.1:9999/selectai/showsql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "query": "how many regions exist"
}'

showsql エンドポイントの結果

{
  "sql": "SELECT COUNT(\"r\".\"REGION_ID\") AS \"REGION_COUNT\" FROM \"ADMIN\".\"REGIONS\" \"r\""
}

runsql エンドポイントのテスト

curl --location --request POST 'http://127.0.0.1:9999/selectai/runsql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "query": "SELECT COUNT(\"r\".\"REGION_ID\") AS \"REGION_COUNT\" FROM \"ADMIN\".\"REGIONS\" \"r\""
}'

runsql エンドポイントの結果

{
  "columns": [
    "REGION_COUNT"
  ],
  "data": [
    [
      5
    ]
  ],
  "row_count": 1
}

まとめ

この記事では、OCI ADB Select AI を Python で REST API に公開する方法について説明しました。Flask を使用して API を構築し、Oracle Database に接続して Select AI クエリを実行することで、Oracle Cloud Infrastructure の Select AI 機能を効率的に活用することができます。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?