この記事について
Teradataデータベースの独自機能である、Script Table Operator (STO) について紹介します。
これは、データベースのあるサーバー上で直接スクリプトを実行する機能です。
データベース上のデータの処理方法をスクリプト言語で実装するため、柔軟性と効率性を両立することが可能になる、応用範囲の広い機能です。
実行コードは、日本テラデータGitHubにて公開中しています。
このコードは無料のTeradata環境 ClearScape Analytics Experience にて実行可能です。
要点
- Script Table Operator (STO) により、データベース上でスクリプトを実行
- データはスクリプトへの標準入力から、結果は標準出力により与える
- スクリプトは、Teradata上の仮想プロセス(AMP)ごとに分散されて実行される
- STOは Teradata VantageCloud Lake では利用できない点に注意。VantageCloud Enterprise と オンプレミス版で利用可能
事前準備
必要なライブラリをインストールします。
pip install pandas "sqlalchemy<2" teradataml
sqlalchemy
バージョン2との互換性に不具合が出ているので、改善まではバージョン1を指定します
Teradataへの接続
実際の環境に合わせて接続情報を指定します。
わからなければデータベース管理者に聞いてみましょう。
この例はお試し環境 ClearScape Experience での設定です。
from getpass import getpass
from urllib.parse import quote_plus
host = "host.docker.internal"
user = "demo_user"
database = "demo_user"
password = getpass("Password > ")
dbs_port = 1025
encryptdata = "true"
# sqlalchemy用の接続文字列
connstr = (
f"teradatasql://{user}:{quote_plus(password)}@{host}/?"
f"&database={database}"
f"&dbs_port={dbs_port}"
f"&encryptdata={encryptdata}"
)
from sqlalchemy import create_engine
from teradataml import create_context, DataFrame
engine = create_engine(connstr)
context = create_context(tdsqlengine=engine, temp_database_name=user)
Script Table Operator の仕組み
まずは Hello World
STOは、データベースサーバー上でスクリプト(Linuxコマンド)を実行する機能です。
下記は、最もシンプルな Helloプログラムです。
q = r"""
SELECT * FROM
SCRIPT(
SCRIPT_COMMAND('echo "Hello STO!"') /* command */
RETURNS ('answer VARCHAR(50)') /* output type */
)
"""
DataFrame(query=q)
#answer
#Hello STO!
#Hello STO!
#Hello STO!
#Hello STO!
-
echo
というメッセージを出力するコマンドを実行しています - 結果は文字列なので、返り値にはVARCHAR型を指定します(データベースの性質上、型については割と厳密です)
- 結果は、この環境では4行返ってきました。これはこのシステムが4つのAMP(仮想プロセス)から構成されているからです。各AMPで同じスクリプトが実行された結果、AMPの数だけ結果が得られています
複数列の結果を出力
上の例は文字列型の1列のみ出力されましたが、複数の列を出力する場合は区切り字を用います(CSVファイルを作るようなイメージ)。
q = r"""
SELECT * FROM
SCRIPT(
SCRIPT_COMMAND('echo "pi,3.14"')
RETURNS ('variable VARCHAR(10), "value" FLOAT')
DELIMITER (',')
)
"""
DataFrame(query=q)
#variable value
#pi 3.14
#pi 3.14
#pi 3.14
#pi 3.14
- デフォルトの区切り時はタブ(
"\t"
)ですが、DELIMITER (',')
でコンマを指定しています - コンマで区切られた出力を行うことで、出力が2列に分かれます
- 細かいですが、
value
は予約語なので、ダブルクォートでくくる必要があります - 各AMPで同じスクリプトが実行されるので、やはり4行の結果が得られます
複数の行を出力
スクリプトが複数行の結果を出力すると、それらは別の行として出力されます。
q = r"""
SELECT * FROM
SCRIPT(
SCRIPT_COMMAND('echo "hello"; echo "world"')
RETURNS ('x VARCHAR(10) CHARACTER SET UNICODE')
DELIMITER (',')
)
"""
DataFrame(query=q)
#x
#hello
#hello
#world
#hello
#world
#world
#world
#hello
- 各AMPが、"hello" と "world" の2行を出力するように変更しています
- 結果、各AMPから2行の結果が得られています
- なお、行順序はランダムになります
STOでPythonを使う下調べ
これまでの機能を用いて、使用すべきPythonコマンドを探してみます。
Pythonを探す
まずはデータベース上にインストールされたPythonコマンドを探します。
# DISTINCT を入れることで同じ結果の重複を排除
q = r"""
SELECT DISTINCT * FROM
SCRIPT(
SCRIPT_COMMAND('which python; which python3; which tdpython; which tdpython3')
RETURNS ('answer VARCHAR(20)')
)
"""
DataFrame(query=q)
#answer
#/usr/bin/tdpython3
#/usr/bin/python
#/usr/bin/python3
-
which
コマンドでインストールされているPythonコマンドを検索しています - 各AMPから同じ結果を出す必要がないので、
DISTINCT
を入れて重複を排除しています - 結果、
tdpython3
python
python3
の3つが存在することがわかりました
Pythonのバージョンを調べる
q = r"""
SELECT DISTINCT * FROM
SCRIPT(
SCRIPT_COMMAND('echo "python: $(python -V)";
echo "python3: $(python3 -V)";
echo "tdpython3: $(tdpython3 -V)"')
RETURNS ('answer VARCHAR(100)')
)
"""
DataFrame(query=q)
#answer
#python3: Python 3.4.10
#tdpython3: Python 3.8.17
#python:
- 少し複雑なecho文ですが、
python -V
でバージョンを表示させています - 結果、
tdpython3
が最も新しいので、これを使うべきとわかりました(もちろん結果は使う環境により異なります) - なお、
python
はおそらくLinuxにもともと入っているPython2ではないかと思います - このように自分で検索することが可能ですが、正確なPythonコマンドの所在は、システム管理者に確認するほうが安全ではあります
STOでPythonプログラムを実行
Hello STO (Python版)
# スクリプトを作成して保存
script = r"""
print("Hello STO with Python!")
"""
with open("hello.py", "w") as f:
f.write(script)
# ファイルをデータベースにインストール
from teradataml import install_file
install_file("hello", file_path="hello.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定
# ファイルの検索場所を指定
from teradataml import get_connection
q = f"SET SESSION SEARCHUIFDBPATH = {database}"
conn = get_connection()
conn.execute(q)
q = f"""
SELECT * FROM
SCRIPT(
SCRIPT_COMMAND('tdpython3 {database}/hello.py')
RETURNS ('message varchar(50)')
)
"""
DataFrame(query=q)
#answer
#Hello STO with Python!
#Hello STO with Python!
#Hello STO with Python!
#Hello STO with Python!
- メッセージを表示するだけのシンプルなPythonプログラムを作成し、これを
install_file
関数でデータベースへ送っています - PythonスクリプトをPythonで作るという変なことをしていますが、これはノートブックで完結させるためにそうしています。実際にはスクリプトは別のエディタなどで作ることが多いと思いますし、そのほうが書きやすいと思います
- インストールしたファイルを検索場所を指定するため
SEARCHUIFDBPATH
を設定しています(User Installed File Databaseの意) -
print
関数はデフォルトで標準出力への書き込みになるので、メッセージがそのまま出力結果になります(他にsys.stdout.write
でも出力できます)
データを利用するスクリプト
次のような日次・地点別の気温データを用います。
DataFrame("temperature")
#date location avg_temp max_temp
#20/08/18 Fukuoka 30.0 34.9
#22/12/20 Sapporo -4.7 -0.3
#20/05/04 Sapporo 16.7 22.5
#....
下記のようなスクリプトを作成します。
これは、摂氏で与えられる気温を華氏に変換するシンプルなプログラムですが、ここでのフォーカスはデータ入出力の仕組みです。
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys
# STOが取得する数値データは科学計算表記で '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
names=["date", "location", "avg_temp", "max_temp"],
converters={"avg_temp": str_to_float,
"max_temp": str_to_float})
# 気温を 摂氏から華氏に変換
x["avg_temp_f"] = x["avg_temp"] * 9/5 + 32
x["max_temp_f"] = x["max_temp"] * 9/5 + 32
out = x[["avg_temp", "avg_temp_f", "max_temp", "max_temp_f"]]
# 結果は標準出力へTSV形式で与える
out.to_csv(sys.stdout, sep="\t", index=False, header=False)
- STOでは、データは標準入力からタブ区切りの文字列で与えられます
- そのため、
pandas.read_csv
にsys.stdin
を与えることでデータフレームに読み込むことが可能です - ただし、STOからの入力は数値の表現が
'2.10000000000000E 001'
のようになっていて、Pythonではそのままでは数値として解釈されません。空白を'+'
に書き換えて'2.10000000000000E+001'
とすると、数値に変換できるようになります - そこで、
converters
オプションを指定することで事前に文字列を変換することで数値データを読めるようにしています -
pandas
のデータフレームを標準出力に書き出すために、to_csv
メソッドをsys.stdout
に対して用いています -
pandas
を使わない(使えない)場合、csv
ライブラリを用いるなどしてデータを読み込んだり書き出したりすることも可能です - 標準入力からデータをとって標準出力へ書き出すって、ちょっと競技プログラミングみたいです
# Teradata側にスクリプトを配置
from teradataml import install_file
install_file("temperature1", "temperature1.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定
q = f"""
SELECT * FROM
SCRIPT(
ON ( SELECT * FROM temperature )
SCRIPT_COMMAND('tdpython3 {database}/temperature1.py;')
RETURNS ('avg_temp FLOAT, avg_temp_f FLOAT, max_temp FLOAT, max_temp_f FLOAT')
)
"""
DataFrame(query=q)
#avg_temp avg_temp_f max_temp max_temp_f
#16.7 62.059999999999995 22.5 72.5
#18.7 65.66 24.6 76.28
#20.5 68.9 22.8 73.04
#....
AMP (仮想プロセス) ごとにデータを集約
上の例では1行ごとに1行を出力するプログラムでしたが、STOでは標準出力を開発者が自由に変えられるので、集約した結果を返すことも可能です。
下記のプログラムは、気温の平均値を計算するプログラムになっています。
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys
# STOが取得する数値データは科学計算表記で '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
names=["date", "location", "avg_temp", "max_temp"],
converters={"avg_temp": str_to_float,
"max_temp": str_to_float})
# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
sys.exit()
variables = ["avg_temp", "max_temp"]
out = x[variables].mean()
# 結果は標準出力へTSV形式で与える
import csv
writer = csv.writer(sys.stdout, delimiter="\t", lineterminator="\n")
writer.writerow(out)
- 1つ前の例と違い、
csv
ライブラリを用いて結果を出力しています (標準出力へタブ区切りで出力できれば方法は自由です)
from teradataml import install_file
install_file("temperature2", "temperature2.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定
# スクリプトを実行
q = f"""
SELECT * FROM
SCRIPT(
ON ( SELECT * FROM temperature )
SCRIPT_COMMAND('tdpython3 {database}/temperature2.py;')
RETURNS ('avg_temp FLOAT, max_temp FLOAT')
)
"""
DataFrame(query=q)
#avg_temp max_temp
#16.604632768361583 20.77709981167608
#16.34968128983877 20.49673790776153
#16.55687221396731 20.712407132243687
#16.39501322251606 20.523724971666038
- 実行結果は、4つのAMPごとに平均値を計算するため、4行になっています
- ですが、データがどのようにAMPに分散されるかは指定していないので、あまり実態的な意味のある結果ではないです(無作為に4つのサンプルに分けてそれぞれ平均を取ったような理屈です)
- 多くのケースでは、データをAMPにどう分割するかを制御して、意図したグループに対して処理を行うことが多いです
次のスクリプトは、観測地点(location)ごとにデータをAMPに分配し、そのデータに対してスクリプトを実行する例です。
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys
# STOが取得する数値データは科学計算表記で '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
names=["date", "location", "avg_temp", "max_temp"],
converters={"avg_temp": str_to_float,
"max_temp": str_to_float})
# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
sys.exit()
location = x["location"].loc[0] # このAMPに含まれるlocation
variables = ["avg_temp", "max_temp"]
out = x.mean()
out = [location] + out.to_list()
# 結果は標準出力へTSV形式で与える
import csv
writer = csv.writer(sys.stdout, delimiter="\t", lineterminator="\n")
writer.writerow(out)
- このスクリプトも平均値を計算するプログラムですが、AMP内で観測地(location)が固定であることを前提としています
from teradataml import install_file
install_file("temperature3", "temperature3.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定
# スクリプトを実行
q = f"""
SELECT * FROM
SCRIPT(
ON ( SELECT * FROM temperature ) PARTITION BY location
SCRIPT_COMMAND('tdpython3 {database}/temperature3.py;')
RETURNS ('location VARCHAR(10), avg_temp FLOAT, max_temp FLOAT')
)
"""
DataFrame(query=q)
#location avg_temp max_temp
#Sapporo 9.887590282337491 13.871569271175312
#Fukuoka 17.791989494418907 21.85896257386737
#Naha 23.53880499015102 26.225082074852267
#Tokyo 16.460866710439923 21.149179251477346
#Sendai 13.636703873933028 17.991989494418913
#Osaka 17.317268548916612 21.675771503611298
#Nagoya 16.704005252790544 21.621470781352592
- 実行時に
PARTITION BY location
をつけることで、同じlocation
のデータが1つのAMPに集められてスクリプトが実行されます - 結果として、7つの観測値ごとの平均値が計算されています
- この計算自体は
groupby
して平均値を出すことで実現できるものですが、グループごとにデータを分散配置して並列的にスクリプトを実行する、という仕組みには様々な応用が考えられます
STOによるマイクロモデリング:観測地点別に予測モデルの学習・予測
機械学習モデルの学習は、与えられたデータを用いて1つのモデルを作る、という作業なので、広い意味でデータの集約に含まれます。
ですので、上で見たSTOとデータ分割の仕組みを利用して、グループごとに別のモデルを作成したり、それを用いて予測する、ということを効率的に実施することができます。
STOによるマイクロモデルの学習
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys
from statsmodels.tsa.arima.model import ARIMA
# STOが取得する数値データは科学計算表記で '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
names=["date", "location", "avg_temp", "max_temp"],
converters={"avg_temp": str_to_float,
"max_temp": str_to_float})
# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
sys.exit()
location = x["location"].loc[0] # このAMPに含まれるlocation
x = x.sort_values("date")
y = x["max_temp"]
y.index = pd.to_datetime(x["date"], format="%Y-%m-%d")
model = ARIMA(y, order=(2,0,1), freq="D").fit()
# model を文字列情報として保存
# 一度バイナリデータに変換してから文字列に書き出す
from base64 import b64encode
import pickle
model_obj = b64encode(pickle.dumps(model)) # bytes type
model_str = model_obj.decode("ascii")
out = [location, model_str]
# 結果は標準出力へTSV形式で与える
import csv
writer = csv.writer(sys.stdout, delimiter="\t", lineterminator="\n")
writer.writerow(out)
- このプログラムは、与えられたデータを用いて、時系列予測を行うARIMAモデルを学習します
- 学習したモデルを標準出力に書き出す必要があります
- 文字列で表現するのが1つの方法で、モデルオブジェクトを pickle形式のバイナリに変換して、さらにそれを base64方式でアスキー文字列に変換しています
- これは汎用的な方法ですが、比較的大きなテキスト情報になってしまうところが弱点です。ケースによってはより効率的な保存方法があると思います
# Teradata側にスクリプトを配置
from teradataml import install_file
install_file("temperature4", "temperature4.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定
# スクリプトを実行
# 今回は、学習結果を後に利用するため、
# 結果を手元に取得するのではなくモデルを(一時)テーブルに書き出す形にする
q = f"""
CREATE VOLATILE TABLE temperature_model
AS (
SELECT * FROM
SCRIPT(
ON (
SELECT
CAST(CAST("date" AS FORMAT 'YYYY-MM-DD') AS CHAR(10)) AS "date",
location,
avg_temp,
max_temp
FROM temperature
) PARTITION BY location
SCRIPT_COMMAND('tdpython3 {database}/temperature4.py;')
RETURNS ('location VARCHAR(10), model CLOB(10M)')
)
)
WITH DATA
ON COMMIT PRESERVE ROWS
"""
from teradataml import get_connection
conn = get_connection()
conn.execute(q)
# 学習結果を確認
DataFrame("temperature_model").to_pandas()
#location model
#Sendai gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Fukuoka gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Nagoya gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Tokyo gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Naha gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Osaka gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Sapporo gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
- スクリプトを実行すると、文字列に変換したモデルが出力されます
- このモデルは後で利用したいので、テーブルに書き出しています(
CREATE VOLATILE TABLE AS ~
) -
VOLATILE TABLE
は、セッションを閉じると破棄される一時テーブルです(実際のプロジェクトでは永続テーブルに書き出しましょう) -
PARTITION BY location
をしているので、観測値別に個別の予測モデルが作成されています
STOによるマイクロモデルの予測
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys
x = pd.read_csv(sys.stdin, sep="\t", header=None,
names=["location", "model"])
# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
sys.exit()
for _, (location, model) in x.iterrows():
import pickle
from base64 import b64decode
model_str = model.encode("ascii")
model_obj = b64decode(model_str)
model = pickle.loads(model_obj)
yhat = model.forecast(20) # 20日分の予測
# 予測はインデックス付きの series で返るので、データフレームに変換
yhat = pd.DataFrame(yhat).reset_index()
yhat.insert(0, "location", location) # location 情報を追加
# 予測結果を出力
yhat.to_csv(sys.stdout, sep="\t", index=False, header=False)
-
location
とmodel
を含んだデータを入力から受け取り予測値を返すプログラムです - モデルの文字列表現を逆に変換してもとのARIMAモデルに戻し、これを用いて将来予測を行います
- 予測結果は標準出力に書き出します
from teradataml import install_file
install_file("temperature5", "temperature5.py", file_on_client=True, replace=False)
# スクリプトを実行
q = f"""
SELECT * FROM
SCRIPT(
ON (
SELECT * FROM temperature_model )
SCRIPT_COMMAND('tdpython3 {database}/temperature5.py;')
RETURNS ('location VARCHAR(10), "date" DATE, max_temp_forecast FLOAT')
)
"""
DataFrame(query=q)
#location date max_temp_forecast
#Tokyo 24/03/05 12.270377085561474
#Fukuoka 24/03/04 9.843999815897176
#Fukuoka 24/03/05 10.424787303561775
#Nagoya 24/03/03 10.57562841850719
#Nagoya 24/03/05 11.702318362889917
#....
- 実行結果は、観測地点ごとに20日分の予測値が得られています
結び
以上、Script Table Operator 機能の仕組みを紹介しました。
柔軟かつ効率的な実装が可能な機能なので、紹介したマイクロモデリングに限らず様々な応用が考えられると思います(アイデアがあればコメントいただけると嬉しいです)。
疑問点・懸念点などありましたらぜひコメントください。GitHubの方にIssueを立てていただいても結構です。