5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

データサイエンスのためのTeradata入門: Script Table Operator(データベース上でのスクリプト実行)

Last updated at Posted at 2024-04-09

この記事について

Teradataデータベースの独自機能である、Script Table Operator (STO) について紹介します。
これは、データベースのあるサーバー上で直接スクリプトを実行する機能です。
データベース上のデータの処理方法をスクリプト言語で実装するため、柔軟性と効率性を両立することが可能になる、応用範囲の広い機能です。

実行コードは、日本テラデータGitHubにて公開中しています。
このコードは無料のTeradata環境 ClearScape Analytics Experience にて実行可能です。

要点

  • Script Table Operator (STO) により、データベース上でスクリプトを実行
  • データはスクリプトへの標準入力から、結果は標準出力により与える
  • スクリプトは、Teradata上の仮想プロセス(AMP)ごとに分散されて実行される
  • STOは Teradata VantageCloud Lake では利用できない点に注意。VantageCloud Enterprise と オンプレミス版で利用可能

事前準備

必要なライブラリをインストールします。

ライブラリのインストール
pip install pandas "sqlalchemy<2" teradataml

sqlalchemy バージョン2との互換性に不具合が出ているので、改善まではバージョン1を指定します

Teradataへの接続

実際の環境に合わせて接続情報を指定します。
わからなければデータベース管理者に聞いてみましょう。
この例はお試し環境 ClearScape Experience での設定です。

接続情報
from getpass import getpass
from urllib.parse import quote_plus

host = "host.docker.internal"
user = "demo_user"
database = "demo_user"
password = getpass("Password > ")
dbs_port = 1025
encryptdata = "true"

# sqlalchemy用の接続文字列
connstr = (
  f"teradatasql://{user}:{quote_plus(password)}@{host}/?"
  f"&database={database}"
  f"&dbs_port={dbs_port}"
  f"&encryptdata={encryptdata}"
)
セッション開始
from sqlalchemy import create_engine
from teradataml import create_context, DataFrame
engine = create_engine(connstr)
context = create_context(tdsqlengine=engine, temp_database_name=user)

Script Table Operator の仕組み

まずは Hello World

STOは、データベースサーバー上でスクリプト(Linuxコマンド)を実行する機能です。
下記は、最もシンプルな Helloプログラムです。

Hello, STO!
q = r"""
SELECT * FROM
  SCRIPT(
    SCRIPT_COMMAND('echo "Hello STO!"')    /* command */
    RETURNS ('answer VARCHAR(50)')         /* output type */
  )
"""

DataFrame(query=q)
#answer
#Hello STO!
#Hello STO!
#Hello STO!
#Hello STO!
  • echo というメッセージを出力するコマンドを実行しています
  • 結果は文字列なので、返り値にはVARCHAR型を指定します(データベースの性質上、型については割と厳密です)
  • 結果は、この環境では4行返ってきました。これはこのシステムが4つのAMP(仮想プロセス)から構成されているからです。各AMPで同じスクリプトが実行された結果、AMPの数だけ結果が得られています

複数列の結果を出力

上の例は文字列型の1列のみ出力されましたが、複数の列を出力する場合は区切り字を用います(CSVファイルを作るようなイメージ)。

結果を2列出力
q = r"""
SELECT * FROM

  SCRIPT(
    SCRIPT_COMMAND('echo "pi,3.14"')
    RETURNS ('variable VARCHAR(10), "value" FLOAT')
    DELIMITER (',')
  )
"""

DataFrame(query=q)
#variable  value
#pi        3.14
#pi        3.14
#pi        3.14
#pi        3.14
  • デフォルトの区切り時はタブ("\t")ですが、DELIMITER (',') でコンマを指定しています
  • コンマで区切られた出力を行うことで、出力が2列に分かれます
  • 細かいですが、value は予約語なので、ダブルクォートでくくる必要があります
  • 各AMPで同じスクリプトが実行されるので、やはり4行の結果が得られます

複数の行を出力

スクリプトが複数行の結果を出力すると、それらは別の行として出力されます。

AMPごとに2行出力
q = r"""
SELECT * FROM

  SCRIPT(
    SCRIPT_COMMAND('echo "hello"; echo "world"')
    RETURNS ('x VARCHAR(10) CHARACTER SET UNICODE')
    DELIMITER (',')
  )
"""

