0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Python入門】オンプレミスMySQLからAWS S3へデータをアップロードしてみた(.env + boto3)

Last updated at Posted at 2025-10-17

はじめに

オンプレミスのMySQLからAWS S3へのデータアップロードの実装を行ってみました。

.envファイルから環境変数を読み込む方法で以下のフローに沿って実装を行っています。

オンプレミスMySQL
   ↓ pymysql.connect() + pd.read_sql()
pandas DataFrame
   ↓ df.to_parquet(buffer, engine='pyarrow')
BytesIO(メモリ上のParquetデータ)
   ↓ s3_client.put_object(Body=buffer.getvalue())
AWS S3

備忘も兼ねて、各ライブラリにおける使用メソッドの役割や、
使い方についても説明を入れているので、初学者にもわかりやすいように作成をしています。


1. AWS接続(.env)

.envファイルから環境変数を読み込んでAWS認証を行う方法でAWS S3との接続を行います。他の接続方法については今回紹介しません。

1.1 環境準備

必要なライブラリのインストール

pip install python-dotenv boto3

インストール確認

pip list

以下のライブラリが表示されればOK

  • python-dotenv
  • boto3
  • botocore(boto3と一緒にインストールされる)

利用する標準ライブラリ

  • os

1.2 各ライブラリの役割

ライブラリ 役割 主な機能
dotenv 環境変数の読み込み .envファイルからの読み取り
os 環境変数の取得 オペレーティングシステムとのやり取り
boto3 AWS SDK AWSサービスとの接続・操作

1.2.1 dotenv の主な機能

ルートディレクトリの.envファイルの読み込み

from dotenv import load_dotenv
load_dotenv()

Pathを指定した.envファイルの読み込み

load_dotenv(dotenv_path='/path/to/.env')

1.2.2 os の主な機能

環境変数の取得

import os
database_url = os.getenv('DATABASE_URL')

本稿未使用機能

# カレントディレクトリの取得
current_dir = os.getcwd()
print(f"現在のディレクトリ: {current_dir}")

# ファイルパスの操作
file_path = os.path.join('data', 'users', 'profile.csv')
print(file_path)  # data/users/profile.csv(OSに応じて適切な区切り文字)

# ファイル・ディレクトリの存在確認
if os.path.exists('data'):
    print("dataディレクトリが存在します")

# ディレクトリの作成
os.makedirs('logs/2025/01', exist_ok=True)

# ファイル一覧の取得
files = os.listdir('.')
print(f"ファイル数: {len(files)}")

1.2.3 boto3 の主な機能

boto3とは

  • PythonからAWSサービスを操作するための公式SDK
  • S3、EC2、RDS、Lambdaなど、ほぼすべてのAWSサービスに対応
  • Amazon Web Services(AWS)によって開発・メンテナンスされている

boto3の2つのインターフェース

インターフェース 特徴 使用例
Client(低レベルAPI) AWSのAPIを直接呼び出す 細かい制御が必要な場合
Resource(高レベルAPI) オブジェクト指向的な操作 シンプルな操作の場合

Clientの作成

import boto3

# S3クライアントの作成
s3_client = boto3.client(
    's3',
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    region_name='ap-northeast-1'
)

boto3.client() の主要パラメータ

パラメータ 説明
service_name AWSサービス名(必須) 's3', 'ec2', 'rds'
region_name AWSリージョン 'ap-northeast-1'(東京)
aws_access_key_id アクセスキーID 環境変数から取得
aws_secret_access_key シークレットアクセスキー 環境変数から取得
endpoint_url カスタムエンドポイント LocalStack使用時など
config 詳細設定(リトライ、タイムアウトなど) Config()オブジェクト

1.2.4 botocore.exceptions の主な機能

botocore.exceptionsとは

  • boto3の内部ライブラリ(boto3と一緒にインストールされる)
  • AWS操作時に発生する例外を定義
  • エラーハンドリングに使用

主要な例外クラス

例外クラス 発生タイミング 原因
NoCredentialsError 認証情報が見つからない .envファイルの設定漏れ
ClientError AWS APIの呼び出しエラー バケット不存在、権限不足など

NoCredentialsError

from botocore.exceptions import NoCredentialsError

try:
    # S3操作
    response = s3_client.list_buckets()
