
Trying out the basics of deltalake

Posted at 2025-12-02


Prerequisite: aws configure is already set up.

Install deltalake.

Since we'll install deltalake with pip, first set up a virtual environment:

# 1. Create a virtual environment
python3 -m venv delta-env

# 2. Activate the virtual environment
source delta-env/bin/activate

# 3. Upgrade pip (important!)
pip install --upgrade pip

# 4. Install all the required packages
pip install deltalake pandas pyarrow

pip install boto3
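
With boto3 installed, you can double-check that the credentials from aws configure are actually picked up (a minimal check, not part of the original steps):

import boto3

# Prints your account ID if the credentials from `aws configure` work
print(boto3.client("sts").get_caller_identity()["Account"])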

This gets deltalake installed, but if the version turns out to be too old to be usable, run the two upgrade commands below:

# 1. Activate the virtual environment
source delta-env/bin/activate

# 2. Force-reinstall deltalake at the latest version (this fixes it!)
pip install --upgrade --force-reinstall deltalake

# 3. Upgrade pyarrow as well, just in case
pip install --upgrade pyarrow

pip show deltalake

# If it is still old, remove it completely and install the latest version (this definitely fixes it!)
pip uninstall deltalake -y
pip install deltalake --upgrade --no-cache-dir

# Check again (it should now be 1.18.0 or later!)
pip show deltalake
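
You can also confirm the version from Python itself (a quick check; the deltalake package exposes __version__):

import deltalake
print(deltalake.__version__)  # should match what pip show reported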

Next, create an S3 bucket.

aws s3 mb s3://my-delta-tokyo-$(date +%Y%m%d-%H%M%S) --region ap-northeast-1

delta_create.py
# -*- coding: utf-8 -*-
import pandas as pd
import random
from deltalake import write_deltalake
import os

# Set the AWS region explicitly (important!)
os.environ["AWS_REGION"] = "ap-northeast-1"

# Your bucket (make this match the bucket you created above)
S3_PATH = "s3://my-delta-test-20251201-223013/delta-sample-data/"

print("Delta Lake データ作成スタート!")

df = pd.DataFrame({
    "user_id": range(1, 10001),
    "名前": [f"山田{random.randint(1,999)}" if i % 3 == 0 else f"佐藤{random.randint(1,999)}" if i % 3 == 1 else f"鈴木{random.randint(1,999)}" for i in range(10000)],
    "年齢": random.choices(range(18, 80), k=10000),
    "住所": random.choices(["東京都", "大阪府", "愛知県", "福岡県", "北海道"], k=10000),
    "年収_万円": [random.randint(300, 1500) for _ in range(10000)],
    "登録日": pd.date_range("2024-01-01", periods=10000).strftime("%Y-%m-%d"),
})

write_deltalake(S3_PATH, df, mode="overwrite")

print("======================")
print("    完 全 成 功 !!!")
print("======================")
print(f"保存先 → {S3_PATH}")
print("今すぐAthenaでクエリできます!!")

Next, I want to go all the way to reading back the data uploaded to S3.

# -*- coding: utf-8 -*-
import os
import time
import boto3
import pandas as pd
import random
from deltalake import write_deltalake

# Pin to the Tokyo region
REGION = "ap-northeast-1"
os.environ["AWS_DEFAULT_REGION"] = REGION

# 1. Create the bucket automatically
s3 = boto3.client("s3", region_name=REGION)
bucket_name = f"my-delta-tokyo-{int(time.time())}"
s3.create_bucket(
    Bucket=bucket_name,
    CreateBucketConfiguration={"LocationConstraint": REGION}
)
S3_PATH = f"s3://{bucket_name}/delta-sample-data/"
print(f"Created bucket → {bucket_name}")

# 2. Create the Delta Lake data (10,000 rows)
df = pd.DataFrame({
    "user_id": range(1, 10001),
    "名前": [f"山田{random.randint(1,999)}" if i%3==0 else f"佐藤{random.randint(1,999)}" if i%3==1 else f"鈴木{random.randint(1,999)}" for i in range(10000)],
    "年齢": random.choices(range(18, 80), k=10000),
    "住所": random.choices(["東京都", "大阪府", "愛知県", "福岡県", "北海道"], k=10000),
    "年収_万円": [random.randint(300, 1500) for _ in range(10000)],
    "登録日": pd.date_range("2024-01-01", periods=10000).strftime("%Y-%m-%d")
})
write_deltalake(S3_PATH, df, mode="overwrite")
print("Finished writing to Delta Lake!")

# 3. Create the Glue database
glue = boto3.client("glue", region_name=REGION)
try:
    glue.create_database(DatabaseInput={"Name": "delta_lake_db"})
    print("Created Glue database → delta_lake_db")
except glue.exceptions.AlreadyExistsException:
    print("Database already exists")

# 4. Create the Glue crawler (TablePrefix removed to match the current API)
account_id = boto3.client("sts").get_caller_identity()["Account"]
role_arn = f"arn:aws:iam::{account_id}:role/AWSGlueServiceRoleDefault"

try:
    # Stop the crawler first if it is currently running
    try:
        glue.stop_crawler(Name="tokyo-delta-crawler")
        print("Stopping crawler...")
        time.sleep(3)
    except Exception:
        pass

    glue.delete_crawler(Name="tokyo-delta-crawler")
    print("Deleted the old crawler")
    time.sleep(2)  # wait for the deletion to finish
except glue.exceptions.EntityNotFoundException:
    pass
except Exception as e:
    print(f"Crawler deletion error (ignored): {e}")

glue.create_crawler(
    Name="tokyo-delta-crawler",
    Role=role_arn,
    DatabaseName="delta_lake_db",
    Targets={"S3Targets": [{"Path": f"s3://{bucket_name}/delta-sample-data/"}]},
    SchemaChangePolicy={"UpdateBehavior": "UPDATE_IN_DATABASE", "DeleteBehavior": "LOG"},
    # TablePrefix intentionally omitted
)

glue.start_crawler(Name="tokyo-delta-crawler")
print("Crawler started!! Check Athena in about 60 seconds!")

# 5. Athena setup (where query results are saved)
athena = boto3.client("athena", region_name=REGION)
query_output_bucket = f"my-athena-results-{int(time.time())}"
s3.create_bucket(
    Bucket=query_output_bucket,
    CreateBucketConfiguration={"LocationConstraint": REGION}
)
print(f"Created Athena results bucket → {query_output_bucket}")

# 6. Create the Athena workgroup
try:
    athena.delete_work_group(WorkGroup="delta-workgroup", RecursiveDeleteOption=True)
    print("Deleted the old workgroup")
    time.sleep(2)
except athena.exceptions.InvalidRequestException:
    pass
except Exception as e:
    print(f"Workgroup deletion error (ignored): {e}")

athena.create_work_group(
    Name="delta-workgroup",
    Configuration={
        "ResultConfiguration": {
            "OutputLocation": f"s3://{query_output_bucket}/results/"
        },
        "EnforceWorkGroupConfiguration": True,
    }
)
print("Created Athena workgroup → delta-workgroup")

# 7. Run a sample query on Athena (after a 60-second wait)
print("\n⏳ Waiting for the crawler to finish (60 seconds)...")
time.sleep(60)

try:
    # NOTE: the actual table name depends on what the crawler registered (check the Glue console)
    query = "SELECT COUNT(*) as record_count FROM delta_lake_db.table LIMIT 10"
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "delta_lake_db"},
        WorkGroup="delta-workgroup"
    )
    query_execution_id = response["QueryExecutionId"]
    print(f"✓ Started Athena query → Query ID: {query_execution_id}")
except Exception as e:
    print(f"⚠️  Athena query error: {e}")
    print("   (Athena may not be able to query Delta Lake tables directly)")
    query_execution_id = "N/A"

print("\n" + "="*60)
print("     FULLY AUTOMATED SETUP COMPLETE !!!")
print("="*60)
print(f"Bucket             → {bucket_name}")
print(f"Athena results     → {query_output_bucket}")
print(f"Athena URL         → https://{REGION}.console.aws.amazon.com/athena/home?region={REGION}#query")
print(f"Query execution ID → {query_execution_id}")
print("="*60)
print("\n📊 Example Athena queries:")
print("  SELECT * FROM delta_lake_db.table LIMIT 10;")
print("  SELECT COUNT(*) FROM delta_lake_db.table;")
print("  SELECT 住所, COUNT(*) FROM delta_lake_db.table GROUP BY 住所;")
print("="*60)

# 8. Read the Delta table directly and display it (when possible, this is the most reliable route)
try:
    print("\n📥 Reading the Delta table directly from S3...")
    from deltalake import DeltaTable

    dt = DeltaTable(S3_PATH)
    # DeltaTable.to_pandas() loads the whole table, so be careful with large tables
    df_sample = dt.to_pandas().head(10)
    print("--- Delta table sample (up to 10 rows) ---")
    print(df_sample)
except Exception as e:
    print(f"⚠️ Error reading from Delta: {e}")
    print("Fallback: listing the files on S3 and downloading the first Parquet file to display...")
    try:
        objs = s3.list_objects_v2(Bucket=bucket_name, Prefix="delta-sample-data/")
        contents = objs.get("Contents", [])
        if not contents:
            print("No files found on S3.")
        else:
            print("--- First files on S3 ---")
            for o in contents[:10]:
                print(f"- {o['Key']}")

            # Find and download the first Parquet file
            parquet_keys = [o['Key'] for o in contents if o['Key'].endswith('.parquet')]
            if parquet_keys:
                first_key = parquet_keys[0]
                local_tmp = '/tmp/first_parquet.parquet'
                s3.download_file(bucket_name, first_key, local_tmp)
                try:
                    import pyarrow.parquet as pq
                    table = pq.read_table(local_tmp)
                    print("--- Parquet file sample (up to 10 rows) ---")
                    print(table.to_pandas().head(10))
                except Exception as e2:
                    print(f"Parquet read error: {e2}")
            else:
                print("No Parquet files found.")
    except Exception as e3:
        print(f"The fallback also failed: {e3}")

At this point, the data that was read back should be displayed.
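
Reading directly with DeltaTable also gives you the other Delta Lake basics, such as time travel across table versions. A minimal local sketch (the /tmp path and toy data are mine, not from the scripts above):

# -*- coding: utf-8 -*-
import pandas as pd
from deltalake import DeltaTable, write_deltalake

path = "/tmp/delta-timetravel-demo"  # hypothetical local path for the demo

write_deltalake(path, pd.DataFrame({"x": [1, 2]}), mode="overwrite")  # creates version 0
write_deltalake(path, pd.DataFrame({"x": [3, 4]}), mode="append")     # creates version 1

dt = DeltaTable(path)
print(dt.version())      # -> 1 (latest)
print(dt.to_pandas())    # all 4 rows

old = DeltaTable(path, version=0)  # time travel back to the first write
print(old.to_pandas())   # only the first 2 rows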
I tried using Athena and Glue here, but couldn't get them to work....
I've heard that Iceberg integration works well, so I'd like to try Iceberg next.

# 1. Activate the environment
source /home/<username>/delta-io/delta-env/bin/activate

# 2. Leave the environment (when finished)
deactivate

Permissions

Glue Data Catalog permissions for the crawler and catalog operations (replace region and account-id):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:CreateDatabase",
        "glue:DeleteDatabase",
        "glue:GetDatabase",
        "glue:GetDatabases",
        "glue:CreateTable",
        "glue:DeleteTable",
        "glue:GetTable",
        "glue:GetTables",
        "glue:CreatePartition",
        "glue:DeletePartition",
        "glue:GetPartition",
        "glue:GetPartitions",
        "glue:BatchCreatePartition",
        "glue:BatchDeletePartition"
      ],
      "Resource": [
        "arn:aws:glue:region:account-id:catalog",
        "arn:aws:glue:region:account-id:database/*",
        "arn:aws:glue:region:account-id:table/*",
        "arn:aws:glue:region:account-id:partition/*"
      ]
    }
  ]
}
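
S3 permissions for reading the source bucket and writing query results (replace the bucket names):
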
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::your-source-bucket",
        "arn:aws:s3:::your-source-bucket/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::your-results-bucket/*"
      ]
    }
  ]
}
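
A cross-account S3 bucket policy, in case another account needs access:
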
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowCrossAccountAccess",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::other-account-id:root"
      },
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::your-bucket",
        "arn:aws:s3:::your-bucket/*"
      ]
    }
  ]
}
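
Athena query-execution permissions, plus read access to Glue and S3: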

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "athena:StartQueryExecution",
        "athena:StopQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults",
        "athena:ListQueryExecutions",
        "athena:TagResource",
        "athena:UntagResource"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetDatabases",
        "glue:GetTable",
        "glue:GetTables",
        "glue:GetPartition",
        "glue:GetPartitions",
        "glue:BatchGetPartition"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::your-source-bucket/*",
        "arn:aws:s3:::your-results-bucket/*"
      ]
    }
  ]
}

Alternatively:

aws iam create-role --role-name AWSGlueServiceRoleDefault   --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"glue.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy --role-name AWSGlueServiceRoleDefault   --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
aws iam attach-role-policy --role-name AWSGlueServiceRoleDefault   --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

That should do the trick.
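
Before running the automation script, it may be worth confirming that the role it references actually exists (a quick boto3 check; the role name matches the one the script builds):

import boto3

iam = boto3.client("iam")
try:
    role = iam.get_role(RoleName="AWSGlueServiceRoleDefault")
    print("Role exists:", role["Role"]["Arn"])
except iam.exceptions.NoSuchEntityException:
    print("Role not found - create it with the commands above")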
