0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricksの課金利用ログを分析してみる

Posted at

こちらの続きです。

ログの確認

約1日待ってから以下のコマンドをローカルマシンから実行してステータスを確認します。

Bash
curl -X GET -u <Databricksアカウントオーナーのメールアドレス>:<パスワード>  \
  'https://accounts.cloud.databricks.com/api/2.0/accounts/<DatabricksアカウントID>/log-delivery' | jq

statusSUCCEEDEDになっていれば、ログのデリバリーに成功したことになります。

Screen Shot 2022-10-04 at 19.32.55.png

AWSコンソールでも確認してみるとcsvファイルが生成されていることがわかります。
Screen Shot 2022-10-05 at 13.45.10.png

課金利用ログへのアクセス設定

分析にはDatabricksノートブックを使いますが、上のログにアクセスできる様に設定を行う必要があります。以下のステップを実行するには、Databricksのデプロイに使用したIAMロールのポリシーの変更も必要になるので確認しておきます。

このIAMロールはアカウントコンソールの以下の画面で確認できます。
Screen Shot 2022-10-05 at 13.54.25.png

こちらの手順に従ってインスタンスプロファイルを作成します。

IAMロールの作成

AWSコンソールでIAMにアクセスして、ここではbillable-log-access-roleというロールを作成します。

このロールが課金利用ログが格納されているバケットにアクセスできる様に以下のポリシーをアタッチします。

JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::/ty-db-billable-log"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::/ty-db-billable-log/*"
            ]
        }
    ]
}

作成したIAMロールのインスタンスプロファイルのARNをメモしておきます。
Screen Shot 2022-10-05 at 13.57.02.png

バケットポリシーの設定

前のステップで作成した課金利用ログを格納しているS3バケットに上のロールがアクセスできる様に、以下のバケットポリシーを追加します。