except NoCredentialsError:
    print("✗ AWS認証情報が見つかりません")

発生する主な原因:

  • .envファイルが存在しない
  • AWS_ACCESS_KEY_IDまたはAWS_SECRET_ACCESS_KEYが未設定
  • load_dotenv()を呼び出していない

ClientError

from botocore.exceptions import ClientError

try:
    # S3操作
    response = s3_client.put_object(...)
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'NoSuchBucket':
        print("✗ バケットが存在しません")
    elif error_code == 'AccessDenied':
        print("✗ アクセス権限がありません")

主要なエラーコード

エラーコード 意味 対処法
NoSuchBucket バケットが存在しない バケット名を確認
AccessDenied アクセス権限がない IAMポリシーを確認
InvalidAccessKeyId アクセスキーIDが無効 アクセスキーを確認
SignatureDoesNotMatch シークレットキーが無効 シークレットキーを確認

1.3 boto3の認証情報取得の優先順位

Boto3は以下の順序で認証情報を探します:

1. コード内で明示的に指定 ← 本稿で使用

s3 = boto3.client(
    's3',
    aws_access_key_id='YOUR_KEY',
    aws_secret_access_key='YOUR_SECRET'
)

2. 環境変数

export AWS_ACCESS_KEY_ID="YOUR_KEY"
export AWS_SECRET_ACCESS_KEY="YOUR_SECRET"

3. AWS認証情報ファイル (~/.aws/credentials)

[default]
aws_access_key_id = YOUR_KEY
aws_secret_access_key = YOUR_SECRET

4. IAMロール(EC2、Lambda、ECSで実行時)

本稿の採用方式: .envファイルから環境変数を読み込み、コード内で明示的に指定する方法(優先順位1)を採用しています。


1.4 AWS認証情報の取得方法

