はじめに
オンプレミスのMySQLからAWS S3へのデータアップロードの実装を行ってみました。
.envファイルから環境変数を読み込む方法で以下のフローに沿って実装を行っています。
オンプレミスMySQL
↓ pymysql.connect() + pd.read_sql()
pandas DataFrame
↓ df.to_parquet(buffer, engine='pyarrow')
BytesIO(メモリ上のParquetデータ)
↓ s3_client.put_object(Body=buffer.getvalue())
AWS S3
備忘も兼ねて、各ライブラリにおける使用メソッドの役割や、
使い方についても説明を入れているので、初学者にもわかりやすいように作成をしています。
1. AWS接続(.env)
.envファイルから環境変数を読み込んでAWS認証を行う方法でAWS S3との接続を行います。他の接続方法については今回紹介しません。
1.1 環境準備
必要なライブラリのインストール
pip install python-dotenv boto3
インストール確認
pip list
以下のライブラリが表示されればOK
- python-dotenv
- boto3
- botocore(boto3と一緒にインストールされる)
利用する標準ライブラリ
- os
1.2 各ライブラリの役割
| ライブラリ | 役割 | 主な機能 |
|---|---|---|
| dotenv | 環境変数の読み込み | .envファイルからの読み取り |
| os | 環境変数の取得 | オペレーティングシステムとのやり取り |
| boto3 | AWS SDK | AWSサービスとの接続・操作 |
1.2.1 dotenv の主な機能
ルートディレクトリの.envファイルの読み込み
from dotenv import load_dotenv
load_dotenv()
Pathを指定した.envファイルの読み込み
load_dotenv(dotenv_path='/path/to/.env')
1.2.2 os の主な機能
環境変数の取得
import os
database_url = os.getenv('DATABASE_URL')
本稿未使用機能
# カレントディレクトリの取得
current_dir = os.getcwd()
print(f"現在のディレクトリ: {current_dir}")
# ファイルパスの操作
file_path = os.path.join('data', 'users', 'profile.csv')
print(file_path) # data/users/profile.csv(OSに応じて適切な区切り文字)
# ファイル・ディレクトリの存在確認
if os.path.exists('data'):
print("dataディレクトリが存在します")
# ディレクトリの作成
os.makedirs('logs/2025/01', exist_ok=True)
# ファイル一覧の取得
files = os.listdir('.')
print(f"ファイル数: {len(files)}")
1.2.3 boto3 の主な機能
boto3とは
- PythonからAWSサービスを操作するための公式SDK
- S3、EC2、RDS、Lambdaなど、ほぼすべてのAWSサービスに対応
- Amazon Web Services(AWS)によって開発・メンテナンスされている
boto3の2つのインターフェース
| インターフェース | 特徴 | 使用例 |
|---|---|---|
| Client(低レベルAPI) | AWSのAPIを直接呼び出す | 細かい制御が必要な場合 |
| Resource(高レベルAPI) | オブジェクト指向的な操作 | シンプルな操作の場合 |
Clientの作成
import boto3
# S3クライアントの作成
s3_client = boto3.client(
's3',
aws_access_key_id='YOUR_ACCESS_KEY',
aws_secret_access_key='YOUR_SECRET_KEY',
region_name='ap-northeast-1'
)
boto3.client() の主要パラメータ
| パラメータ | 説明 | 例 |
|---|---|---|
service_name |
AWSサービス名(必須) |
's3', 'ec2', 'rds'
|
region_name |
AWSリージョン |
'ap-northeast-1'(東京) |
aws_access_key_id |
アクセスキーID | 環境変数から取得 |
aws_secret_access_key |
シークレットアクセスキー | 環境変数から取得 |
endpoint_url |
カスタムエンドポイント | LocalStack使用時など |
config |
詳細設定(リトライ、タイムアウトなど) |
Config()オブジェクト |
1.2.4 botocore.exceptions の主な機能
botocore.exceptionsとは
- boto3の内部ライブラリ(boto3と一緒にインストールされる)
- AWS操作時に発生する例外を定義
- エラーハンドリングに使用
主要な例外クラス
| 例外クラス | 発生タイミング | 原因 |
|---|---|---|
| NoCredentialsError | 認証情報が見つからない | .envファイルの設定漏れ |
| ClientError | AWS APIの呼び出しエラー | バケット不存在、権限不足など |
NoCredentialsError
from botocore.exceptions import NoCredentialsError
try:
# S3操作
response = s3_client.list_buckets()
except NoCredentialsError:
print("✗ AWS認証情報が見つかりません")
発生する主な原因:
-
.envファイルが存在しない -
AWS_ACCESS_KEY_IDまたはAWS_SECRET_ACCESS_KEYが未設定 -
load_dotenv()を呼び出していない
ClientError
from botocore.exceptions import ClientError
try:
# S3操作
response = s3_client.put_object(...)
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'NoSuchBucket':
print("✗ バケットが存在しません")
elif error_code == 'AccessDenied':
print("✗ アクセス権限がありません")
主要なエラーコード
| エラーコード | 意味 | 対処法 |
|---|---|---|
NoSuchBucket |
バケットが存在しない | バケット名を確認 |
AccessDenied |
アクセス権限がない | IAMポリシーを確認 |
InvalidAccessKeyId |
アクセスキーIDが無効 | アクセスキーを確認 |
SignatureDoesNotMatch |
シークレットキーが無効 | シークレットキーを確認 |
1.3 boto3の認証情報取得の優先順位
Boto3は以下の順序で認証情報を探します:
1. コード内で明示的に指定 ← 本稿で使用
s3 = boto3.client(
's3',
aws_access_key_id='YOUR_KEY',
aws_secret_access_key='YOUR_SECRET'
)
2. 環境変数
export AWS_ACCESS_KEY_ID="YOUR_KEY"
export AWS_SECRET_ACCESS_KEY="YOUR_SECRET"
3. AWS認証情報ファイル (~/.aws/credentials)
[default]
aws_access_key_id = YOUR_KEY
aws_secret_access_key = YOUR_SECRET
4. IAMロール(EC2、Lambda、ECSで実行時)
本稿の採用方式: .envファイルから環境変数を読み込み、コード内で明示的に指定する方法(優先順位1)を採用しています。
1.4 AWS認証情報の取得方法
1. IAMユーザーの作成
- AWSマネジメントコンソールにログイン (https://aws.amazon.com/console/)
- 「IAM」サービスを開く
- 「ユーザー」→「ユーザーを作成」をクリック
- ユーザー名を入力(例:
python-s3-user) - 「次へ」をクリック
2. 権限の設定
以下のいずれかのポリシーをアタッチ:
-
学習用:
AmazonS3FullAccess(S3の全操作が可能) -
読み取り専用:
AmazonS3ReadOnlyAccess - 本番環境: 最小権限のカスタムポリシー
3. アクセスキーの作成
- 作成したユーザーをクリック
- 「セキュリティ認証情報」タブを選択
- 「アクセスキーを作成」をクリック
- 「ローカルコード」を選択
-
アクセスキーIDとシークレットアクセスキーをコピー
- ⚠️ 重要: シークレットキーはこの画面でしか表示されません
1.5 .envファイルの作成
プロジェクトのルートディレクトリに .env ファイルを作成します。
# MySQL接続情報
MYSQL_HOST=your-mysql-host
MYSQL_PORT=3306
MYSQL_USER=your-username
MYSQL_PASSWORD=your-password
MYSQL_DATABASE=your-database
# AWS接続情報
AWS_REGION=ap-northeast-1
S3_BUCKET_NAME=your-bucket-name
# AWS認証情報
AWS_ACCESS_KEY_ID=your-access-key-id
AWS_SECRET_ACCESS_KEY=your-secret-access-key
1.6 AWS S3接続の実装
基本的な接続コード
import boto3
import os
from dotenv import load_dotenv
from botocore.exceptions import ClientError, NoCredentialsError
# .envファイルを読み込む
load_dotenv()
# S3クライアントを作成
s3_client = boto3.client(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
region_name=os.getenv('AWS_REGION', 'ap-northeast-1')
)
# 接続テスト
try:
response = s3_client.list_buckets()
print("✓ AWS S3接続成功")
print(f"バケット数: {len(response['Buckets'])}")
except NoCredentialsError:
print("✗ 認証情報が見つかりません")
except ClientError as e:
print(f"✗ 接続エラー: {e}")
2. DBからのデータ取得・Parquet変換
2.1 使用ライブラリの役割
| ライブラリ | 役割 | 主な機能 |
|---|---|---|
| pandas | データ操作 | MySQLデータの取得、DataFrameの操作、Parquet変換 |
| pymysql | MySQL接続 | MySQLデータベースへの接続・クエリ実行 |
| pyarrow | Parquet処理 | Parquet形式への変換・圧縮 |
| io | データストリーム管理 | メモリ上でのファイル操作 |
2.1.1 pandas の主な機能
MySQLからデータの取得
# MySQLからDataFrame取得
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, connection)
データフレームのParquet変換
df.to_parquet(buffer, engine='pyarrow', compression='snappy')
2.1.2 pymysql の主な機能
MySQL接続の作成
connection = pymysql.connect(
host='localhost',
user='user',
password='password',
database='database'
)
MySQL接続のクローズ
connection.close()
2.1.3 pyarrow の主な機能
pyarrow とは
- pandasでParquet形式を扱うために必要なParquetエンジン
- snappy, gzip, brotliなど複数の圧縮方式をサポート
データフレームのParquet変換(再掲)
# pyarrowでParquet変換、圧縮方式snappyで圧縮
df.to_parquet(buffer, engine='pyarrow', compression='snappy')
2.1.4 io の主な機能
io とは
- Pythonの標準ライブラリ
- メモリ上でファイルのような操作を行うための機能を提供(ファイルを実際に作成せずにメモリ上で処理)
⇒ ioでバッファを作成することで、ディスクにファイルを作る手間を省いて、メモリ上で直接データを変換して扱うことができる
io の2つの主要クラス
| クラス | データ型 | 用途 |
|---|---|---|
| StringIO | 文字列(str) | CSV、JSON、テキストデータ |
| BytesIO | バイト列(bytes) | Parquet、画像、バイナリデータ |
BytesIOの使用例
from io import BytesIO
# BytesIOオブジェクトを作成
buffer = BytesIO()
# バイトデータを書き込む
buffer.write(b"Hello, World!")
buffer.write(b"\n")
buffer.write(b"Binary data")
# 内容を取得
content = buffer.getvalue()
print(content)
# 出力: b'Hello, World!\nBinary data'
# クローズ
buffer.close()
2.2 DBからデータ取得・変換の実装
基本的な取得・変換コード
import pandas as pd
import pymysql
import os
from dotenv import load_dotenv
from io import BytesIO
# .envファイルを読み込む
load_dotenv()
# MySQL接続を作成
connection = pymysql.connect(
host=os.getenv('MYSQL_HOST'),
user=os.getenv('MYSQL_USER'),
password=os.getenv('MYSQL_PASSWORD'),
database=os.getenv('MYSQL_DATABASE')
)
# データの取得
table_name = 'users'
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, connection)
# データの変換
# BytesIOオブジェクトを作成
buffer = BytesIO()
df.to_parquet(buffer, engine='pyarrow', compression='snappy')
3. S3へのデータアップロード
3.1 使用ライブラリの役割
| ライブラリ | 役割 | 主な機能 |
|---|---|---|
| boto3 | AWS SDK | S3へのデータアップロード |
| datetime | 日時処理 | タイムスタンプ生成、ファイル名への日付追加 |
3.1.1 boto3 の S3アップロード機能
boto3とは(再掲)
- 1章で接続に使用したライブラリ
- 接続だけでなく、S3へのデータアップロード機能も提供
S3アップロードの主要メソッド
| メソッド | 用途 | データ形式 |
|---|---|---|
| put_object() | オブジェクトをアップロード | メモリ上のデータ(推奨) |
| upload_file() | ローカルファイルをアップロード | ディスク上のファイル |
| upload_fileobj() | ファイルライクオブジェクトをアップロード | ストリーム |
本稿での使用メソッド: put_object() (メモリ上のParquetデータを直接アップロード)
put_object() の主要パラメータ
s3_client.put_object(
Bucket='バケット名', # 必須:アップロード先のS3バケット
Key='S3のパス/ファイル名', # 必須:S3上のオブジェクトキー(パス)
Body='アップロードするデータ', # 必須:アップロードするデータ本体
ContentType='MIMEタイプ' # 任意:ファイルの種類を指定
)
各パラメータの説明
| パラメータ | 説明 | 例 |
|---|---|---|
Bucket |
アップロード先のS3バケット名 | 'my-data-bucket' |
Key |
S3上のオブジェクトキー(ファイルパス) | 'data/users/20250115.parquet' |
Body |
アップロードするデータ(str or bytes) | buffer.getvalue() |
ContentType |
ファイルのMIMEタイプ | 'application/x-parquet' |
S3キーとは
S3キー = S3バケット内でファイルを識別するための名前(パス)
パソコンのフォルダ構造と同じように考えると理解しやすいです。
S3の構造
s3://バケット名/S3キー
| 要素 | 説明 | 例 |
|---|---|---|
| バケット名(Bucket) | データを保存する最上位のコンテナ | my-data-bucket |
| S3キー(Key) | バケット内のファイルのパス | data/users/20250115.parquet |
視覚的なイメージ
S3バケット: my-data-bucket
│
├── data/
│ ├── users/
│ │ ├── 20250115.parquet ← S3キー: data/users/20250115.parquet
│ │ └── 20250116.parquet ← S3キー: data/users/20250116.parquet
│ │
│ └── orders/
│ └── 20250115.parquet ← S3キー: data/orders/20250115.parquet
│
└── logs/
└── app.log ← S3キー: logs/app.log
S3キーの重要な特徴
-
実際のフォルダは存在しない
- S3は「フラットな構造」で、すべてのファイルは同じ階層に保存
-
/(スラッシュ)を含むキー名で「フォルダっぽく」見せているだけ
-
キーは文字列
- S3キーは単なる文字列
-
data/users/file.parquetもdata-users-file.parquetも同じ「文字列」
-
大文字小文字を区別する
-
Data/users/file.parquetとdata/users/file.parquetは別のファイル
-
主要なContentType一覧
| ファイル形式 | ContentType |
|---|---|
| Parquet | 'application/x-parquet' |
| CSV | 'text/csv' |
| JSON | 'application/json' |
| テキスト | 'text/plain' |
3.1.2 datetime の主な機能
datetimeとは
- Pythonの標準ライブラリ
- 日時処理を行うための機能を提供
- S3のファイル名にタイムスタンプを付与する際に使用
現在日時の取得
from datetime import datetime
# 現在日時を取得
now = datetime.now()
日時のフォーマット(strftime)
# タイムスタンプ生成
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# 例:'20250115_143052'
よく使うフォーマット指定子
| 指定子 | 意味 | 例 |
|---|---|---|
%Y |
年(4桁) | 2025 |
%m |
月(2桁) | 01 |
%d |
日(2桁) | 15 |
%H |
時(24時間制、2桁) | 14 |
%M |
分(2桁) | 30 |
%S |
秒(2桁) | 52 |
S3キー生成での使用例
# パターン1: タイムスタンプ形式
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f"data/{table_name}/{timestamp}.parquet"
# 例:'data/users/20250115_143052.parquet'
# パターン2: 日付形式
date = datetime.now().strftime('%Y-%m-%d')
s3_key = f"data/{table_name}/{date}.parquet"
# 例:'data/users/2025-01-15.parquet'
# パターン3: Hive形式(パーティション)
year = datetime.now().strftime('%Y')
month = datetime.now().strftime('%m')
day = datetime.now().strftime('%d')
s3_key = f"data/{table_name}/year={year}/month={month}/day={day}/data.parquet"
# 例:'data/users/year=2025/month=01/day=15/data.parquet'
3.2 S3キーの命名パターン
パターン1: シンプルな命名
# ファイル名だけ
s3_key = "users.parquet"
# 1階層のフォルダ
s3_key = "data/users.parquet"
パターン2: 日付ベースの命名
from datetime import datetime
# タイムスタンプ付き
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f"data/users/{timestamp}.parquet"
# 例:data/users/20250115_143052.parquet
# 日付のみ
date = datetime.now().strftime('%Y-%m-%d')
s3_key = f"data/users/{date}.parquet"
# 例:data/users/2025-01-15.parquet
パターン3: テーブル名ベース
table_name = "users"
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f"data/{table_name}/{timestamp}.parquet"
# 例:data/users/20250115_143052.parquet
パターン4: Hive形式(パーティション)
データ分析でよく使われる形式です。
from datetime import datetime
year = datetime.now().strftime('%Y')
month = datetime.now().strftime('%m')
day = datetime.now().strftime('%d')
s3_key = f"data/users/year={year}/month={month}/day={day}/data.parquet"
# 例:data/users/year=2025/month=01/day=15/data.parquet
この形式だと、日付で効率的にデータを検索できます。
3.3 アップロードの実装
基本的なアップロードのコード
from datetime import datetime
import boto3
# 1. S3キー生成
table_name = 'users'
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f'data/{table_name}/{timestamp}.parquet'
# 2. S3にアップロード
response = s3_client.put_object(
Bucket=os.getenv('S3_BUCKET_NAME'),
Key=s3_key,
Body=buffer.getvalue(),
ContentType='application/x-parquet'
)
# 3. 成功確認
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
print("✓ アップロード成功")
4. ETL処理の実装
import boto3
import os
import sys
from dotenv import load_dotenv
from botocore.exceptions import ClientError, NoCredentialsError
import pandas as pd
import pymysql
from io import BytesIO
from datetime import datetime
def upload_mysql_to_s3(table_name):
"""
MySQLテーブルをParquet形式でS3にアップロード
"""
try:
# .envファイルを読み込む
load_dotenv()
# S3クライアントを作成
s3_client = boto3.client(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
region_name=os.getenv('AWS_REGION', 'ap-northeast-1')
)
# MySQL接続を作成
connection = pymysql.connect(
host=os.getenv('MYSQL_HOST'),
port=int(os.getenv('MYSQL_PORT', 3306)),
user=os.getenv('MYSQL_USER'),
password=os.getenv('MYSQL_PASSWORD'),
database=os.getenv('MYSQL_DATABASE')
)
# データの取得
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, connection)
print(f"✓ データ取得完了: {len(df)}行")
# データの変換
# BytesIOオブジェクトを作成
buffer = BytesIO()
df.to_parquet(buffer, engine='pyarrow', compression='snappy', index=False)
# S3キー生成
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f'data/{table_name}/{timestamp}.parquet'
# S3にアップロード
response = s3_client.put_object(
Bucket=os.getenv('S3_BUCKET_NAME'),
Key=s3_key,
Body=buffer.getvalue(),
ContentType='application/x-parquet'
)
# 成功確認
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
print(f"✓ アップロード成功: s3://{os.getenv('S3_BUCKET_NAME')}/{s3_key}")
print(f" ファイルサイズ: {len(buffer.getvalue())} バイト")
connection.close()
return True
except NoCredentialsError:
print("✗ AWS認証情報が見つかりません")
return False
except ClientError as e:
print(f"✗ AWS接続エラー: {e}")
return False
except pymysql.Error as e:
print(f"✗ MySQL接続エラー: {e}")
return False
except Exception as e:
print(f"✗ エラー発生: {e}")
return False
#CLIで実行時引数としてテーブル名を渡す
if __name__ == "__main__":
if len(sys.argv) < 2:
sys.exit(1)
table_name = sys.argv[1]
print(f"▶ テーブル '{table_name}' のアップロードを開始します...")
upload_mysql_to_s3(table_name)
5. データフロー全体像
オンプレミスMySQL
↓ pymysql.connect() + pd.read_sql()
pandas DataFrame
↓ df.to_parquet(buffer, engine='pyarrow')
BytesIO(メモリ上のParquetデータ)
↓ s3_client.put_object(Body=buffer.getvalue())
AWS S3
各ステップで使用するライブラリ
| ステップ | 使用ライブラリ | 主な機能 |
|---|---|---|
| 1. 環境変数読込 |
dotenv, os
|
.envファイルから認証情報取得 |
| 2. DB接続 | pymysql |
MySQL接続 |
| 3. データ取得 | pandas |
SQLクエリ実行、DataFrame作成 |
| 4. Parquet変換 |
pyarrow, io
|
Parquet形式への変換、バッファ作成 |
| 5. ファイル名生成 | datetime |
タイムスタンプ付きS3キー生成 |
| 6. S3アップロード | boto3 |
S3へのアップロード |
| 7. エラー処理 | botocore.exceptions |
例外ハンドリング |
まとめ
本記事では、オンプレミスMySQLからAWS S3へデータのアップロードまでの実装を行いました。
実装のポイント
-
.envファイルで認証情報を安全に管理
- AWS認証情報やDB接続情報を環境変数で管理
-
メモリ上でのデータ処理
- BytesIOを使用してディスクI/Oを回避
- 高速で効率的な処理を実現
-
Parquet形式での保存
- 列指向フォーマットで圧縮効率が高い
- データ分析に最適
-
適切なエラーハンドリング
- 認証エラー、接続エラーを適切に処理
- 本番運用に向けた基礎的な実装