deltalakeの初歩をやってみた
aws configure設定済み
deltalakeをインストール。
pipでdeltalakeをインストールしたいので、
python3 -m venv delta-env
# 2. 仮想環境に入る
source delta-env/bin/activate
# 3. pipを最新にする(重要!)
pip install --upgrade pip
# 4. 必要なパッケージを全部インストール
pip install deltalake pandas pyarrow
pip install boto3
これでdeltalakeは入るけどもし古くて使い物にならなかった場合下記の2つをやること
source delta-env/bin/activate
# 2. deltalake を最新版に強制アップグレード(これで直る!)
pip install --upgrade --force-reinstall deltalake
# 3. ついでにpyarrowも最新に(念のため)
pip install --upgrade pyarrow
pip show deltalake
# 3. 完全削除して最新版をインストール(これで絶対に直る!)
pip uninstall deltalake -y
pip install deltalake --upgrade --no-cache-dir
# 4. 再確認(1.18.0 以上になってるはず!)
pip show deltalake
次にS3パケットを作る
aws s3 mb s3://my-delta-tokyo-$(date +%Y%m%d-%H%M%S) --region ap-northeast-1
delta_create.py
# -*- coding: utf-8 -*-
import pandas as pd
import random
from deltalake import write_deltalake
import os
# ←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←
os.environ["AWS_REGION"] = "ap-northeast-1"
# ←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←
# あなたのバケット(正しい!)
S3_PATH = "s3://my-delta-test-20251201-223013/delta-sample-data/"
print("Delta Lake データ作成スタート!")
df = pd.DataFrame({
"user_id": range(1, 10001),
"名前": [f"山田{random.randint(1,999)}" if i % 3 == 0 else f"佐藤{random.randint(1,999)}" if i % 3 == 1 else f"鈴木{random.randint(1,999)}" for i in range(10000)],
"年齢": random.choices(range(18, 80), k=10000),
"住所": random.choices(["東京都", "大阪府", "愛知県", "福岡県", "北海道"], k=10000),
"年収_万円": [random.randint(300, 1500) for _ in range(10000)],
"登録日": pd.date_range("2024-01-01", periods=10000).strftime("%Y-%m-%d"),
})
write_deltalake(S3_PATH, df, mode="overwrite")
print("======================")
print(" 完 全 成 功 !!!")
print("======================")
print(f"保存先 → {S3_PATH}")
print("今すぐAthenaでクエリできます!!")
今度はS3に上げたデータを読み込む処理までやりたいと思います。
# -*- coding: utf-8 -*-
import os
import time
import boto3
import pandas as pd
import random
from deltalake import write_deltalake
# 東京リージョン固定
REGION = "ap-northeast-1"
os.environ["AWS_DEFAULT_REGION"] = REGION
# 1. バケット自動作成
s3 = boto3.client("s3", region_name=REGION)
bucket_name = f"my-delta-tokyo-{int(time.time())}"
s3.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": REGION}
)
S3_PATH = f"s3://{bucket_name}/delta-sample-data/"
print(f"バケット作成 → {bucket_name}")
# 2. Delta Lake データ作成(10,000件)
df = pd.DataFrame({
"user_id": range(1, 10001),
"名前": [f"山田{random.randint(1,999)}" if i%3==0 else f"佐藤{random.randint(1,999)}" if i%3==1 else f"鈴木{random.randint(1,999)}" for i in range(10000)],
"年齢": random.choices(range(18, 80), k=10000),
"住所": random.choices(["東京都", "大阪府", "愛知県", "福岡県", "北海道"], k=10000),
"年収_万円": [random.randint(300, 1500) for _ in range(10000)],
"登録日": pd.date_range("2024-01-01", periods=10000).strftime("%Y-%m-%d")
})
write_deltalake(S3_PATH, df, mode="overwrite")
print("Delta Lake 書き込み完了!")
# 3. Glue データベース作成
glue = boto3.client("glue", region_name=REGION)
try:
glue.create_database(DatabaseInput={"Name": "delta_lake_db"})
print("Glueデータベース作成 → delta_lake_db")
except glue.exceptions.AlreadyExistsException:
print("データベースは既に存在")
# 4. Glueクローラー作成(最新仕様に完全対応!TablePrefix削除)
account_id = boto3.client("sts").get_caller_identity()["Account"]
role_arn = f"arn:aws:iam::{account_id}:role/AWSGlueServiceRoleDefault"
try:
# クローラーが実行中の場合は停止
try:
glue.stop_crawler(Name="tokyo-delta-crawler")
print("クローラー停止中...")
time.sleep(3)
except:
pass
glue.delete_crawler(Name="tokyo-delta-crawler")
print("古いクローラー削除")
time.sleep(2) # 削除完了まで待機
except glue.exceptions.EntityNotFoundException:
pass
except Exception as e:
print(f"クローラー削除エラー(無視): {e}")
glue.create_crawler(
Name="tokyo-delta-crawler",
Role=role_arn,
DatabaseName="delta_lake_db",
Targets={"S3Targets": [{"Path": f"s3://{bucket_name}/delta-sample-data/"}]},
SchemaChangePolicy={"UpdateBehavior": "UPDATE_IN_DATABASE", "DeleteBehavior": "LOG"},
# ← ここを削除して最新仕様に!
)
glue.start_crawler(Name="tokyo-delta-crawler")
print("クローラー起動完了!! 60秒後にAthenaで確認できます!")
# 5. Athena設定(クエリ結果保存先)
athena = boto3.client("athena", region_name=REGION)
query_output_bucket = f"my-athena-results-{int(time.time())}"
s3.create_bucket(
Bucket=query_output_bucket,
CreateBucketConfiguration={"LocationConstraint": REGION}
)
print(f"Athena結果保存バケット作成 → {query_output_bucket}")
# 6. Athenaワークグループ作成
try:
athena.delete_work_group(WorkGroup="delta-workgroup", RecursiveDeleteOption=True)
print("古いワークグループ削除")
time.sleep(2)
except athena.exceptions.InvalidRequestException:
pass
except Exception as e:
print(f"ワークグループ削除エラー(無視): {e}")
athena.create_work_group(
Name="delta-workgroup",
Configuration={
"ResultConfiguration": {
"OutputLocation": f"s3://{query_output_bucket}/results/"
},
"EnforceWorkGroupConfiguration": True,
}
)
print("Athenaワークグループ作成 → delta-workgroup")
# 7. Athenaでサンプルクエリ実行(60秒待機後)
print("\n⏳ クローラー処理待機中(60秒)...")
time.sleep(60)
try:
query = "SELECT COUNT(*) as record_count FROM delta_lake_db.table LIMIT 10"
response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": "delta_lake_db"},
WorkGroup="delta-workgroup"
)
query_execution_id = response["QueryExecutionId"]
print(f"✓ Athenaクエリ実行開始 → Query ID: {query_execution_id}")
except Exception as e:
print(f"⚠️ Athenaクエリ実行エラー: {e}")
print(" (Delta Lake形式のテーブルは Athena で直接クエリできない場合があります)")
query_execution_id = "N/A"
print("\n" + "="*60)
print(" 完 全 自 動 設 定 完 了 !!!")
print("="*60)
print(f"バケット → {bucket_name}")
print(f"Athena結果保存 → {query_output_bucket}")
print(f"Athena URL → https://{REGION}.console.aws.amazon.com/athena/home?region={REGION}#query")
print(f"クエリ実行ID → {query_execution_id}")
print("="*60)
print("\n📊 Athenaクエリ例:")
print(" SELECT * FROM delta_lake_db.table LIMIT 10;")
print(" SELECT COUNT(*) FROM delta_lake_db.table;")
print(" SELECT 住所, COUNT(*) FROM delta_lake_db.table GROUP BY 住所;")
print("="*60)
# 8. Delta テーブルを直接読み込んで表示(可能ならこちらが最も確実)
try:
print("\n📥 Delta テーブルを直接 S3 から読み込みます...")
from deltalake import DeltaTable
dt = DeltaTable(S3_PATH)
# DeltaTable.to_pandas() はテーブル全体を読み込むため大きい場合注意
df_sample = dt.to_pandas().head(10)
print("--- Delta 表のサンプル(最大 10 行) ---")
print(df_sample)
except Exception as e:
print(f"⚠️ Delta からの読み込みエラー: {e}")
print("代替手段:S3 上のファイル一覧を取得して最初の Parquet をダウンロードして表示します...")
try:
objs = s3.list_objects_v2(Bucket=bucket_name, Prefix="delta-sample-data/")
contents = objs.get("Contents", [])
if not contents:
print("S3 にファイルが見つかりませんでした。")
else:
print("--- S3 上の先頭ファイル一覧 ---")
for o in contents[:10]:
print(f"- {o['Key']}")
# 最初の parquet ファイルを探してダウンロード
parquet_keys = [o['Key'] for o in contents if o['Key'].endswith('.parquet')]
if parquet_keys:
first_key = parquet_keys[0]
local_tmp = '/tmp/first_parquet.parquet'
s3.download_file(bucket_name, first_key, local_tmp)
try:
import pyarrow.parquet as pq
table = pq.read_table(local_tmp)
print("--- Parquet ファイルのサンプル(最大 10 行) ---")
print(table.to_pandas().head(10))
except Exception as e2:
print(f"Parquet 読み込みエラー: {e2}")
else:
print("Parquet ファイルが見つかりませんでした。")
except Exception as e3:
print(f"代替手段でも失敗しました: {e3}")
一応読み込んだデータが表示されるはずです。
athenaとかGlueとか使ったんですが、うまくいかなかった....
Icebergとの連携がいいとの話なのでIcebergを使ってみたいと思います。
# 1. 環境に入る
source /home/ユーザー名/delta-io/delta-env/bin/activate
# 3. 環境から抜ける(終了後)
deactivate
権限まわり
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:DeleteDatabase",
"glue:GetDatabase",
"glue:GetDatabases",
"glue:CreateTable",
"glue:DeleteTable",
"glue:GetTable",
"glue:GetTables",
"glue:CreatePartition",
"glue:DeletePartition",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchCreatePartition",
"glue:BatchDeletePartition"
],
"Resource": [
"arn:aws:glue:region:account-id:catalog",
"arn:aws:glue:region:account-id:database/*",
"arn:aws:glue:region:account-id:table/*",
"arn:aws:glue:region:account-id:partition/*"
]
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::your-source-bucket",
"arn:aws:s3:::your-source-bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::your-results-bucket/*"
]
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowCrossAccountAccess",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::other-account-id:root"
},
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::your-bucket",
"arn:aws:s3:::your-bucket/*"
]
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:StopQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults",
"athena:ListQueryExecutions",
"athena:TagResource",
"athena:UntagResource"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"glue:GetDatabase",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetTables",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::your-source-bucket/*",
"arn:aws:s3:::your-results-bucket/*"
]
}
]
}
もしくは
aws iam create-role --role-name AWSGlueServiceRoleDefault --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"glue.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy --role-name AWSGlueServiceRoleDefault --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
aws iam attach-role-policy --role-name AWSGlueServiceRoleDefault --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
でいけるはず