1. IAMユーザーの作成

  1. AWSマネジメントコンソールにログイン (https://aws.amazon.com/console/)
  2. 「IAM」サービスを開く
  3. 「ユーザー」→「ユーザーを作成」をクリック
  4. ユーザー名を入力(例:python-s3-user
  5. 「次へ」をクリック

2. 権限の設定

以下のいずれかのポリシーをアタッチ:

  • 学習用: AmazonS3FullAccess(S3の全操作が可能)
  • 読み取り専用: AmazonS3ReadOnlyAccess
  • 本番環境: 最小権限のカスタムポリシー

3. アクセスキーの作成

  1. 作成したユーザーをクリック
  2. 「セキュリティ認証情報」タブを選択
  3. 「アクセスキーを作成」をクリック
  4. 「ローカルコード」を選択
  5. アクセスキーIDシークレットアクセスキーをコピー
    • ⚠️ 重要: シークレットキーはこの画面でしか表示されません

1.5 .envファイルの作成

プロジェクトのルートディレクトリに .env ファイルを作成します。

# MySQL接続情報
MYSQL_HOST=your-mysql-host
MYSQL_PORT=3306
MYSQL_USER=your-username
MYSQL_PASSWORD=your-password
MYSQL_DATABASE=your-database

# AWS接続情報
AWS_REGION=ap-northeast-1
S3_BUCKET_NAME=your-bucket-name

# AWS認証情報
AWS_ACCESS_KEY_ID=your-access-key-id
AWS_SECRET_ACCESS_KEY=your-secret-access-key

1.6 AWS S3接続の実装

基本的な接続コード

import boto3
import os
from dotenv import load_dotenv
from botocore.exceptions import ClientError, NoCredentialsError

# .envファイルを読み込む
load_dotenv()

# S3クライアントを作成
s3_client = boto3.client(
    's3',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    region_name=os.getenv('AWS_REGION', 'ap-northeast-1')
)

# 接続テスト
try:
    response = s3_client.list_buckets()
    print("✓ AWS S3接続成功")
    print(f"バケット数: {len(response['Buckets'])}")
except NoCredentialsError:
    print("✗ 認証情報が見つかりません")
except ClientError as e:
    print(f"✗ 接続エラー: {e}")

2. DBからのデータ取得・Parquet変換

2.1 使用ライブラリの役割

ライブラリ 役割 主な機能
pandas データ操作 MySQLデータの取得、DataFrameの操作、Parquet変換
pymysql MySQL接続 MySQLデータベースへの接続・クエリ実行
pyarrow Parquet処理 Parquet形式への変換・圧縮
io データストリーム管理 メモリ上でのファイル操作

2.1.1 pandas の主な機能

MySQLからデータの取得

# MySQLからDataFrame取得
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, connection)

データフレームのParquet変換

df.to_parquet(buffer, engine='pyarrow', compression='snappy')

2.1.2 pymysql の主な機能

MySQL接続の作成

connection = pymysql.connect(
    host='localhost',
    user='user',
    password='password',
    database='database'
)

MySQL接続のクローズ

connection.close()

2.1.3 pyarrow の主な機能

pyarrow とは

  • pandasでParquet形式を扱うために必要なParquetエンジン
  • snappy, gzip, brotliなど複数の圧縮方式をサポート

データフレームのParquet変換(再掲)

# pyarrowでParquet変換、圧縮方式snappyで圧縮
df.to_parquet(buffer, engine='pyarrow', compression='snappy')

2.1.4 io の主な機能

io とは

  • Pythonの標準ライブラリ
  • メモリ上でファイルのような操作を行うための機能を提供(ファイルを実際に作成せずにメモリ上で処理)

⇒ ioでバッファを作成することで、ディスクにファイルを作る手間を省いて、メモリ上で直接データを変換して扱うことができる

io の2つの主要クラス

クラス データ型 用途
StringIO 文字列(str) CSV、JSON、テキストデータ
BytesIO バイト列(bytes) Parquet、画像、バイナリデータ

BytesIOの使用例

from io import BytesIO

# BytesIOオブジェクトを作成
buffer = BytesIO()

# バイトデータを書き込む
buffer.write(b"Hello, World!")
buffer.write(b"\n")
buffer.write(b"Binary data")

# 内容を取得
content = buffer.getvalue()
print(content)
# 出力: b'Hello, World!\nBinary data'

# クローズ
buffer.close()

2.2 DBからデータ取得・変換の実装

基本的な取得・変換コード

import pandas as pd
import pymysql
import os
from dotenv import load_dotenv
from io import BytesIO  

# .envファイルを読み込む
load_dotenv()

# MySQL接続を作成
connection = pymysql.connect(
    host=os.getenv('MYSQL_HOST'),
    user=os.getenv('MYSQL_USER'),
    password=os.getenv('MYSQL_PASSWORD'),
    database=os.getenv('MYSQL_DATABASE')
)

# データの取得
table_name = 'users'  
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, connection)

# データの変換
# BytesIOオブジェクトを作成
buffer = BytesIO()
df.to_parquet(buffer, engine='pyarrow', compression='snappy')

3. S3へのデータアップロード

3.1 使用ライブラリの役割

ライブラリ 役割 主な機能
boto3 AWS SDK S3へのデータアップロード
datetime 日時処理 タイムスタンプ生成、ファイル名への日付追加

3.1.1 boto3 の S3アップロード機能

boto3とは(再掲)

  • 1章で接続に使用したライブラリ
  • 接続だけでなく、S3へのデータアップロード機能も提供

S3アップロードの主要メソッド

メソッド 用途 データ形式
put_object() オブジェクトをアップロード メモリ上のデータ(推奨)
upload_file() ローカルファイルをアップロード ディスク上のファイル
upload_fileobj() ファイルライクオブジェクトをアップロード ストリーム

本稿での使用メソッド: put_object() (メモリ上のParquetデータを直接アップロード)

put_object() の主要パラメータ

s3_client.put_object(
    Bucket='バケット名',           # 必須:アップロード先のS3バケット
    Key='S3のパス/ファイル名',      # 必須:S3上のオブジェクトキー(パス)
    Body='アップロードするデータ',   # 必須:アップロードするデータ本体
    ContentType='MIMEタイプ'       # 任意:ファイルの種類を指定
)

各パラメータの説明

パラメータ 説明
Bucket アップロード先のS3バケット名 'my-data-bucket'
Key S3上のオブジェクトキー(ファイルパス) 'data/users/20250115.parquet'
Body アップロードするデータ(str or bytes) buffer.getvalue()
ContentType ファイルのMIMEタイプ 'application/x-parquet'

S3キーとは

S3キー = S3バケット内でファイルを識別するための名前(パス)