DataFrame(query=q)
#x
#hello
#hello
#world
#hello
#world
#world
#world
#hello
  • 各AMPが、"hello" と "world" の2行を出力するように変更しています
  • 結果、各AMPから2行の結果が得られています
  • なお、行順序はランダムになります

STOでPythonを使う下調べ

これまでの機能を用いて、使用すべきPythonコマンドを探してみます。

Pythonを探す

まずはデータベース上にインストールされたPythonコマンドを探します。

Pythonを探す
# DISTINCT を入れることで同じ結果の重複を排除
q = r"""
SELECT DISTINCT * FROM

  SCRIPT(
    SCRIPT_COMMAND('which python; which python3; which tdpython; which tdpython3')
    RETURNS ('answer VARCHAR(20)')
  )
"""

DataFrame(query=q)
#answer
#/usr/bin/tdpython3
#/usr/bin/python
#/usr/bin/python3
  • whichコマンドでインストールされているPythonコマンドを検索しています
  • 各AMPから同じ結果を出す必要がないので、DISTINCTを入れて重複を排除しています
  • 結果、tdpython3 python python3 の3つが存在することがわかりました

Pythonのバージョンを調べる

Pythonのバージョン
q = r"""
SELECT DISTINCT * FROM

  SCRIPT(
    SCRIPT_COMMAND('echo "python: $(python -V)";
                    echo "python3: $(python3 -V)";
                    echo "tdpython3: $(tdpython3 -V)"')
    RETURNS ('answer VARCHAR(100)')
  )
"""

DataFrame(query=q)
#answer
#python3: Python 3.4.10
#tdpython3: Python 3.8.17
#python:
  • 少し複雑なecho文ですが、python -V でバージョンを表示させています
  • 結果、tdpython3 が最も新しいので、これを使うべきとわかりました(もちろん結果は使う環境により異なります)
  • なお、python はおそらくLinuxにもともと入っているPython2ではないかと思います
  • このように自分で検索することが可能ですが、正確なPythonコマンドの所在は、システム管理者に確認するほうが安全ではあります

STOでPythonプログラムを実行

Hello STO (Python版)

Hello STO with Python
# スクリプトを作成して保存
script = r"""
print("Hello STO with Python!")
"""

with open("hello.py", "w") as f:
    f.write(script)

# ファイルをデータベースにインストール
from teradataml import install_file
install_file("hello", file_path="hello.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定

# ファイルの検索場所を指定
from teradataml import get_connection
q = f"SET SESSION SEARCHUIFDBPATH = {database}" 
conn = get_connection()
conn.execute(q)

q = f"""
SELECT * FROM

  SCRIPT(
    SCRIPT_COMMAND('tdpython3 {database}/hello.py')
    RETURNS ('message varchar(50)')
  )
"""
DataFrame(query=q)
#answer
#Hello STO with Python!
#Hello STO with Python!
#Hello STO with Python!
#Hello STO with Python!
  • メッセージを表示するだけのシンプルなPythonプログラムを作成し、これをinstall_file関数でデータベースへ送っています
  • PythonスクリプトをPythonで作るという変なことをしていますが、これはノートブックで完結させるためにそうしています。実際にはスクリプトは別のエディタなどで作ることが多いと思いますし、そのほうが書きやすいと思います
  • インストールしたファイルを検索場所を指定するため SEARCHUIFDBPATH を設定しています(User Installed File Databaseの意)
  • print関数はデフォルトで標準出力への書き込みになるので、メッセージがそのまま出力結果になります(他に sys.stdout.write でも出力できます)

データを利用するスクリプト

次のような日次・地点別の気温データを用います。

DataFrame("temperature")
#date     location avg_temp	max_temp
#20/08/18 Fukuoka  30.0     34.9
#22/12/20 Sapporo  -4.7     -0.3
#20/05/04 Sapporo  16.7     22.5
#....

下記のようなスクリプトを作成します。
これは、摂氏で与えられる気温を華氏に変換するシンプルなプログラムですが、ここでのフォーカスはデータ入出力の仕組みです。

temperature1.py
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys

# STOが取得する数値データは科学計算表記で  '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
                names=["date", "location", "avg_temp", "max_temp"],
                converters={"avg_temp": str_to_float,
                            "max_temp": str_to_float})

