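What I wanted to do
When I ran an internal hands-on session for Azure Data Factory (ADF), I used Terraform so that the whole environment could be built and torn down in one go. This post summarizes the steps.
I hope it helps anyone trying to do something similar!
The pipeline to build
The following pipeline is built with Terraform. It follows the medallion architecture and stages data in the order Raw -> Bronze -> Silver -> Gold.
There are two source datasets (CSV files), which are JOINed at the end to produce the Gold data. The file names and the storage container name are set as pipeline parameters.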
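Terraform code
Prerequisites
- The resource group has already been created
- New resources are created in that existing resource group
- ADF accesses the storage accounts with its system-assigned managed identity
Provider settings
Settings for the providers used in the Terraform code.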
provider.tf
terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = ">=3.64.0"
    }
    azuread = {
      source  = "hashicorp/azuread"
      version = ">=2.39.0"
    }
    time = {
      source  = "hashicorp/time"
      version = ">=0.9.1"
    }
    local = {
      source  = "hashicorp/local"
      version = ">=2.4.0"
    }
  }
}

provider "azurerm" {
  features {}
}
Parameter settings
Parameters used throughout the code are defined as local values in local.tf.
local.tf
locals {
  rg_name = "YOUR_RESOURCE_GROUP_NAME"
  suffix  = "YOUR_SUFFIX"

  default_tags = {
    Description = "ADF Handson"
    Note        = "This resource is created by Terraform"
  }

  storage_account = {
    # One storage account per stage; hyphens are removed because
    # storage account names allow only lowercase letters and digits.
    names = {
      raw    = join("", ["st", replace(local.suffix, "-", ""), "raw"]),
      bronze = join("", ["st", replace(local.suffix, "-", ""), "bronze"]),
      silver = join("", ["st", replace(local.suffix, "-", ""), "silver"]),
      gold   = join("", ["st", replace(local.suffix, "-", ""), "gold"])
    }
    account_tier             = "Standard"
    account_replication_type = "LRS"
  }

  container = {
    names = {
      sample = "sample"
    }
  }

  # Local CSV files uploaded to the Raw storage account.
  blob = {
    sources = {
      sample1 = {
        name   = "sample1.csv"
        source = "data/sample1.csv"
      }
      sample2 = {
        name   = "sample2.csv"
        source = "data/sample2.csv"
      }
    }
  }

  data_factory = {
    name = "adf-${local.suffix}"
  }
}
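The suffix feeds directly into the storage account names, which Azure limits to 3-24 lowercase letters and digits. As a minimal sketch (not part of the original code), the hard-coded suffix could be replaced by an input variable with a validation rule, and the four names could be generated with a for expression. The variable `suffix` and the locals `stage_names` and `storage_account_names` are names assumed here for illustration. The 16-character limit follows from the longest generated name, "st" + suffix + "bronze".

```hcl
# Hypothetical replacement for local.suffix: checked at plan time.
variable "suffix" {
  type        = string
  description = "Suffix appended to resource names, for example adf-handson-01."

  validation {
    # "st" (2) + suffix without hyphens + "bronze" (6) must fit in 24 characters.
    condition = (
      can(regex("^[a-z0-9-]+$", var.suffix)) &&
      length(replace(var.suffix, "-", "")) >= 1 &&
      length(replace(var.suffix, "-", "")) <= 16
    )
    error_message = "The suffix must contain only lowercase letters, digits, and hyphens, with 1 to 16 characters besides hyphens."
  }
}

locals {
  # Assumed helper names; yields the same map as the four join(...) expressions.
  stage_names = ["raw", "bronze", "silver", "gold"]
  storage_account_names = {
    for stage in local.stage_names :
    stage => "st${replace(var.suffix, "-", "")}${stage}"
  }
}
```

With this variant, an invalid suffix fails during `terraform plan` instead of partway through resource creation.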
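Reading existing resources
Existing resources are read with data blocks. Here that means the existing resource group, plus the current client's configuration (its object ID is used later for the role assignment).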
data.tf
data "azurerm_resource_group" "my_rg" {
  name = local.rg_name
}

data "azurerm_client_config" "current" {}
Blob data
The data processed by the pipeline is also uploaded to the storage account in advance by the Terraform code. The sample data below is saved under data/.
sample1.csv
sample1_id,column11,column12,column13,column14,column15,useless_column
0,101,102,103,104,105,hoge
1,111,112,113,114,115,hoge
2,121,122,123,124,125,hoge
3,131,132,133,134,135,hoge
4,141,142,143,144,145,hoge
5,151,152,153,154,155,hoge
6,161,162,163,164,165,hoge
7,171,172,173,174,175,hoge
8,181,182,183,184,185,hoge
9,191,192,193,194,195,hoge
sample2.csv
sample2_id,column21,column22,column23,column24,column25
0,201,202,203,204,205
1,211,212,213,214,215
2,221,222,223,224,225
3,231,232,233,234,235
4,241,242,243,244,245
5,251,252,253,254,255
6,261,262,263,264,265
7,271,272,273,274,275
8,281,282,283,284,285
9,291,292,293,294,295
Creating the storage accounts
Create a storage account and a container for each stage (Raw through Gold). At the same time, upload the blob data from local files to Raw.
create_storage_account.tf
resource "azurerm_storage_account" "storages" {
  for_each                 = local.storage_account.names
  name                     = each.value
  resource_group_name      = data.azurerm_resource_group.my_rg.name
  location                 = data.azurerm_resource_group.my_rg.location
  account_tier             = local.storage_account.account_tier
  account_replication_type = local.storage_account.account_replication_type
  is_hns_enabled           = true # hierarchical namespace (Data Lake Storage Gen2)
  tags                     = local.default_tags
}

resource "time_sleep" "wait_storage_account_created" {
  depends_on      = [azurerm_storage_account.storages]
  create_duration = "60s"
}

resource "azurerm_storage_container" "st_containers" {
  depends_on = [time_sleep.wait_storage_account_created] # wait for storage accounts creation
  # One container per (storage account, container name) pair,
  # keyed as "<storage account name>-<container name>".
  for_each = {
    for item in setproduct(
      [for st_name in local.storage_account.names : st_name],
      [for container_name in local.container.names : container_name]
    ) : join("-", item) => item
  }
  storage_account_name  = each.value[0]
  container_access_type = "container"
  name                  = each.value[1]
}

resource "time_sleep" "wait_container_created" {
  depends_on      = [azurerm_storage_container.st_containers]
  create_duration = "60s"
}

resource "azurerm_storage_blob" "st_blob_raw" {
  depends_on             = [time_sleep.wait_container_created] # wait for containers creation
  for_each               = local.blob.sources
  name                   = each.value.name
  storage_account_name   = azurerm_storage_account.storages["raw"].name
  storage_container_name = azurerm_storage_container.st_containers["${local.storage_account.names["raw"]}-${local.container.names["sample"]}"].name
  type                   = "Block"
  source                 = each.value.source
}
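Because there is only one container name here, the setproduct over account and container names could also be written as a plain for_each over the stages. The following is a sketch of an alternative to the container resource above, not an addition to it. The resource name `sample` and the "private" access type are assumptions. It references the storage account resource directly, so Terraform orders creation through an implicit dependency. Whether the 60-second waits can then be dropped depends on why they were needed, which this sketch does not settle.

```hcl
# Sketch only: replaces azurerm_storage_container.st_containers if adopted.
resource "azurerm_storage_container" "sample" {
  for_each = local.storage_account.names # keys: raw, bronze, silver, gold

  name = local.container.names.sample
  # Referencing the account resource (rather than the bare name string)
  # makes Terraform create each account before its container.
  storage_account_name  = azurerm_storage_account.storages[each.key].name
  container_access_type = "private" # assumption: anonymous access is not needed
}
```

With this shape, the Raw container would be addressed as `azurerm_storage_container.sample["raw"]` instead of through the composed "<account>-<container>" key.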
Data flow definition
The definition of the data flow that creates the Gold data. It is used when the pipeline is created. Save it under pipeline_definitions/.
create_gold_data_sample.txt
source(output(
        sample1_id as short,
        column11 as short,
        column12 as short,
        column13 as short,
        column14 as short,
        column15 as short
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    ignoreNoFilesFound: false) ~> sourceSilverSample1
source(output(
        sample2_id as short,
        column21 as short,
        column22 as short,
        column23 as short,
        column24 as short,
        column25 as short
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    ignoreNoFilesFound: false) ~> sourceSilverSample2
sourceSilverSample1, sourceSilverSample2 join(sample1_id == sample2_id,
    joinType:'inner',
    matchType:'exact',
    ignoreSpaces: false,
    broadcast: 'auto') ~> joinSample1Sample2
joinSample1Sample2 sink(allowSchemaDrift: true,
    validateSchema: false,
    filePattern:'sample.csv',
    umask: 0022,
    preCommands: [],
    postCommands: [],
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true,
    mapColumn(
        sample_id = sample1_id,
        column11,
        column12,
        column13,
        column14,
        column15,
        column21,
        column22,
        column23,
        column24,
        column25
    )) ~> sinkGold
Pipeline definition
The definition of the whole pipeline (its list of activities). Save it under pipeline_definitions/.
sample.json
[
{
"dependsOn": [],
"name": "CopyToBronzeSample1",
"inputs": [
{
"parameters": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_1"
}
},
"referenceName": "DelimitedTextRaw",
"type": "DatasetReference"
}
],
"outputs": [
{
"parameters": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_1"
}
},
"referenceName": "DelimitedTextBronze",
"type": "DatasetReference"
}
],
"policy": {
"retry": 0,
"retryIntervalInSeconds": 30,
"secureInput": false,
"secureOutput": false,
"timeout": "0.12:00:00"
},
"type": "Copy",
"typeProperties": {
"enableStaging": false,
"sink": {
"formatSettings": {
"fileExtension": ".txt",
"quoteAllText": true,
"type": "DelimitedTextWriteSettings"
},
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"type": "DelimitedTextSink"
},
"source": {
"formatSettings": {
"compressionProperties": null,
"type": "DelimitedTextReadSettings"
},
"storeSettings": {
"enablePartitionDiscovery": false,
"recursive": true,
"type": "AzureBlobFSReadSettings"
},
"type": "DelimitedTextSource"
},
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
},
"mappings": [
{
"sink": {
"name": "sample1_id",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "sample1_id",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column11",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column11",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column12",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column12",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column13",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column13",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column14",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column14",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column15",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column15",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "useless_column",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "useless_column",
"physicalType": "String",
"type": "String"
}
}
]
}
},
"userProperties": []
},
{
"dependsOn": [
{
"activity": "CopyToBronzeSample1",
"dependencyConditions": [
"Succeeded"
]
}
],
"name": "CopyToSilverSample1",
"inputs": [
{
"parameters": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_1"
}
},
"referenceName": "DelimitedTextBronze",
"type": "DatasetReference"
}
],
"outputs": [
{
"parameters": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_1"
}
},
"referenceName": "DelimitedTextSilver",
"type": "DatasetReference"
}
],
"policy": {
"retry": 0,
"retryIntervalInSeconds": 30,
"secureInput": false,
"secureOutput": false,
"timeout": "0.12:00:00"
},
"type": "Copy",
"typeProperties": {
"enableStaging": false,
"sink": {
"formatSettings": {
"fileExtension": ".txt",
"quoteAllText": true,
"type": "DelimitedTextWriteSettings"
},
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"type": "DelimitedTextSink"
},
"source": {
"formatSettings": {
"compressionProperties": null,
"type": "DelimitedTextReadSettings"
},
"storeSettings": {
"enablePartitionDiscovery": false,
"recursive": true,
"type": "AzureBlobFSReadSettings"
},
"type": "DelimitedTextSource"
},
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
},
"mappings": [
{
"sink": {
"name": "sample1_id",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "sample1_id",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column11",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column11",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column12",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column12",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column13",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column13",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column14",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column14",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column15",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column15",
"physicalType": "String",
"type": "String"
}
}
]
}
},
"userProperties": []
},
{
"dependsOn": [],
"name": "CopyToBronzeSample2",
"inputs": [
{
"parameters": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_2"
}
},
"referenceName": "DelimitedTextRaw",
"type": "DatasetReference"
}
],
"outputs": [
{
"parameters": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_2"
}
},
"referenceName": "DelimitedTextBronze",
"type": "DatasetReference"
}
],
"policy": {
"retry": 0,
"retryIntervalInSeconds": 30,
"secureInput": false,
"secureOutput": false,
"timeout": "0.12:00:00"
},
"type": "Copy",
"typeProperties": {
"enableStaging": false,
"sink": {
"formatSettings": {
"fileExtension": ".txt",
"quoteAllText": true,
"type": "DelimitedTextWriteSettings"
},
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"type": "DelimitedTextSink"
},
"source": {
"formatSettings": {
"compressionProperties": null,
"type": "DelimitedTextReadSettings"
},
"storeSettings": {
"enablePartitionDiscovery": false,
"recursive": true,
"type": "AzureBlobFSReadSettings"
},
"type": "DelimitedTextSource"
},
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
},
"mappings": [
{
"sink": {
"name": "sample2_id",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "sample2_id",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column21",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column21",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column22",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column22",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column23",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column23",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column24",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column24",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column25",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column25",
"physicalType": "String",
"type": "String"
}
}
]
}
},
"userProperties": []
},
{
"dependsOn": [
{
"activity": "CopyToBronzeSample2",
"dependencyConditions": [
"Succeeded"
]
}
],
"name": "CopyToSilverSample2",
"inputs": [
{
"parameters": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_2"
}
},
"referenceName": "DelimitedTextBronze",
"type": "DatasetReference"
}
],
"outputs": [
{
"parameters": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_2"
}
},
"referenceName": "DelimitedTextSilver",
"type": "DatasetReference"
}
],
"policy": {
"retry": 0,
"retryIntervalInSeconds": 30,
"secureInput": false,
"secureOutput": false,
"timeout": "0.12:00:00"
},
"type": "Copy",
"typeProperties": {
"enableStaging": false,
"sink": {
"formatSettings": {
"fileExtension": ".txt",
"quoteAllText": true,
"type": "DelimitedTextWriteSettings"
},
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"type": "DelimitedTextSink"
},
"source": {
"formatSettings": {
"compressionProperties": null,
"type": "DelimitedTextReadSettings"
},
"storeSettings": {
"enablePartitionDiscovery": false,
"recursive": true,
"type": "AzureBlobFSReadSettings"
},
"type": "DelimitedTextSource"
},
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
},
"mappings": [
{
"sink": {
"name": "sample2_id",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "sample2_id",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column21",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column21",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column22",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column22",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column23",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column23",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column24",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column24",
"physicalType": "String",
"type": "String"
}
},
{
"sink": {
"name": "column25",
"physicalType": "String",
"type": "String"
},
"source": {
"name": "column25",
"physicalType": "String",
"type": "String"
}
}
]
}
},
"userProperties": []
},
{
"dependsOn": [
{
"activity": "CopyToSilverSample1",
"dependencyConditions": [
"Succeeded"
]
},
{
"activity": "CopyToSilverSample2",
"dependencyConditions": [
"Succeeded"
]
}
],
"name": "CreateGoldData",
"policy": {
"retry": 0,
"retryIntervalInSeconds": 30,
"secureInput": false,
"secureOutput": false,
"timeout": "0.12:00:00"
},
"type": "ExecuteDataFlow",
"typeProperties": {
"compute": {
"computeType": "General",
"coreCount": 8
},
"dataFlow": {
"datasetParameters": {
"sinkGold": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": "sample.csv"
},
"sourceSilverSample1": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_1"
}
},
"sourceSilverSample2": {
"container_name": {
"type": "Expression",
"value": "@pipeline().parameters.container_name"
},
"file_name": {
"type": "Expression",
"value": "@pipeline().parameters.sample_file_2"
}
}
},
"parameters": {},
"referenceName": "create_gold_data_sample",
"type": "DataFlowReference"
},
"staging": {},
"traceLevel": "Fine"
},
"userProperties": []
}
]
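Most of sample.json is the column mappings, which repeat the same source/sink structure for every column. In the definition above, the Bronze copies carry every column, while CopyToSilverSample1 leaves out useless_column. As a rough sketch of how such a list could be generated in Terraform instead of written by hand, the following builds the Silver mappings for sample1 with a for expression and jsonencode. The local and output names are hypothetical, and the original code does not do this.

```hcl
locals {
  # Hypothetical local: column names from sample1.csv, minus useless_column.
  sample1_silver_columns = ["sample1_id", "column11", "column12", "column13", "column14", "column15"]

  # One mapping object per column, matching the shape used in sample.json.
  sample1_silver_mappings = [
    for col in local.sample1_silver_columns : {
      source = { name = col, type = "String", physicalType = "String" }
      sink   = { name = col, type = "String", physicalType = "String" }
    }
  ]
}

# Exposed as an output so the generated JSON can be inspected.
output "sample1_silver_mappings_json" {
  value = jsonencode(local.sample1_silver_mappings)
}
```

The trade-off is that the activity JSON would then no longer be a file that can be pasted directly from Data Factory Studio.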
Creating the Data Factory
Create the Data Factory resource, together with its linked services, datasets, data flow, and pipeline.
create_datafactory.tf
resource "azurerm_data_factory" "adf" {
  name                = local.data_factory.name
  resource_group_name = data.azurerm_resource_group.my_rg.name
  location            = data.azurerm_resource_group.my_rg.location
  identity {
    type = "SystemAssigned"
  }
  tags = local.default_tags
}

# One linked service per stage, authenticated with the managed identity.
resource "azurerm_data_factory_linked_service_data_lake_storage_gen2" "gen2_storages" {
  for_each             = local.storage_account.names
  name                 = "LnkdSrvDataLakeGen2${title(each.key)}"
  data_factory_id      = azurerm_data_factory.adf.id
  url                  = "https://${azurerm_storage_account.storages[each.key].name}.dfs.core.windows.net/"
  use_managed_identity = true
}

# One parameterized dataset per stage: DelimitedTextRaw, DelimitedTextBronze, ...
resource "azurerm_data_factory_dataset_delimited_text" "storages" {
  for_each            = local.storage_account.names
  name                = "DelimitedText${title(each.key)}"
  data_factory_id     = azurerm_data_factory.adf.id
  linked_service_name = azurerm_data_factory_linked_service_data_lake_storage_gen2.gen2_storages[each.key].name
  azure_blob_fs_location {
    file_system = "@dataset().container_name"
    filename    = "@dataset().file_name"
  }
  column_delimiter    = ","
  row_delimiter       = "\n"
  encoding            = "UTF-8"
  first_row_as_header = true
  parameters = {
    "container_name" = ""
    "file_name"      = ""
  }
}

data "local_file" "sample" {
  filename = "pipeline_definitions/sample.json"
}

resource "azurerm_data_factory_pipeline" "sample" {
  # The activities reference the Raw, Bronze, and Silver datasets and the data flow
  # by name only, so the dependencies are declared explicitly on all dataset instances.
  depends_on = [
    azurerm_data_factory_dataset_delimited_text.storages,
    azurerm_data_factory_data_flow.create_gold_data_sample
  ]
  name            = "sample"
  data_factory_id = azurerm_data_factory.adf.id
  parameters = {
    "container_name" = "sample"
    "sample_file_1"  = "sample1.csv"
    "sample_file_2"  = "sample2.csv"
  }
  activities_json = data.local_file.sample.content
}

data "local_file" "create_gold_data_sample" {
  filename = "pipeline_definitions/create_gold_data_sample.txt"
}

resource "azurerm_data_factory_data_flow" "create_gold_data_sample" {
  name            = "create_gold_data_sample"
  data_factory_id = azurerm_data_factory.adf.id
  # Source, transformation, and sink names must match those in the script.
  source {
    name = "sourceSilverSample1"
    dataset {
      name = azurerm_data_factory_dataset_delimited_text.storages["silver"].name
    }
  }
  source {
    name = "sourceSilverSample2"
    dataset {
      name = azurerm_data_factory_dataset_delimited_text.storages["silver"].name
    }
  }
  transformation {
    name = "joinSample1Sample2"
  }
  sink {
    name = "sinkGold"
    dataset {
      name = azurerm_data_factory_dataset_delimited_text.storages["gold"].name
    }
  }
  script = data.local_file.create_gold_data_sample.content
}
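The two definition files are read through the local_file data source of the hashicorp/local provider. As a possible alternative, Terraform's built-in file() function reads a file's contents without a provider. The local names below are assumptions for illustration.

```hcl
locals {
  # path.module keeps the paths valid regardless of the working directory.
  sample_activities_json = file("${path.module}/pipeline_definitions/sample.json")
  gold_data_flow_script  = file("${path.module}/pipeline_definitions/create_gold_data_sample.txt")
}
```

The pipeline would then set `activities_json = local.sample_activities_json` and the data flow `script = local.gold_data_flow_script`. If both data sources were replaced this way, nothing else in this code would use the local provider.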
Role assignment
To allow access to the data in the storage accounts, assign a role to the ADF managed identity and to the current user. The role assigned is Storage Blob Data Contributor.
assign_role.tf
resource "azurerm_role_assignment" "adf_write_storages" {
  for_each             = local.storage_account.names
  scope                = azurerm_storage_account.storages[each.key].id
  role_definition_name = "Storage Blob Data Contributor"
  principal_id         = azurerm_data_factory.adf.identity[0].principal_id
}

resource "azurerm_role_assignment" "current_user_write_storages" {
  for_each             = local.storage_account.names
  scope                = azurerm_storage_account.storages[each.key].id
  role_definition_name = "Storage Blob Data Contributor"
  principal_id         = data.azurerm_client_config.current.object_id
}
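The two resources differ only in the principal, so they could be folded into one using the same setproduct pattern as the containers. This is a sketch of an alternative, not something to apply alongside the resources above, since that would try to create the same assignments twice. The names `role_principals`, `role_targets`, and `blob_contributor` are assumed here.

```hcl
locals {
  # Hypothetical labels for the two principals that need data access.
  role_principals = ["adf", "current_user"]

  # One entry per (principal, stage) pair, keyed like "adf-raw".
  role_targets = {
    for pair in setproduct(local.role_principals, keys(local.storage_account.names)) :
    "${pair[0]}-${pair[1]}" => { principal = pair[0], stage = pair[1] }
  }
}

resource "azurerm_role_assignment" "blob_contributor" {
  for_each             = local.role_targets
  scope                = azurerm_storage_account.storages[each.value.stage].id
  role_definition_name = "Storage Blob Data Contributor"
  principal_id = (
    each.value.principal == "adf"
    ? azurerm_data_factory.adf.identity[0].principal_id
    : data.azurerm_client_config.current.object_id
  )
}
```

The for_each keys are built only from static strings, so they are known at plan time even though the ADF principal ID is not.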
Provisioning
Provision with the following commands. If nothing goes wrong, the pipeline described above should have been created.
terraform init
terraform apply -auto-approve
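To make the result easier to check after provisioning, output values could be added. This is a small sketch that is not part of the original code, and the output names are assumptions.

```hcl
output "data_factory_name" {
  value = azurerm_data_factory.adf.name
}

output "storage_account_names" {
  # Map of stage (raw, bronze, silver, gold) to the created account name.
  value = { for stage, st in azurerm_storage_account.storages : stage => st.name }
}
```

After the apply completes, `terraform output` prints these values, which helps locate the resources in the Azure portal.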
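Verification
Click Debug on the pipeline to check that it runs. The data is processed in the order Raw -> Bronze -> Silver -> Gold, and you can confirm that the data has been created in each storage account.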
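Wrapping up
To build an environment that includes the pipeline, I created each resource and the pipeline itself with Terraform. I took this approach because I wanted a hands-on environment, but in general I think building pipelines through the GUI in Data Factory Studio is easier to follow and to operate. You can also integrate it with GitHub and manage the pipeline code there. Conversely, connecting a Terraform-created pipeline to Git seems likely to get messy (untested).
The use cases are limited, but this was an introduction to one way Terraform can be used.
That's all.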