パソコンのフォルダ構造と同じように考えると理解しやすいです。

S3の構造

s3://バケット名/S3キー
要素 説明
バケット名(Bucket) データを保存する最上位のコンテナ my-data-bucket
S3キー(Key) バケット内のファイルのパス data/users/20250115.parquet

視覚的なイメージ

S3バケット: my-data-bucket
│
├── data/
│   ├── users/
│   │   ├── 20250115.parquet  ← S3キー: data/users/20250115.parquet
│   │   └── 20250116.parquet  ← S3キー: data/users/20250116.parquet
│   │
│   └── orders/
│       └── 20250115.parquet  ← S3キー: data/orders/20250115.parquet
│
└── logs/
    └── app.log               ← S3キー: logs/app.log

S3キーの重要な特徴

  1. 実際のフォルダは存在しない

    • S3は「フラットな構造」で、すべてのファイルは同じ階層に保存
    • /(スラッシュ)を含むキー名で「フォルダっぽく」見せているだけ
  2. キーは文字列

    • S3キーは単なる文字列
    • data/users/file.parquetdata-users-file.parquetも同じ「文字列」
  3. 大文字小文字を区別する

    • Data/users/file.parquetdata/users/file.parquetは別のファイル

主要なContentType一覧

ファイル形式 ContentType
Parquet 'application/x-parquet'
CSV 'text/csv'
JSON 'application/json'
テキスト 'text/plain'

3.1.2 datetime の主な機能

datetimeとは

  • Pythonの標準ライブラリ
  • 日時処理を行うための機能を提供
  • S3のファイル名にタイムスタンプを付与する際に使用

現在日時の取得

from datetime import datetime

# 現在日時を取得
now = datetime.now()

日時のフォーマット(strftime)

# タイムスタンプ生成
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# 例:'20250115_143052'

よく使うフォーマット指定子

指定子 意味
%Y 年(4桁) 2025
%m 月(2桁) 01
%d 日(2桁) 15
%H 時(24時間制、2桁) 14
%M 分(2桁) 30
%S 秒(2桁) 52

S3キー生成での使用例

# パターン1: タイムスタンプ形式
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f"data/{table_name}/{timestamp}.parquet"
# 例:'data/users/20250115_143052.parquet'

# パターン2: 日付形式
date = datetime.now().strftime('%Y-%m-%d')
s3_key = f"data/{table_name}/{date}.parquet"
# 例:'data/users/2025-01-15.parquet'

# パターン3: Hive形式(パーティション)
year = datetime.now().strftime('%Y')
month = datetime.now().strftime('%m')
day = datetime.now().strftime('%d')
s3_key = f"data/{table_name}/year={year}/month={month}/day={day}/data.parquet"
# 例:'data/users/year=2025/month=01/day=15/data.parquet'

3.2 S3キーの命名パターン

パターン1: シンプルな命名

# ファイル名だけ
s3_key = "users.parquet"

# 1階層のフォルダ
s3_key = "data/users.parquet"

パターン2: 日付ベースの命名

from datetime import datetime

# タイムスタンプ付き
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f"data/users/{timestamp}.parquet"
# 例:data/users/20250115_143052.parquet

# 日付のみ
date = datetime.now().strftime('%Y-%m-%d')
s3_key = f"data/users/{date}.parquet"
# 例:data/users/2025-01-15.parquet

パターン3: テーブル名ベース

table_name = "users"
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

s3_key = f"data/{table_name}/{timestamp}.parquet"
# 例:data/users/20250115_143052.parquet

パターン4: Hive形式(パーティション)

データ分析でよく使われる形式です。

from datetime import datetime

year = datetime.now().strftime('%Y')
month = datetime.now().strftime('%m')
day = datetime.now().strftime('%d')

s3_key = f"data/users/year={year}/month={month}/day={day}/data.parquet"
# 例:data/users/year=2025/month=01/day=15/data.parquet

この形式だと、日付で効率的にデータを検索できます。


3.3 アップロードの実装

基本的なアップロードのコード

from datetime import datetime
import boto3

# 1. S3キー生成
table_name = 'users'  
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f'data/{table_name}/{timestamp}.parquet'

