1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Informaticaのデータ統合サービスCDIのデータ統合処理をREST APIで実行してみた

Posted at

はじめに

はじめまして。 NTTデータ ソリューション事業本部 デジタルサクセスソリューション事業部 の nttd-nagano です。

Informatica(インフォマティカ) のクラウドデータマネージメントプラットフォームとして、「Intelligent Data Management Cloud」(※1。以下IDMCと記載)というものがあります。

今回は、そのIDMCのデータ統合サービス「Cloud Data Integration」(※2。以下CDIと記載)のデータ統合処理をREST APIで実行してみた ので、ご報告します。

※1. Intelligent Data Management Cloud 略称はIDMC。旧称はIICS。クラウドデータマネジメントプラットフォーム。以下IDMCと記載。
※2. Cloud Data Integration 略称はCDI。データ統合サービス。ETL処理(※3)やELT処理(※4)を担う。以下CDIと記載。
※3. ETL処理 データベースなどに蓄積されたデータから必要なものを抽出(Extract)し、目的に応じて変換(Transform)し、データを必要とするシステムに格納(Load)すること。
※4. ELT処理 ETL処理(※3)と対比して使われることが多い言葉。データ統合処理の順序を従来型のE→T→Lの順ではなく、E→L→Tの順でおこなう。近年ではDBMSの性能が爆発的に向上したことから、その性能を有効活用するために使われる手法。

idmc_summary.png

データ統合処理をGUI以外の手段で開始する方法

CDIのデータ統合処理(具体的には、タスクフローやマッピングタスク)を、GUIからではなく、シェルスクリプトやジョブスケジューラから実行したいことがあるかと思います。

データ統合処理をGUI以外の手段で実行する方法は、大別すると次の2種類あります。

  1. RunAJobユーティリティ
  2. REST API

今回は、後者のREST APIを解説します。

REST API

