はじめに
海岸工学、土木分野が好きなものです。数値計算も大好きです。
そんな私ですが、PythonやSnowflakeの勉強中です。
そこで、OpenFOAMの計算結果のSnowflakeへのアップロードにチャレンジします。
OpenFOAM(olaFlow)の数値計算
計算結果の概要
今回は二次元造波水路(長さ10m、高さ1m)をOpenFOAMで再現しました。
造波にはolaFlowを採用し、単位奥行きの断面二次元計算を実施しました。
OpenFOAMやolaFlowのインストールに興味がある方は以前の記事を参照にしてください!
数値計算のいくつかのパラメータは以下の通りです。今回は5s間のシミュレーションと短めです。
・波の様子:規則波
・波高:0.10m
・周期:1s
・計算時間:5s
計算結果の可視化
数値計算の様子を可視化した結果を以下に示します。緑色の線は波高計を表しています。
今回は3地点に波高計を設置しました。
それぞれ左端から3m、5m、7m地点に設置しています。
波高計で計測したデータ
olaFlowでは、水理模型実験で用いる波高計のように数値計算上の水面変動の時間変化を取得できます。
このスクリプト自体はolaFlowに元々あります!
取得したデータは以下のようになります。波高計それぞれのxy座標、計測結果が出力されます。
ファイル名は「Hakoukei〇〇」となるように変更しています。
3 -0.05
0.02 0.000000
0.04 0.000001
0.06 0.000002
0.08 0.000004
0.1 0.000006
ちなみに得られたデータはこんな感じ。
安定した波を計測するにはもう少し計算したほうがいいかも。
※ 今回は、波高計の設置方法などの詳細についてはスキップします
OpenFOAMの計算結果をsnowflakeにアップロードするための方針
今回のゴールは、OpenFOAMで計測した水面変動の記録を(前節のHakoukei01の結果)をsnowflakeにアップロードすることです。
そこで、以下の方針を考えました。
① Hakoukei01をcsvファイルにする
② csvファイルをsnowflakeにアップロードする
①、②を行うためにpythonでスクリプトを作成します。
作ってみる
Dockerコンテナの準備
基本的には、以前の記事と同じですが、Pythonのコードを使いたいので、少し追加しました。
コンテナ内でpython3、python3-pipを使えるようにしました
Dockerfileの中身はこちら
FROM opencfd/openfoam-default:2012
RUN apt update && \
apt install -y tree nano git python3 python3-pip
COPY shell_script/copy_openfoam_tutorials.sh shell_script/install_olaFlow.sh /usr/local/bin/
COPY python_script/upload_to_snowflake.py python_script/output_data_to_csv.py .env requirements.txt /usr/local/bin/
# スクリプトに実行権限を付与
RUN chmod +x /usr/local/bin/copy_openfoam_tutorials.sh \
/usr/local/bin/install_olaFlow.sh \
requirements.txtに記述したパッケージをpip installコマンドでインストールするようにしています。
compose.ymlの中身はこちら
# servicesにコンテナの定義を書く
services:
# コンテナ名を指定
myopenfoam:
container_name: container_openfoam
volumes:
- /Users/<user_name>/myworkspace/MyOpenFOAM/OpenFOAM/run/:/root/OpenFOAM/-v2012/run/
- /Users/<user_name>/myworkspace/MyOpenFOAM/OpenFOAM/olaFlow/:/root/OpenFOAM/-v2012/applications/utilities/olaFlow/tutorials
# イメージのビルドに関する設定
build:
# Dockerfileがあるディレクトリのパスを指定
context: .
dockerfile: Dockerfile
# イメージ名を設定
image: openfoam-v2012
platform: linux/amd64
stdin_open: true
tty: true
command: /bin/bash -c "source /usr/lib/openfoam/openfoam2012/etc/bashrc && \
/usr/local/bin/copy_openfoam_tutorials.sh && \
/usr/local/bin/install_olaFlow.sh && \
pip3 install -r /usr/local/bin/requirements.txt && \
exec /bin/bash"
requirements.txtの中身はこんな感じ
requirements.txtの中身はこちら
snowflake-snowpark-python==1.29.0
python-dotenv==0.21.0
OpenFOAMの計算結果をcsvに変換する
デフォルトの機能ではOpenFOAMで計測した波高計の計測結果は拡張子のないファイルに出力されます。
そこで、今回作りたい機能は以下のようにしました。
① 計測結果ファイルをcsvファイルに変換したい。出力するディレクトリは「Hakoukei_csv」にする
② 設置した波高計の数だけ、csvファイルに変換する
③ カラムが設定されていないので、追加する。
さっそく作ってみました。
import os
import csv
import logging
# ログの設定
logger = logging.getLogger("snow")
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# 入力ファイルが格納されるディレクトリ
input_dir = "Hakoukei/"
# 出力ファイルを格納するディレクトリ
output_dir = "Hakoukei_csv"
# 出力ディレクトリが存在しない場合は作成
os.makedirs(output_dir, exist_ok=True)
# 入力ディレクトリ内のファイル名を取得
input_files = [
f
for f in os.listdir(input_dir)
if os.path.isfile(os.path.join(input_dir, f)) and "." not in f
]
# カラム名
cols = ["Time_s", "eta_m"]
for i in input_files:
# 入力ファイルのパス
input_file_path = os.path.join(input_dir, i)
# 出力ファイル名
output_file_name = i + ".csv"
# 出力ファイルのパス
output_file_path = os.path.join(output_dir, output_file_name)
try:
# 入力ファイルの読み込み、出力ファイルへの書き込み
with open(input_file_path, "r") as input_file, open(
output_file_path, "w", newline=""
) as output_file:
reader = csv.reader(input_file, delimiter=" ")
writer = csv.writer(output_file)
writer.writerow(cols)
for row in reader:
writer.writerow(row)
logger.info(f"作成完了:{output_file_name}")
except Exception as e:
logger.error(f"エラー発生: {e} ({i})")
csvファイルをsnowflakeにアップロードする
波高計の計測結果のcsvファイルをsnowflakeにアップロードしたいです。
今回は前提として、snowflakeでDB、スキーマ、ステージは作成しています。
前提となるオブジェクトを作成するためのSQLはこちら
use role sysadmin;
CREATE OR REPLACE WAREHOUSE my_wh WITH WAREHOUSE_SIZE='X-Small';
use warehouse my_wh;
create or replace database wave_db;
create or replace schema wave_db.wave_schema;
use database wave_db;
use schema wave_schema;
CREATE STAGE my_int_stage
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
snowflakeにアップロードするためのスクリプトはこんな感じにしました。
from snowflake.snowpark import Session
from dotenv import load_dotenv
import os
import logging
import csv
# ログの設定
logger = logging.getLogger("snow")
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# パラメータ
table_name = "openfoam_data"
stage_name = "my_int_stage"
role = "sysadmin"
warehouse = "MY_WH"
database = "wave_db"
schema = "wave_schema"
target_file_path = "Hakoukei_csv/Hakoukei01.csv"
target_file = "Hakoukei01.csv"
# .env ファイルを読み込む
load_dotenv()
# 認証情報を取得
connection_parameters = {
"account": os.getenv("SNOWSQL_ACCOUNT"),
"user": os.getenv("SNOWSQL_USER"),
"password": os.getenv("SNOWSQL_PWD"),
}
# 環境変数の確認ログ
logger.info(f"環境変数: SNOWSQL_ACCOUNT={os.getenv('SNOWSQL_ACCOUNT')}")
logger.info(f"環境変数: SNOWSQL_USER={os.getenv('SNOWSQL_USER')}")
# Snowflakeのセッションを作成
try:
session = Session.builder.configs(connection_parameters).create()
logger.info("認証成功!")
except Exception as e:
logger.error(f"認証エラー: {e}")
session.close()
exit(1)
# コンテキスト
session.use_role(role)
session.use_warehouse(warehouse)
session.use_database(database)
session.use_schema(schema)
# csvファイルからカラム名を取得
with open(target_file_path, newline="") as csvfile:
spamreader = csv.reader(csvfile, delimiter=",", quotechar="|")
cols = next(spamreader)
create_table_sql = "create or replace table {} ({})".format(
table_name, ", ".join("{} VARCHAR(100)".format(i) for i in cols)
)
try:
session.sql(create_table_sql).collect()
logger.info(f"テーブル{table_name}を作成")
except Exception as e:
logger.error(f"テーブル '{table_name}' の作成に失敗: {e}")
session.close()
# 内部ステージを確認
stage_check = session.sql(f"SHOW STAGES LIKE '{stage_name}'").collect()
if not stage_check:
logger.error(f"ステージ '{stage_name}' が存在しません。")
session.close()
# ファイルをアップロード
try:
session.file.put(
target_file_path,
f"@{stage_name}",
auto_compress=False,
)
logger.info("ファイルを内部ステージにアップロード。")
except Exception as e:
logger.error(f"ファイルのPUTに失敗: {e}")
session.close()
# 内部テーブルからテーブルにコピーする
try:
result = session.sql(
f"copy into {table_name} from @{stage_name}/{target_file} "
"FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)"
).collect()
logger.info("コピー成功!!")
except Exception as e:
logger.error(f"内部ステージからテーブルのCOPYに失敗: {e}")
session.close()
# セッションの終了
session.close()
logger.info("成功!")
OpenFOAMの計算結果がsnowflakeにアップロードできているか確認
できてた!!
最後に
ログの出し方の勉強したいなぁ。