# 2. S3にアップロード
response = s3_client.put_object(
    Bucket=os.getenv('S3_BUCKET_NAME'),  
    Key=s3_key,
    Body=buffer.getvalue(),  
    ContentType='application/x-parquet'
)

# 3. 成功確認
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
    print("✓ アップロード成功")

4. ETL処理の実装

import boto3
import os
import sys
from dotenv import load_dotenv
from botocore.exceptions import ClientError, NoCredentialsError
import pandas as pd
import pymysql
from io import BytesIO
from datetime import datetime

def upload_mysql_to_s3(table_name):  
    """
    MySQLテーブルをParquet形式でS3にアップロード
    """
    try:  
        # .envファイルを読み込む
        load_dotenv()

        # S3クライアントを作成
        s3_client = boto3.client(
            's3',
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            region_name=os.getenv('AWS_REGION', 'ap-northeast-1')
        )

        # MySQL接続を作成
        connection = pymysql.connect(
            host=os.getenv('MYSQL_HOST'),
            port=int(os.getenv('MYSQL_PORT', 3306)), 
            user=os.getenv('MYSQL_USER'),
            password=os.getenv('MYSQL_PASSWORD'),
            database=os.getenv('MYSQL_DATABASE')
        )

        # データの取得
        query = f"SELECT * FROM {table_name}"
        df = pd.read_sql(query, connection)
        
        print(f"✓ データ取得完了: {len(df)}")  

        # データの変換
        # BytesIOオブジェクトを作成
        buffer = BytesIO()
        df.to_parquet(buffer, engine='pyarrow', compression='snappy', index=False)  

        # S3キー生成
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        s3_key = f'data/{table_name}/{timestamp}.parquet'

        # S3にアップロード
        response = s3_client.put_object(
            Bucket=os.getenv('S3_BUCKET_NAME'),  
            Key=s3_key,
            Body=buffer.getvalue(),  
            ContentType='application/x-parquet'
        )

        # 成功確認
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print(f"✓ アップロード成功: s3://{os.getenv('S3_BUCKET_NAME')}/{s3_key}") 
            print(f"  ファイルサイズ: {len(buffer.getvalue())} バイト")  
        
        connection.close()
        return True  
        
    except NoCredentialsError:  
        print("✗ AWS認証情報が見つかりません")
        return False
    except ClientError as e:
        print(f"✗ AWS接続エラー: {e}")
        return False
    except pymysql.Error as e:
        print(f"✗ MySQL接続エラー: {e}")
        return False
    except Exception as e:
        print(f"✗ エラー発生: {e}")
        return False

#CLIで実行時引数としてテーブル名を渡す
if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit(1)

    table_name = sys.argv[1]
    print(f"▶ テーブル '{table_name}' のアップロードを開始します...")
    upload_mysql_to_s3(table_name)

5. データフロー全体像

オンプレミスMySQL
   ↓ pymysql.connect() + pd.read_sql()
pandas DataFrame
   ↓ df.to_parquet(buffer, engine='pyarrow')
BytesIO(メモリ上のParquetデータ)
   ↓ s3_client.put_object(Body=buffer.getvalue())
AWS S3

各ステップで使用するライブラリ

ステップ 使用ライブラリ 主な機能
1. 環境変数読込 dotenv, os .envファイルから認証情報取得
2. DB接続 pymysql MySQL接続
3. データ取得 pandas SQLクエリ実行、DataFrame作成
4. Parquet変換 pyarrow, io Parquet形式への変換、バッファ作成
5. ファイル名生成 datetime タイムスタンプ付きS3キー生成
6. S3アップロード boto3 S3へのアップロード
7. エラー処理 botocore.exceptions 例外ハンドリング

まとめ

本記事では、オンプレミスMySQLからAWS S3へデータのアップロードまでの実装を行いました。

実装のポイント

  1. .envファイルで認証情報を安全に管理

    • AWS認証情報やDB接続情報を環境変数で管理
  2. メモリ上でのデータ処理

    • BytesIOを使用してディスクI/Oを回避
    • 高速で効率的な処理を実現
  3. Parquet形式での保存

    • 列指向フォーマットで圧縮効率が高い
    • データ分析に最適
  4. 適切なエラーハンドリング

    • 認証エラー、接続エラーを適切に処理
    • 本番運用に向けた基礎的な実装
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?