JSON
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<DatabricksをデプロイしたAWSアカウントID>:role/billable-log-access-role"
            },
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::ty-db-billable-usage-log"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<DatabricksをデプロイしたAWSアカウントID>:role/billable-log-access-role"
            },
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::ty-db-billable-usage-log/*"
        }

PassRoleの設定

Databricksのデプロイに使用したIAMロールで、上のbillable-log-access-roleロールを利用できる様にPassRoleを許可します。

Databricksのデプロイに使用したIAMロールに以下のポリシーを追加します。

JSON
{
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::<DatabricksをデプロイしたAWSアカウントID>:role/billable-log-access-role"
}

インスタンスプロファイルの作成

Databricksの管理コンソールにアクセスし、Instance profilesタブをクリックしインスタンスプロファイルを作成します。

Screen Shot 2022-10-05 at 13.59.07.png

Databricksクラスターを作成し、インスタンスプロファイルをアタッチして起動します。
Screen Shot 2022-10-05 at 14.00.20.png

課金利用ログを分析する

分析ようノートブックの翻訳版はこちらです。

ログを格納しているパスを指定します。

Python
usagefilePath = "s3a://ty-db-billable-usage-log/billable-log/billable-usage/csv/"

アクセスできることを確認します。ACCESS DENIEDが出る場合には、上のインスタンスプロファイルに関連する設定を確認してください。

Python
dbutils.fs.ls("s3a://ty-db-billable-usage-log/")

スキーマを指定してデータを読み込み、一時テーブルで参照できる様にします。

Python
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import pyspark.sql.functions as func

# 利用量のスキーマ
usageSchema = StructType([
  StructField("workspaceId", StringType(), False),
  StructField("timestamp", DateType(), False),
  StructField("clusterId", StringType(), False),
  StructField("clusterName", StringType(), False),
  StructField("clusterNodeType", StringType(), False),
  StructField("clusterOwnerUserId", StringType(), False),
  StructField("clusterCustomTags", StringType(), False),
  StructField("sku", StringType(), False),
  StructField("dbus", FloatType(), False),
  StructField("machineHours", FloatType(), False),
  StructField("clusterOwnerUserName", StringType(), False),
  StructField("tags", StringType(), False)
])

# 利用量のデータフレームを作成してキャッシュ
df = (spark.read
      .option("header", "true")
      .option("escape", "\"")
      .schema(usageSchema)
      .csv(usagefilePath)
      )

usageDF = (df.select("workspaceId",
                     "timestamp",
                     "clusterId",
                     "clusterName",
                     "clusterNodeType",
                     "clusterOwnerUserId",
                     "clusterCustomTags",
                     when(col("sku") == "STANDARD_INTERACTIVE_OPSEC", "All Purpose Compute")
                     .when(col("sku") == "STANDARD_AUTOMATED_NON_OPSEC", "Jobs Compute")
                     .when(col("sku") == "STANDARD_INTERACTIVE_NON_OPSEC", "All Purpose Compute")
                     .when(col("sku") == "LIGHT_AUTOMATED_NON_OPSEC", "Jobs Compute Light")
                     .when(col("sku") == "STANDARD_AUTOMATED_OPSEC", "Jobs Compute")
                     .when(col("sku") == "LIGHT_AUTOMATED_OPSEC", "Jobs Compute Light")
                     .when(col("sku") == "STANDARD_ALL_PURPOSE_COMPUTE", "All Purpose Compute")
                     .when(col("sku") == "STANDARD_JOBS_COMPUTE", "Jobs Compute")
                     .when(col("sku") == "STANDARD_JOBS_LIGHT_COMPUTE", "Jobs Compute Light")
                     .when(col("sku") == "PREMIUM_ALL_PURPOSE_COMPUTE", "All Purpose Compute")
                     .when(col("sku") == "PREMIUM_JOBS_COMPUTE", "Jobs Compute")
                     .when(col("sku") == "PREMIUM_JOBS_LIGHT_COMPUTE", "Jobs Compute Light")
                     .when(col("sku") == "ENTERPRISE_ALL_PURPOSE_COMPUTE", "All Purpose Compute")
                     .when(col("sku") == "ENTERPRISE_JOBS_COMPUTE", "Jobs Compute")
                     .when(col("sku") == "ENTERPRISE_JOBS_LIGHT_COMPUTE", "Jobs Compute Light")
                     .otherwise(col("sku")).alias("sku"),
                     "dbus",
                     "machineHours",
                     "clusterOwnerUserName",
                     "tags")
           .withColumn("tags", when(col("tags").isNotNull(), col("tags")).otherwise(col("clusterCustomTags")))
           .withColumn("tags", from_json("tags", MapType(StringType(), StringType())).alias("tags"))
           .drop("userId")
           .cache()
          )

# SQLコマンドを使うために一時テーブルを作成
usageDF.createOrReplaceTempView("usage")

動的にフィルタリングできるようにウィジェットを作成します。

Python
# 動的にフィルタリングできる様にウィジェットを作成
import datetime
dbutils.widgets.removeAll()

# 日付ウィンドウのフィルター
now = datetime.datetime.now()
dbutils.widgets.text("Date - End", now.strftime("%Y-%m-%d"))
dbutils.widgets.text("Date - Beginning", now.strftime("%Y-%m-%d"))

# SKU価格。お客様のアカウントごとに固有な値を入力できる様にテキストウィジェットを作成
skus = spark.sql("select distinct(sku) from usage").rdd.map(lambda row : row[0]).collect()
for sku in skus:  # お客様固有のSKUのテキストボックスを表示
  dbutils.widgets.text("SKU Price - " + sku, ".00")

# 時系列グラフにおける時間単位
dbutils.widgets.dropdown("Time Unit", "Day", ["Day", "Month"])
timeUnit = "Time"

# コミットの金額
dbutils.widgets.text("Commit Dollars", "00.00")
commit = getArgument("Commit Dollars")

# タグのキー
tags = spark.sql("select distinct(explode(map_keys(tags))) from usage").rdd.map(lambda row : row[0]).collect()
if len(tags) > 0:
  defaultTag = tags[0]
  dbutils.widgets.dropdown("Tag Key", str(defaultTag), [str(x) for x in tags])

# 利用量のタイプ
dbutils.widgets.dropdown("Usage", "Spend", ["Spend", "DBUs", "Cumulative Spend", "Cumulative DBUs"])

ノートブックの上部にウィジェットが表示されます。
Screen Shot 2022-10-05 at 14.35.18.png

分析するためのデータフレームを準備します。

Python
# SKU名とレートからデータフレームを作成。これは、費用を得るために利用量データフレームとjoinされます。
skuVals = [str(sku) for sku in skus]
wigVals = [getArgument("SKU Price - " + sku) for sku in skus]

skuZip = list(zip(skuVals, wigVals)) # RDDに並列化するために、それぞれのSKUと対応するレートごとのリストオブジェクトを作成します。

skuRdd = sc.parallelize(skuZip) # RDDの作成

skuSchema = StructType([
  StructField("sku", StringType(), True),
  StructField("rate", StringType(), True)])

skuDF = spark.createDataFrame(skuRdd, skuSchema) # データフレームの作成
Python
timeUnitFormat = "yyyy-MM-dd" if getArgument("Time Unit") == "Day" else "yyyy-MM"

# それぞれの期間、コミット、SKU、タグの値ごとの費用、DBU、累積費用、累積DBUを含む大規模なデータフレーム
globalDF = (usageDF
            .filter(usageDF.timestamp.between(getArgument("Date - Beginning"), getArgument("Date - End")))
            .join(skuDF, "Sku")
            .withColumn("Spend", usageDF["dbus"] * skuDF["rate"])
            .withColumn("Commit", lit(getArgument("Commit Dollars")))
            .withColumn("Tag", usageDF["tags." + getArgument("Tag Key")])
            .select(date_format("timestamp", timeUnitFormat).alias(timeUnit), "Spend", "dbus", "Sku", "Commit", "Tag")
            .groupBy(timeUnit, "Commit", "Sku", "Tag")
            .sum("Spend", "dbus")
            .withColumnRenamed("sum(dbus)", "DBUs")
            .withColumnRenamed("sum(Spend)", "Spend")
            .orderBy(timeUnit)
           )

# 単一のコミット、SKU、タグの値に対応する小規模なデータフレームを作成し、費用、DBUにタウする累積値を計算する関数
def usageBy(columnName):
  
  # 累積費用/DBUを得るためのウィンドウ関数
  cumulativeUsage = Window \
  .orderBy(timeUnit) \
  .partitionBy(columnName) \
  .rowsBetween(Window.unboundedPreceding, 0)
  
  # 指定されたカラムの値に対して当該期間にデータがない場合に行を持たないのではなく、
  # ゼロの費用、DBUの値を持つ行を追加し、適切な集約処理に加算します。
  # グラフは累積値をゼロとして解釈しないためです。
  zeros = globalDF.select(columnName).distinct().withColumn("Spend", lit(0)).withColumn("DBUs", lit(0))
  zerosByTime = globalDF.select(timeUnit).distinct().crossJoin(zeros)
  
  return (globalDF
          .select(timeUnit, columnName, "Spend", "DBUs")
          .union(zerosByTime)
          .groupBy(timeUnit, columnName)
          .sum("Spend", "DBUs")
          .withColumnRenamed("sum(DBUs)", "DBUs")
          .withColumnRenamed("sum(Spend)", "Spend")
          .withColumn("Cumulative Spend", func.sum(func.col("Spend")).over(cumulativeUsage))
          .withColumn("Cumulative DBUs", func.sum(func.col("DBUs")).over(cumulativeUsage))
          .select(timeUnit, columnName, getArgument("Usage"))
          .withColumnRenamed(getArgument("Usage"), "Usage")
          .orderBy(timeUnit)
         )

display(globalDF)

Screen Shot 2022-10-05 at 14.37.10.png

以下のレポートを生成します。

  1. 利用量の時系列変化
  2. SKUごとの利用量の時系列変化
  3. 選択されたタグごとの利用量

利用量の時系列変化

Python
display(usageBy("Commit"))

Screen Shot 2022-10-05 at 14.38.21.png

SKUごとの利用量の時系列変化

Python
display(usageBy("Sku"))

Screen Shot 2022-10-05 at 14.39.36.png

タグごとの利用量の時系列変化

以下の例では、クラスターIDごとにグルーピングしています。カスタムタグをクラスターに付与することができるので、プロジェクト単位で集計するということも可能です。

Python
display(usageBy("Tag"))

Screen Shot 2022-10-05 at 14.40.24.png

こちらのノートブックはサンプルですので、本格的なダッシュボードを構成したいという場合には、こちらのノートブックのデータ処理ロジックをジョブ化し、Databricks SQLのダッシュボードなどからアクセするということをご検討ください。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?