3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OpenFOAM(olaFlow)の計算結果をsnowflakeにアップロードする

Last updated at Posted at 2025-03-10

はじめに

海岸工学、土木分野が好きなものです。数値計算も大好きです。
そんな私ですが、PythonやSnowflakeの勉強中です。

そこで、OpenFOAMの計算結果のSnowflakeへのアップロードにチャレンジします。

OpenFOAM(olaFlow)の数値計算

計算結果の概要

今回は二次元造波水路(長さ10m、高さ1m)をOpenFOAMで再現しました。
造波にはolaFlowを採用し、単位奥行きの断面二次元計算を実施しました。

OpenFOAMやolaFlowのインストールに興味がある方は以前の記事を参照にしてください!

数値計算のいくつかのパラメータは以下の通りです。今回は5s間のシミュレーションと短めです。
・波の様子:規則波
・波高:0.10m
・周期:1s
・計算時間:5s

計算結果の可視化

数値計算の様子を可視化した結果を以下に示します。緑色の線は波高計を表しています。
今回は3地点に波高計を設置しました。
それぞれ左端から3m、5m、7m地点に設置しています。
動画.gif

波高計で計測したデータ

olaFlowでは、水理模型実験で用いる波高計のように数値計算上の水面変動の時間変化を取得できます。
このスクリプト自体はolaFlowに元々あります!

取得したデータは以下のようになります。波高計それぞれのxy座標、計測結果が出力されます。
ファイル名は「Hakoukei〇〇」となるように変更しています。

Hakoukei01.xy
3 -0.05 
Hakoukei01
0.02 0.000000
0.04 0.000001
0.06 0.000002
0.08 0.000004
0.1 0.000006

ちなみに得られたデータはこんな感じ。
安定した波を計測するにはもう少し計算したほうがいいかも。
image.png

※ 今回は、波高計の設置方法などの詳細についてはスキップします

OpenFOAMの計算結果をsnowflakeにアップロードするための方針

今回のゴールは、OpenFOAMで計測した水面変動の記録を(前節のHakoukei01の結果)をsnowflakeにアップロードすることです。
そこで、以下の方針を考えました。

① Hakoukei01をcsvファイルにする
② csvファイルをsnowflakeにアップロードする

①、②を行うためにpythonでスクリプトを作成します。

作ってみる

Dockerコンテナの準備

基本的には、以前の記事と同じですが、Pythonのコードを使いたいので、少し追加しました。

コンテナ内でpython3、python3-pipを使えるようにしました

Dockerfileの中身はこちら
Dockerfile
FROM opencfd/openfoam-default:2012
RUN apt update && \
  apt install  -y tree nano git python3 python3-pip
COPY shell_script/copy_openfoam_tutorials.sh shell_script/install_olaFlow.sh /usr/local/bin/
COPY python_script/upload_to_snowflake.py python_script/output_data_to_csv.py .env requirements.txt /usr/local/bin/

# スクリプトに実行権限を付与
RUN chmod +x /usr/local/bin/copy_openfoam_tutorials.sh \
  /usr/local/bin/install_olaFlow.sh \

requirements.txtに記述したパッケージをpip installコマンドでインストールするようにしています。

compose.ymlの中身はこちら
compose.yml
# servicesにコンテナの定義を書く
services:
  # コンテナ名を指定
  myopenfoam:
    container_name: container_openfoam
    volumes:
      - /Users/<user_name>/myworkspace/MyOpenFOAM/OpenFOAM/run/:/root/OpenFOAM/-v2012/run/
      - /Users/<user_name>/myworkspace/MyOpenFOAM/OpenFOAM/olaFlow/:/root/OpenFOAM/-v2012/applications/utilities/olaFlow/tutorials
    # イメージのビルドに関する設定
    build:
      # Dockerfileがあるディレクトリのパスを指定
      context: .
      dockerfile: Dockerfile
    # イメージ名を設定
    image: openfoam-v2012
    platform: linux/amd64
    stdin_open: true
    tty: true
    command: /bin/bash -c "source /usr/lib/openfoam/openfoam2012/etc/bashrc && \
            /usr/local/bin/copy_openfoam_tutorials.sh && \
            /usr/local/bin/install_olaFlow.sh && \
            pip3 install -r /usr/local/bin/requirements.txt && \
            exec /bin/bash"    

requirements.txtの中身はこんな感じ

requirements.txtの中身はこちら
requirements.txt
snowflake-snowpark-python==1.29.0
python-dotenv==0.21.0

OpenFOAMの計算結果をcsvに変換する

デフォルトの機能ではOpenFOAMで計測した波高計の計測結果は拡張子のないファイルに出力されます。

そこで、今回作りたい機能は以下のようにしました。

① 計測結果ファイルをcsvファイルに変換したい。出力するディレクトリは「Hakoukei_csv」にする
② 設置した波高計の数だけ、csvファイルに変換する
③ カラムが設定されていないので、追加する。

さっそく作ってみました。

output_data_to_csv.py
import os
import csv
import logging

