概要
このドキュメントでは、C言語のatof
関数の動作をDatabricksに移行する方法を説明します。atof
関数は、文字列を浮動小数点数に変換するC言語の関数で、変換できない値を0として扱います。これはSQLのCAST
を用いたデータ型変換とは異なる動作です。本記事では、atof
関数と同等の処理をDatabricksで実現する方法を、具体的なコード例と共に示します。
以下に、データ型の変換と変換できない値を0にする処理を行う2つの方法を紹介します。既にデータ型の変換メソッドが実装していたため、私たちは2番目の方法を採用しました。
- 変換できない値を 0 にする処理とデータ型変換を同時に実行する方法
- 変換できない値を 0 にする処理とデータ型変換を別に実行する方法
C言語の開発環境として、paiza.ioのサービスを利用しました。このサービスはブラウザ上でコードの実行が可能で、非常に便利です。コード生成には、GitHub Copilotを使用しました。
参考リンク:ブラウザでプログラミング・実行ができる「オンライン実行環境」| paiza.IO
C言語のdouble
型の値をDatabricksのDECIMAL
型にマッピングしています。DECIMAL
型の処理をDOUBLE
型に変更すると、同様に動作します。そのため、必要に応じて型を変更してください。
動作差異の確認
1. Databricksの動作確認
Databricsk では、他の RDB と同様に TRY_CAST 関数を利用した場合に、変換できない値は NULL になりました。
%sql
WITH src AS (
SELECT
'12.1' AS decimal_col
UNION ALL
SELECT
'123.1' AS decimal_col
UNION ALL
SELECT
NULL AS decimal_col
UNION ALL
SELECT
'##.#' AS decimal_col
)
SELECT
*,
TRY_CAST(decimal_col AS DECIMAL(4, 1)) AS Databricks_result
FROM
src
2. C 言語の動作確認
C 言語では、変換できない値は 0 になり、NULL の値は基本的にはエラーとなります。エラーとなるため、分岐する処理が一般的なようです。
#include <stdio.h>
#include <stdlib.h>
int main() {
char *strs[] = {"12.1", "123.1", "##.#", NULL};
int num_strs = sizeof(strs) / sizeof(strs[0]);
for (int i = 0; i < num_strs; i++) {
if (strs[i] != NULL) {
double num = atof(strs[i]);
printf("The value of num is %.5f\n", num);
} else {
printf("The string is NULL.\n");
}
}
return 0;
}
NULL の値は基本的にはエラーとなります。
#include <stdio.h>
#include <stdlib.h>
int main() {
char *str = NULL;
float num = atof(str);
printf("%f\n", num);
return 0;
}
Segmentation fault (core dumped)
以上の結果から、以下の仕様を実装する必要があることが判明しました。
- NULL の場合には、NULL とする。
- 変換できない値は、0 とする。
- 変換できる値は、そのまま変更
3. Databricks での実装目標
expected_result
列の結果が出力されることを目標とします。
decimal_col | Databricks_result | expected_result |
---|---|---|
12.1 | 12.1 | 12.1 |
123.1 | 123.1 | 123.1 |
null | ||
##.# | 0 |
Databricks での検証コードと実行結果
事前準備
from pyspark.sql import Row
schema = "decimal_col string"
data = [
Row(decimal_col='12.1'),
Row(decimal_col='123.1'),
Row(decimal_col=None),
Row(decimal_col='##.#')
]
df = spark.createDataFrame(data, schema)
df.printSchema()
df.display()
1. 変換できない値を 0 にする処理とデータ型変換を同時に実行する方法
from pyspark.sql.functions import expr
df = df.withColumn(
"decimal_col",
expr(
"""
CASE
WHEN decimal_col IS NULL
THEN NULL
WHEN ISNULL(TRY_CAST(decimal_col AS DECIMAL(4,1)))
THEN CAST('0' AS decimal(4,1))
ELSE
CAST(decimal_col AS DECIMAL(4,1))
END
"""
),
)
df.printSchema()
df.display()
decimal_col |
---|
12.1 |
123.1 |
0 |
2. 変換できない値を 0 にする処理とデータ型変換を別に実行する方法
from pyspark.sql.functions import expr
df = df.withColumn(
"decimal_col",
expr(
"""
CASE
WHEN decimal_col IS NULL
THEN NULL
WHEN ISNULL(TRY_CAST(decimal_col AS DECIMAL(38, 0)))
THEN '0'
ELSE
decimal_col
END
"""
),
)
df.printSchema()
df.display()
df = df.withColumn(
"decimal_col",
expr("CAST(decimal_col AS DECIMAL(4, 1))"),
)
df.printSchema()
df.display()
decimal_col |
---|
12.1 |
123.1 |
0 |
共通処理化に向けて
共通化の必要性について
複数のカラムに対して同様の処理を記載することが多く、
from pyspark.sql import Row
schema = """
decimal_col_1 string,
decimal_col_2 string,
decimal_col_3 string
"""
data = [
Row(decimal_col_1='12.1', decimal_col_2='12.123', decimal_col_3='123456.123'),
Row(decimal_col_1='123.1', decimal_col_2='123.123', decimal_col_3='123456.123'),
Row(decimal_col_1=None, decimal_col_2=None, decimal_col_3=None),
Row(decimal_col_1='##.#', decimal_col_2='##.#', decimal_col_3='##.#')
]
df = spark.createDataFrame(data, schema)
df.printSchema()
df.display()
1. の方法の共通処理化
def process_dataframe_pattern1(
df,
tgt_col_conf,
):
with_cols_conf = {}
for col_name, col_type in tgt_col_conf.items():
with_cols_conf[col_name] = expr(
f"""
CASE
WHEN {col_name} IS NULL
THEN NULL
WHEN ISNULL(TRY_CAST({col_name} AS {col_type}))
THEN CAST('0' AS {col_type})
ELSE
CAST({col_name} AS {col_type})
END
"""
)
df = df.withColumns(with_cols_conf)
return df
tgt_col_conf = {
"decimal_col_1": "decimal(4,1)",
"decimal_col_2": "decimal(6,3)",
"decimal_col_3": "decimal(9,3)",
}
df = process_dataframe_pattern1(df, tgt_col_conf)
df.printSchema()
df.display()
2. の方法の共通処理化
def process_dataframe_pattern2_1(
df,
tgt_col_names,
):
with_cols_conf = {}
for col_name in tgt_col_names:
with_cols_conf[col_name] = expr(
f"""
CASE
WHEN {col_name} IS NULL
THEN NULL
WHEN ISNULL(TRY_CAST({col_name} AS decimal(38, 0)))
THEN '0'
ELSE
{col_name}
END
"""
)
df = df.withColumns(with_cols_conf)
return df
def process_dataframe_pattern2_2(
df,
tgt_col_conf,
):
with_cols_conf = {}
for col_name, col_type in tgt_col_conf.items():
with_cols_conf[col_name] = expr(
f"""
CASE
WHEN {col_name} IS NULL
THEN NULL
WHEN ISNULL(TRY_CAST({col_name} AS {col_type}))
THEN CAST('0' AS {col_type})
ELSE
CAST({col_name} AS {col_type})
END
"""
)
df = df.withColumns(with_cols_conf)
return df
tgt_col_conf = {
"decimal_col_1": "decimal(4,1)",
"decimal_col_2": "decimal(6,3)",
"decimal_col_3": "decimal(9,3)",
}
df = process_dataframe_pattern2_1(df, tgt_col_conf.keys())
df.printSchema()
df.display()
df = process_dataframe_pattern2_2(df, tgt_col_conf)
df.printSchema()
df.display()