REST APIは、さらに次のように分類することができます。

  1. プラットフォームREST APIバージョン2
  2. プラットフォームREST APIバージョン3
  3. サービス固有のREST API(例えば、 データ統合REST API
  4. パブリッシュしたタスクフローに対するREST API(例えば、 実行監視

これらを組み合わせて、所望の処理を作ります。

今回は次のようにして、タスクフローを実行してセッションログを取得する処理を実現してみます。

処理 手段
ログインする プラットフォームREST APIバージョン3
タスクフローを実行する パブリッシュしたタスクフローに対するREST API(実行)
タスクフローのステータスを取得する パブリッシュしたタスクフローに対するREST API(監視)
セッションログを取得する プラットフォームREST APIバージョン2
ログアウトする プラットフォームREST APIバージョン3

Pythonコード

今回はPythonで実装してみました。

実行前に、認証情報を環境変数として設定しておく必要があります。

環境変数 設定値
IDMC_USERNAME IDMCのユーザー名。タスクフローの「許可されたユーザー」であるか、あるいは「許可されたグループ」に所属しているユーザーである必要があります。
IDMC_PASSWORD IDMCのユーザー名に対応するパスワード

※. REST APIではユーザー名/パスワードでの認証のみに対応しています。

このPythonコードは次のようにして実行します。

python run_tf.py 【タスクフロー名】

run_tf.py

from logging import basicConfig, getLogger
from logging import StreamHandler, Formatter
from logging import NOTSET, INFO
import os
from requests import get, post
from urllib.parse import urlparse
from time import sleep
import sys

def setup_logger(name):
    """ロガーをセットアップする
    """
    stream_handler = StreamHandler()
    stream_handler.setLevel(INFO)
    stream_handler.setFormatter(Formatter("%(asctime)s [%(levelname)s] %(message)s"))
    basicConfig(level=NOTSET, handlers=[stream_handler])
    logger = getLogger(name)

    return logger

logger = setup_logger(__name__)

def login(idmc_username, idmc_password):
    """ログインする
    """

    login_url = "https://dm-apne.informaticacloud.com/saas/public/core/v3/login"

    login_req_json = {"username": idmc_username, "password": idmc_password}
    login_response = post(url=login_url, json=login_req_json)
    if (login_response.status_code != 200):
        raise Exception("caught Exception : " + login_response.text)
    login_res_json = login_response.json()

    logger.info("ログインしました。")

    base_api_url = login_res_json["products"][0]["baseApiUrl"]
    session_id = login_res_json["userInfo"]["sessionId"]

    return base_api_url, session_id

def run_tf(session_id, server_url, taskflow_name):
    """タスクフローを実行する
    """

    headers_v3 = {"Content-Type": "application/json", "Accept": "application/json", "INFA-SESSION-ID": session_id }
    run_url = server_url + "/active-bpel/rt/" + taskflow_name

    run_response = post(run_url, headers = headers_v3)
    if (run_response.status_code != 200):
        raise Exception("caught Exception : " + run_response.text)
    run_res_json = run_response.json()
    tf_run_id = run_res_json["RunId"]

    logger.info("タスクフロー " + taskflow_name + " を実行しました。")

    return tf_run_id

def wait_for_tf_end(session_id, server_url, mt_run_id):
    """タスクフローの完了を待つ
    """

    headers_v3 = {"Content-Type": "application/json", "Accept": "application/json", "INFA-SESSION-ID": session_id }
    status_url = server_url + "/active-bpel/services/tf/status/" + mt_run_id

    status = "RUNNING"
    while (status == "RUNNING"):
        logger.info("タスクフローの完了を待っています。")
        sleep(5)
        status_response = get(status_url, headers=headers_v3)
        if status_response.status_code != 200:
            raise Exception("caught Exception : " + status_response.text)
        status_res_json = status_response.json()
        status = status_res_json["status"]
        mt_asset_name = str(status_res_json["subtaskDetails"]["details"]["tasks"][0]["assetName"])
        mt_run_id = str(status_res_json["subtaskDetails"]["details"]["tasks"][0]["runId"])

    if (status == "SUCCESS"):
        result = True
        logger.info("タスクフローが正常終了しました。")
    else:
        result = False
        logger.error("タスクフローが異常終了しました。")

    return result, mt_run_id, mt_asset_name

def get_session_log(session_id, base_api_url, mt_run_id, mt_asset_name):
    """セッションログを取得する
    """

    headers_v2 = {"Content-Type": "application/json", "Accept": "application/json", "icSessionId": session_id }

    # マッピングタスク名をもとに、マッピングタスクIDを取得する
    mt_url = base_api_url + "/api/v2/mttask/name/" + mt_asset_name
    mt_response = get(mt_url, headers = headers_v2)
    if (mt_response.status_code != 200):
        raise Exception("caught Exception : " + mt_response.text)
    mt_res_json = mt_response.json()
    mt_id = mt_res_json["id"]

    # マッピングタスクIDとマッピングタスク実行IDをもとに、ログエントリIDとセッションログファイル名を取得する
    log_entry_url = base_api_url + "/api/v2/activity/activityLog?id=" + mt_id + "&runId=" + mt_run_id
    log_entry_response = get(log_entry_url, headers = headers_v2)
    if (log_entry_response.status_code != 200):
        raise Exception("caught Exception : " + log_entry_response.text)
    log_entry_res_json = log_entry_response.json()
    log_entry_id = log_entry_res_json[0]["id"]
    session_log_file_name = log_entry_res_json[0]["entries"][0]["logEntryItemAttrs"]["Session Log File Name"]

    # ログエントリIDをもとに、セッションログを取得する
    session_log_url = base_api_url + "/api/v2/activity/activityLog/" + log_entry_id + "/sessionLog"
    session_log_response = get(session_log_url, headers = headers_v2)
    if (session_log_response.status_code != 200):
        raise Exception("caught Exception : " + session_log_response.text)
    with open(session_log_file_name, "w") as session_log_file:
        session_log_file.write(session_log_response.text)

    logger.info("セッションログ " + session_log_file_name + " を保存しました。")

def logout(session_id):
    """ログアウトする
    """

    headers_v3 = {"Content-Type": "application/json", "Accept": "application/json", "INFA-SESSION-ID": session_id }
    logout_url = "https://dm-apne.informaticacloud.com/saas/public/core/v3/logout"

    logout_response = post(url=logout_url, headers=headers_v3)
    if (logout_response.status_code != 200):
        raise Exception("caught Exception : " + logout_response.text)

def main(taskflow_name):
    idmc_username = os.environ["IDMC_USERNAME"]
    idmc_password = os.environ["IDMC_PASSWORD"]

    try:
        # ログインする
        base_api_url, session_id = login(idmc_username, idmc_password)

        # タスクフローの実行のためにBase API URLからドメイン部分のみを取り出す
        parse = urlparse(base_api_url)
        server_url = parse.scheme + "://" + parse.netloc

        # タスクフローを実行する
        tf_run_id = run_tf(session_id, server_url, taskflow_name)

        # タスクフローの完了を待つ
        result, mt_run_id, mt_asset_name = wait_for_tf_end(session_id, server_url, tf_run_id)

        if (result):
            # セッションログを取得する
            get_session_log(session_id, base_api_url, mt_run_id, mt_asset_name)

        # ログアウトする
        logout(session_id)
    except Exception as e:
        logger.error("Unexpected error: " + str(e))
        sys.exit(1)

if __name__ == "__main__":
    if len(sys.argv) == 2:
        taskflow_name = sys.argv[1]
    else:
        print("Usage: python run_tf.py taskflow_name")
        sys.exit(1)

    main(taskflow_name)

おわりに

以上、「Informaticaのデータ統合サービスCloud Data Integrationのデータ統合処理をREST APIで実行してみた」でした。

あまり複雑なことはしていないので、Pythonではなく、シェルスクリプトやPowerShellなどでも実装できるのではないでしょうか。

IDMCのデータ統合サービスCDIは30日間の無料体験ができる ので、この機会に試してみてはいかがでしょうか。

仲間募集

NTTデータ ソリューション事業本部 では、以下の職種を募集しています。

1. クラウド技術を活用したデータ分析プラットフォームの開発・構築(ITアーキテクト/クラウドエンジニア)

クラウド/プラットフォーム技術の知見に基づき、DWH、BI、ETL領域におけるソリューション開発を推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/cloud_engineer

2. データサイエンス領域(データサイエンティスト/データアナリスト)

データ活用/情報処理/AI/BI/統計学などの情報科学を活用し、よりデータサイエンスの観点から、データ分析プロジェクトのリーダーとしてお客様のDX/デジタルサクセスを推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/datascientist

3.お客様のAI活用の成功を推進するAIサクセスマネージャー

DataRobotをはじめとしたAIソリューションやサービスを使って、
お客様のAIプロジェクトを成功させ、ビジネス価値を創出するための活動を実施し、
お客様内でのAI活用を拡大、NTTデータが提供するAIソリューションの利用継続を推進していただく人材を募集しています。
https://nttdata-career.jposting.net/u/job.phtml?job_code=804

4.DX/デジタルサクセスを推進するデータサイエンティスト《管理職/管理職候補》 データ分析プロジェクトのリーダとして、正確な課題の把握、適切な評価指標の設定、分析計画策定や適切な分析手法や技術の評価・選定といったデータ活用の具現化、高度化を行い分析結果の見える化・お客様の納得感醸成を行うことで、ビジネス成果・価値を出すアクションへとつなげることができるデータサイエンティスト人材を募集しています。

https://nttdata-career.jposting.net/u/job.phtml?job_code=898

ソリューション紹介

Trusted Data Foundationについて

~データ資産を分析活用するための環境をオールインワンで提供するソリューション~
https://www.nttdata.com/jp/ja/lineup/tdf/
最新のクラウド技術を採用して弊社が独自に設計したリファレンスアーキテクチャ(Datalake+DWH+AI/BI)を顧客要件に合わせてカスタマイズして提供します。
可視化、機械学習、DeepLearningなどデータ資産を分析活用するための環境がオールインワンで用意されており、これまでとは別次元の量と質のデータを用いてアジリティ高くDX推進を実現できます。

TDFⓇ-AM(Trusted Data Foundation - Analytics Managed Service)について

~データ活用基盤の段階的な拡張支援(Quick Start) と保守運用のマネジメント(Analytics Managed)をご提供することでお客様のDXを成功に導く、データ活用プラットフォームサービス~
https://www.nttdata.com/jp/ja/lineup/tdf_am/
TDFⓇ-AMは、データ活用をQuickに始めることができ、データ活用の成熟度に応じて段階的に環境を拡張します。プラットフォームの保守運用はNTTデータが一括で実施し、お客様は成果創出に専念することが可能です。また、日々最新のテクノロジーをキャッチアップし、常に活用しやすい環境を提供します。なお、ご要望に応じて上流のコンサルティングフェーズからAI/BIなどのデータ活用支援に至るまで、End to Endで課題解決に向けて伴走することも可能です。

NTTデータとInformaticaについて

データ連携や処理方式を専門領域として10年以上取り組んできたプロ集団であるNTTデータは、データマネジメント領域でグローバルでの高い評価を得ているInformatica社とパートナーシップを結び、サービス強化を推進しています。
https://www.nttdata.com/jp/ja/lineup/informatica/

NTTデータとTableauについて

ビジュアル分析プラットフォームのTableauと2014年にパートナー契約を締結し、自社の経営ダッシュボード基盤への採用や独自のコンピテンシーセンターの設置などの取り組みを進めてきました。さらに2019年度にはSalesforceとワンストップでのサービスを提供開始するなど、積極的にビジネスを展開しています。

これまでPartner of the Year, Japanを4年連続で受賞しており、2021年にはアジア太平洋地域で最もビジネスに貢献したパートナーとして表彰されました。
また、2020年度からは、Tableauを活用したデータ活用促進のコンサルティングや導入サービスの他、AI活用やデータマネジメント整備など、お客さまの企業全体のデータ活用民主化を成功させるためのノウハウ・方法論を体系化した「デジタルサクセス」プログラムを提供開始しています。
https://www.nttdata.com/jp/ja/lineup/tableau/

NTTデータとAlteryxについて
Alteryxは、業務ユーザーからIT部門まで誰でも使えるセルフサービス分析プラットフォームです。

Alteryx導入の豊富な実績を持つNTTデータは、最高位にあたるAlteryx Premiumパートナーとしてお客さまをご支援します。

導入時のプロフェッショナル支援など独自メニューを整備し、特定の業種によらない多くのお客さまに、Alteryxを活用したサービスの強化・拡充を提供します。

https://www.nttdata.com/jp/ja/lineup/alteryx/

NTTデータとDataRobotについて
DataRobotは、包括的なAIライフサイクルプラットフォームです。

NTTデータはDataRobot社と戦略的資本業務提携を行い、経験豊富なデータサイエンティストがAI・データ活用を起点にお客様のビジネスにおける価値創出をご支援します。

https://www.nttdata.com/jp/ja/lineup/datarobot/

NTTデータとDatabricksについて
NTTデータでは、Databricks Inc.とソリューションパートナー契約を締結し、クラウド・データプラットフォーム「Databricks」の導入・構築、および活用支援を開始しています。

NTTデータではこれまでも、独自ノウハウに基づき、ビッグデータ・AIなど領域に係る市場競争力のあるさまざまなソリューションパートナーとともにエコシステムを形成し、お客さまのビジネス変革を導いてきました。
Databricksは、これら先端テクノロジーとのエコシステムの形成に強みがあり、NTTデータはこれらを組み合わせることでお客さまに最適なインテグレーションをご提供いたします。

https://www.nttdata.com/jp/ja/lineup/databricks/

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?