# 気温を 摂氏から華氏に変換
x["avg_temp_f"] = x["avg_temp"] * 9/5 + 32
x["max_temp_f"] = x["max_temp"] * 9/5 + 32

out = x[["avg_temp", "avg_temp_f", "max_temp", "max_temp_f"]]
# 結果は標準出力へTSV形式で与える
out.to_csv(sys.stdout, sep="\t", index=False, header=False)
  • STOでは、データは標準入力からタブ区切りの文字列で与えられます
  • そのため、pandas.read_csvsys.stdin を与えることでデータフレームに読み込むことが可能です
  • ただし、STOからの入力は数値の表現が '2.10000000000000E 001' のようになっていて、Pythonではそのままでは数値として解釈されません。空白を'+'に書き換えて '2.10000000000000E+001' とすると、数値に変換できるようになります
  • そこで、convertersオプションを指定することで事前に文字列を変換することで数値データを読めるようにしています
  • pandasのデータフレームを標準出力に書き出すために、to_csvメソッドを sys.stdoutに対して用いています
  • pandasを使わない(使えない)場合、csvライブラリを用いるなどしてデータを読み込んだり書き出したりすることも可能です
  • 標準入力からデータをとって標準出力へ書き出すって、ちょっと競技プログラミングみたいです
temperature1.pyを実行
# Teradata側にスクリプトを配置
from teradataml import install_file
install_file("temperature1", "temperature1.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定

q = f"""
SELECT * FROM

  SCRIPT(
    ON ( SELECT * FROM temperature )
    SCRIPT_COMMAND('tdpython3 {database}/temperature1.py;')
    RETURNS ('avg_temp FLOAT, avg_temp_f FLOAT, max_temp FLOAT, max_temp_f FLOAT')
  )
"""

DataFrame(query=q)
#avg_temp avg_temp_f         max_temp max_temp_f
#16.7     62.059999999999995 22.5     72.5
#18.7     65.66              24.6     76.28
#20.5     68.9               22.8     73.04
#....

AMP (仮想プロセス) ごとにデータを集約

上の例では1行ごとに1行を出力するプログラムでしたが、STOでは標準出力を開発者が自由に変えられるので、集約した結果を返すことも可能です。
下記のプログラムは、気温の平均値を計算するプログラムになっています。

temperature2.py
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys

# STOが取得する数値データは科学計算表記で  '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
                names=["date", "location", "avg_temp", "max_temp"],
                converters={"avg_temp": str_to_float,
                            "max_temp": str_to_float})

# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
    sys.exit()

variables = ["avg_temp", "max_temp"]
out = x[variables].mean()

# 結果は標準出力へTSV形式で与える
import csv
writer = csv.writer(sys.stdout, delimiter="\t", lineterminator="\n")
writer.writerow(out)
  • 1つ前の例と違い、csvライブラリを用いて結果を出力しています (標準出力へタブ区切りで出力できれば方法は自由です)
temperature2.pyを実行
from teradataml import install_file
install_file("temperature2", "temperature2.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定

# スクリプトを実行
q = f"""
SELECT * FROM

  SCRIPT(
    ON ( SELECT * FROM temperature )
    SCRIPT_COMMAND('tdpython3 {database}/temperature2.py;')
    RETURNS ('avg_temp FLOAT, max_temp FLOAT')
  )
"""

DataFrame(query=q)
#avg_temp            max_temp
#16.604632768361583  20.77709981167608
#16.34968128983877   20.49673790776153
#16.55687221396731   20.712407132243687
#16.39501322251606   20.523724971666038
  • 実行結果は、4つのAMPごとに平均値を計算するため、4行になっています
  • ですが、データがどのようにAMPに分散されるかは指定していないので、あまり実態的な意味のある結果ではないです(無作為に4つのサンプルに分けてそれぞれ平均を取ったような理屈です)
  • 多くのケースでは、データをAMPにどう分割するかを制御して、意図したグループに対して処理を行うことが多いです

次のスクリプトは、観測地点(location)ごとにデータをAMPに分配し、そのデータに対してスクリプトを実行する例です。

temperature3.py
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys

# STOが取得する数値データは科学計算表記で  '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
                names=["date", "location", "avg_temp", "max_temp"],
                converters={"avg_temp": str_to_float,
                            "max_temp": str_to_float})

# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
    sys.exit()

location = x["location"].loc[0]  # このAMPに含まれるlocation
variables = ["avg_temp", "max_temp"]
out = x.mean()
out = [location] + out.to_list()

# 結果は標準出力へTSV形式で与える
import csv
writer = csv.writer(sys.stdout, delimiter="\t", lineterminator="\n")
writer.writerow(out)
  • このスクリプトも平均値を計算するプログラムですが、AMP内で観測地(location)が固定であることを前提としています
temperature3.pyを実行
from teradataml import install_file
install_file("temperature3", "temperature3.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定

# スクリプトを実行
q = f"""
SELECT * FROM

  SCRIPT(
    ON ( SELECT * FROM temperature )  PARTITION BY location
    SCRIPT_COMMAND('tdpython3 {database}/temperature3.py;')
    
    RETURNS ('location VARCHAR(10), avg_temp FLOAT, max_temp FLOAT')
  )
"""

DataFrame(query=q)
#location  avg_temp            max_temp
#Sapporo   9.887590282337491   13.871569271175312
#Fukuoka   17.791989494418907  21.85896257386737
#Naha      23.53880499015102   26.225082074852267
#Tokyo     16.460866710439923  21.149179251477346
#Sendai    13.636703873933028  17.991989494418913
#Osaka     17.317268548916612  21.675771503611298
#Nagoya    16.704005252790544  21.621470781352592
  • 実行時に PARTITION BY location をつけることで、同じ location のデータが1つのAMPに集められてスクリプトが実行されます
  • 結果として、7つの観測値ごとの平均値が計算されています
  • この計算自体は groupby して平均値を出すことで実現できるものですが、グループごとにデータを分散配置して並列的にスクリプトを実行する、という仕組みには様々な応用が考えられます

STOによるマイクロモデリング:観測地点別に予測モデルの学習・予測

機械学習モデルの学習は、与えられたデータを用いて1つのモデルを作る、という作業なので、広い意味でデータの集約に含まれます。
ですので、上で見たSTOとデータ分割の仕組みを利用して、グループごとに別のモデルを作成したり、それを用いて予測する、ということを効率的に実施することができます。

STOによるマイクロモデルの学習

temperature4.py
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys
from statsmodels.tsa.arima.model import ARIMA

# STOが取得する数値データは科学計算表記で  '2.10000000000000E 001' のように与えられる
# Python がこれを読める用に空白を+に変えて '2.10000000000000E+001' のように変換する
str_to_float = lambda a: float(a.replace("E ", "E+"))
str_to_int = lambda a: int(a.replace("E ", "E+"))
x = pd.read_csv(sys.stdin, sep="\t", header=None,
                names=["date", "location", "avg_temp", "max_temp"],
                converters={"avg_temp": str_to_float,
                            "max_temp": str_to_float})

# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
    sys.exit()

location = x["location"].loc[0]  # このAMPに含まれるlocation
x = x.sort_values("date")
y = x["max_temp"]
y.index = pd.to_datetime(x["date"], format="%Y-%m-%d")

model = ARIMA(y, order=(2,0,1), freq="D").fit()

# model を文字列情報として保存
# 一度バイナリデータに変換してから文字列に書き出す
from base64 import b64encode 
import pickle 
model_obj = b64encode(pickle.dumps(model))  # bytes type
model_str = model_obj.decode("ascii")

out = [location, model_str]

# 結果は標準出力へTSV形式で与える
import csv
writer = csv.writer(sys.stdout, delimiter="\t", lineterminator="\n")
writer.writerow(out)
  • このプログラムは、与えられたデータを用いて、時系列予測を行うARIMAモデルを学習します
  • 学習したモデルを標準出力に書き出す必要があります
  • 文字列で表現するのが1つの方法で、モデルオブジェクトを pickle形式のバイナリに変換して、さらにそれを base64方式でアスキー文字列に変換しています
  • これは汎用的な方法ですが、比較的大きなテキスト情報になってしまうところが弱点です。ケースによってはより効率的な保存方法があると思います
temperature4.pyを実行
# Teradata側にスクリプトを配置
from teradataml import install_file
install_file("temperature4", "temperature4.py", file_on_client=True, replace=False)
# 初回は `replace=False`, 2回目以降は `replace=True` を指定

# スクリプトを実行
# 今回は、学習結果を後に利用するため、
# 結果を手元に取得するのではなくモデルを(一時)テーブルに書き出す形にする
q = f"""
CREATE VOLATILE TABLE temperature_model 
AS (
  SELECT * FROM

  SCRIPT(
    ON (
      SELECT
        CAST(CAST("date" AS FORMAT 'YYYY-MM-DD') AS CHAR(10)) AS "date",
        location, 
        avg_temp,
        max_temp
      FROM temperature 
    )  PARTITION BY location
    SCRIPT_COMMAND('tdpython3 {database}/temperature4.py;')
    
    RETURNS ('location VARCHAR(10), model CLOB(10M)')
  )
)
WITH DATA
ON COMMIT PRESERVE ROWS
"""

from teradataml import get_connection
conn = get_connection()
conn.execute(q)

# 学習結果を確認
DataFrame("temperature_model").to_pandas()
#location  model
#Sendai    gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Fukuoka   gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Nagoya    gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Tokyo     gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Naha      gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Osaka     gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
#Sapporo   gASVHCgBAAAAAACMG3N0YXRzbW9kZWxzLnRzYS5hcmltYS...
  • スクリプトを実行すると、文字列に変換したモデルが出力されます
  • このモデルは後で利用したいので、テーブルに書き出しています(CREATE VOLATILE TABLE AS ~
  • VOLATILE TABLE は、セッションを閉じると破棄される一時テーブルです(実際のプロジェクトでは永続テーブルに書き出しましょう)
  • PARTITION BY location をしているので、観測値別に個別の予測モデルが作成されています

STOによるマイクロモデルの予測

temperature5.py
# データを取得
# STOではデータは標準入力からTSVで取得
import pandas as pd
import sys

x = pd.read_csv(sys.stdin, sep="\t", header=None,
                names=["location", "model"])

# このAMPにデータがない場合は何も出力しない
if len(x) == 0:
    sys.exit()

for _, (location, model) in x.iterrows():
    import pickle
    from base64 import b64decode
    model_str = model.encode("ascii")
    model_obj = b64decode(model_str)
    model = pickle.loads(model_obj)
    
    yhat = model.forecast(20)  # 20日分の予測
    # 予測はインデックス付きの series で返るので、データフレームに変換
    yhat = pd.DataFrame(yhat).reset_index()
    yhat.insert(0, "location", location)  # location 情報を追加
    # 予測結果を出力
    yhat.to_csv(sys.stdout, sep="\t", index=False, header=False)
  • locationmodel を含んだデータを入力から受け取り予測値を返すプログラムです
  • モデルの文字列表現を逆に変換してもとのARIMAモデルに戻し、これを用いて将来予測を行います
  • 予測結果は標準出力に書き出します
temperature5.pyを実行
from teradataml import install_file
install_file("temperature5", "temperature5.py", file_on_client=True, replace=False)

# スクリプトを実行
q = f"""
SELECT * FROM

  SCRIPT(
    ON (
      SELECT * FROM temperature_model )
    SCRIPT_COMMAND('tdpython3 {database}/temperature5.py;')
    
    RETURNS ('location VARCHAR(10), "date" DATE, max_temp_forecast FLOAT')
  )
"""

DataFrame(query=q)
#location  date      max_temp_forecast
#Tokyo     24/03/05  12.270377085561474
#Fukuoka   24/03/04  9.843999815897176
#Fukuoka   24/03/05  10.424787303561775
#Nagoya    24/03/03  10.57562841850719
#Nagoya    24/03/05  11.702318362889917
#....
  • 実行結果は、観測地点ごとに20日分の予測値が得られています

結び

以上、Script Table Operator 機能の仕組みを紹介しました。
柔軟かつ効率的な実装が可能な機能なので、紹介したマイクロモデリングに限らず様々な応用が考えられると思います(アイデアがあればコメントいただけると嬉しいです)。
疑問点・懸念点などありましたらぜひコメントください。GitHubの方にIssueを立てていただいても結構です。

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?