はじめに
Qlik Talend Data Integration(QTDI)は、オンプレ/クラウド問わず様々なソースからデータを取り込み、Snowflake や BigQuery などのクラウド DWH に向けて、レプリケーション〜変換〜データマート作成までのパイプラインを一気通貫で作れるサービスです。
※ 以下QTDIとします。
公式ドキュメントやデモを見ると便利そうなのは分かるものの、
「実際にオンプレ DB から Snowflake にデータを流すとどんな画面になるのか?」
「どのステップで何を設定するのか?」
といったイメージは持ちにくいと思います。
本記事では、オンプレミスのデータベース → Snowflake へデータを連携する、最小構成のパイプラインを題材に(本当に初歩の初歩です)、QTDIの基本コンセプトと画面遷移、パイプライン構成の考え方を整理してみます。
公式ドキュメント
※ ちなみになのですが、Qlikの公式ドキュメントを確認する際は、英語版を確認することをお勧めします。
というのも日本語版では、情報が古い、誤訳があるなどの問題が少なからずあるようなので、、
■ Introduction Data Integration
https://help.qlik.com/en-US/cloud-services/Subsystems/Hub/Content/Sense_Hub/DataIntegration/Introduction/Data-services-introduction.htm
Qlik Talend Data Integration紹介
概要
QTDIは、ソースデータを自動で取り込み(Landing)→ Snowflake などでステージング & 変換テーブルを自動生成(Storage / Transform)→ 最終的なデータマートを自動構築(Data mart)のプロセスを一気通貫で実施できます。
メダリオンアーキテクチャに則った構成でGUI上でコンポーネントを構築することが可能な、高機能なELTデータ統合ツールと言えます。

基礎用語
ここだけは押さえておきたい用語をあらかじめ記載しておきます。
| 用語 | 解説 |
|---|---|
| ランディングタスク | ソースシステム(DBやSaaSアプリ)からデータを抽出し、"ランディングゾーン"と呼ばれる領域に持ってくる処理を担当 |
| ストレージタスク | ランディング済みのデータを分析向けの読み取り最適形式(テーブルビューや履歴保持など)に整えて"データセット"として生成・更新するタスク |
| オンボードタスク | ランディングタスク + ストレージタスクの総称 |
今回実施したいこと
MySQLをソースとして、Snowflakeをターゲットとしてパイプラインを作成します。
イメージ図

通信順序としては、次の通りです。
- Qlik Data Movement Gateway(以降 DMG)は、HTTPSを使用して、Qlik Cloudへと相互に認証され暗号化された接続を確立します。
- Qlik Cloudはタスク実行指示をData Movement Gatewayに送信し、次にタスクのステータスがQlik Cloudに報告されます。
- Data Movement Gatewayはデータソースからデータを取得し、指定されたターゲットにプッシュします。
事前準備
- MySQLサーバーを立ち上げて、DMGと相互に通信ができるようにしておきましょう。
- MySQLに適当なサンプルデータを放り込んでおきましょう。(※1)
- 読み取りたいデータベースに適切な権限を与えておきましょう。(※2)
- DMGにドライバーを設定しておきましょう。(今回ですと、MySQLとSnowflakeのドライバー)(※2)
- DMGを立ち上げておきましょう。(※3)
(※1)
#データベースを作成する
CREATE DATABASE qlikdb;
#作成したデータベース一覧を確認する
SHOW DATABASES;
#データベースを使用
use qlikdb;
#サンプルデータ導入1
CREATE TABLE employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50),
department VARCHAR(50),
hire_date DATE,
salary DECIMAL(10, 2)
);
INSERT INTO employees (name, department, hire_date, salary) VALUES
('Tanaka', 'Sales', '2021-01-10', 5500000),
('Suzuki', 'IT', '2020-07-15', 6000000),
('Sato', 'HR', '2019-03-20', 4800000),
('Kato', 'Sales', '2022-11-01', 5300000);
(※2)必要に応じて参考にして下さい
https://help.qlik.com/en-US/cloud-services/Subsystems/Hub/Content/Sense_Hub/DataIntegration/TargetConnections/mysql-target.htm
python --version
cd opt/qlik/gateway/movement/drivers/bin
#MySQLドライバーインストール
sudo -E ./install mysql
#Snowflakeドライバーインストール
sudo -E ./install snowflake
#停止
sudo systemctl stop repagent
#再始動
sudo systemctl start repagent
# Active: active (running) を確認
sudo systemctl status repagent
(※3)必要に応じて参考にして下さい
https://qiita.com/kazu_1994/items/859f81f08926a3c85466
データパイプラインタスクの作成
-
プロジェクトを新規作成し、「データパイプライン」を押下後、ターゲットにSnowflakeを指定

※ ランディングターゲットは任意
データパイプラインタスクの実行
-
「検証」を押下
※ 検証フェーズでは、タスクの設定内容が正しく構成されているかをチェックします。(データ処理は実行されません。あくまで設定確認です。)

-
「準備」を押下
※ タスクを実行する前に必要なストリーム、メタデータ、内部コンポーネントを作成します。(ストレージタスクでデータをターゲットに入れる際の、言うなれば"ガワ"を作成します。)

補足
今回デフォルトでCDC連携ONになっていたため、ランディングタスクは常に実行されている状態です。
特に長期間使用しない場合は、ランディングタスクを停止しておいてもいいかもしれません。(ランディングゾーンの種別により、コストがかかるでしょう。)
おわりに
QTDI は、データ統合の複雑さを極力排除しながら、閉じた環境からクラウドへの移行を驚くほどシンプルにしてくれるサービスです。
今回紹介したのは、あくまで最小構成のパイプラインですが、実際に触れてみることで QTDI が提供する「運用のしやすさ」や「変更に強いアーキテクチャ」を体感できるはずです。
本記事が、クラウド DWH 活用の第一歩として、あなたのデータ統合プロジェクトを前進させるヒントになれば幸いです。
それでは皆様、ごきげんよう。
