# ログの設定
logger = logging.getLogger("snow")
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# 入力ファイルが格納されるディレクトリ
input_dir = "Hakoukei/"

# 出力ファイルを格納するディレクトリ
output_dir = "Hakoukei_csv"

# 出力ディレクトリが存在しない場合は作成
os.makedirs(output_dir, exist_ok=True)

# 入力ディレクトリ内のファイル名を取得
input_files = [
    f
    for f in os.listdir(input_dir)
    if os.path.isfile(os.path.join(input_dir, f)) and "." not in f
]

# カラム名
cols = ["Time_s", "eta_m"]

for i in input_files:
    # 入力ファイルのパス
    input_file_path = os.path.join(input_dir, i)
    # 出力ファイル名
    output_file_name = i + ".csv"
    # 出力ファイルのパス
    output_file_path = os.path.join(output_dir, output_file_name)
    try:
        # 入力ファイルの読み込み、出力ファイルへの書き込み
        with open(input_file_path, "r") as input_file, open(
            output_file_path, "w", newline=""
        ) as output_file:
            reader = csv.reader(input_file, delimiter=" ")
            writer = csv.writer(output_file)

            writer.writerow(cols)
            for row in reader:
                writer.writerow(row)
            logger.info(f"作成完了:{output_file_name}")
    except Exception as e:
        logger.error(f"エラー発生: {e} ({i})")

csvファイルをsnowflakeにアップロードする

波高計の計測結果のcsvファイルをsnowflakeにアップロードしたいです。
今回は前提として、snowflakeでDB、スキーマ、ステージは作成しています。

前提となるオブジェクトを作成するためのSQLはこちら
use role sysadmin;
CREATE OR REPLACE WAREHOUSE my_wh WITH WAREHOUSE_SIZE='X-Small';
use warehouse my_wh;


create or replace database wave_db;
create or replace schema wave_db.wave_schema;
use database wave_db;
use schema wave_schema;

CREATE STAGE my_int_stage
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

snowflakeにアップロードするためのスクリプトはこんな感じにしました。

upload_to_snowflake.py
from snowflake.snowpark import Session
from dotenv import load_dotenv
import os
import logging
import csv

# ログの設定
logger = logging.getLogger("snow")
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# パラメータ
table_name = "openfoam_data"
stage_name = "my_int_stage"
role = "sysadmin"
warehouse = "MY_WH"
database = "wave_db"
schema = "wave_schema"
target_file_path = "Hakoukei_csv/Hakoukei01.csv"
target_file = "Hakoukei01.csv"

# .env ファイルを読み込む
load_dotenv()

# 認証情報を取得
connection_parameters = {
    "account": os.getenv("SNOWSQL_ACCOUNT"),
    "user": os.getenv("SNOWSQL_USER"),
    "password": os.getenv("SNOWSQL_PWD"),
}

# 環境変数の確認ログ
logger.info(f"環境変数: SNOWSQL_ACCOUNT={os.getenv('SNOWSQL_ACCOUNT')}")
logger.info(f"環境変数: SNOWSQL_USER={os.getenv('SNOWSQL_USER')}")

# Snowflakeのセッションを作成
try:
    session = Session.builder.configs(connection_parameters).create()
    logger.info("認証成功!")
except Exception as e:
    logger.error(f"認証エラー: {e}")
    session.close()
    exit(1)


# コンテキスト
session.use_role(role)
session.use_warehouse(warehouse)
session.use_database(database)
session.use_schema(schema)

# csvファイルからカラム名を取得
with open(target_file_path, newline="") as csvfile:

    spamreader = csv.reader(csvfile, delimiter=",", quotechar="|")
    cols = next(spamreader)

create_table_sql = "create or replace table {} ({})".format(
    table_name, ", ".join("{} VARCHAR(100)".format(i) for i in cols)
)
try:
    session.sql(create_table_sql).collect()
    logger.info(f"テーブル{table_name}を作成")
except Exception as e:
    logger.error(f"テーブル '{table_name}' の作成に失敗: {e}")
    session.close()

# 内部ステージを確認
stage_check = session.sql(f"SHOW STAGES LIKE '{stage_name}'").collect()
if not stage_check:
    logger.error(f"ステージ '{stage_name}' が存在しません。")
    session.close()

# ファイルをアップロード
try:
    session.file.put(
        target_file_path,
        f"@{stage_name}",
        auto_compress=False,
    )
    logger.info("ファイルを内部ステージにアップロード。")
except Exception as e:
    logger.error(f"ファイルのPUTに失敗: {e}")
    session.close()


# 内部テーブルからテーブルにコピーする
try:
    result = session.sql(
        f"copy into {table_name} from @{stage_name}/{target_file} "
        "FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)"
    ).collect()
    logger.info("コピー成功!!")
except Exception as e:
    logger.error(f"内部ステージからテーブルのCOPYに失敗: {e}")
    session.close()


# セッションの終了
session.close()
logger.info("成功!")

OpenFOAMの計算結果がsnowflakeにアップロードできているか確認

できてた!!

image.png

最後に

ログの出し方の勉強したいなぁ。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?