4
2

TROCCO®のTerraform Provider(β版)で既存のBigQueryデータマート定義を一括取込する

Posted at

はじめに

TROCCO®のTerraform Provider(β版)が使えるようになり、その簡単な操作方法については前回の記事「TROCCO®のTerraform Provider(β版)ができたので最速で触ってみる」でご紹介しました。

これから先の可能性を感じさせるものながら、前回紹介したような手作業での既存設定の取込は煩雑すぎてやりたくないところ・・・。そこで、ひとまずローカル環境下で一括取込する方法を整理したので、ご紹介しようと思います。

  • 基本的なTerraformの動作については、前回の記事をご確認ください。
  • 筆者はTerraform初心者なので、こうするといいぞというのがあればぜひご指摘ください!

事前準備

以下のようなディレクトリ構造で準備をします。

.
├── modules
│   └── datamart
│       └── _provider.tf
├── .env
├── bulk_import.ipynb
└── provider.tf

まず、ルートディレクトリのprovider.tfにはproviderとmoduleの設定を記載します。今回はデータマート定義ごとに設定ファイルを分割したく、その際にファイルを管理しやすくするためmoduleの機能を利用しています。

provider.tf
terraform {
  required_providers {
    trocco = {
      source = "registry.terraform.io/trocco-io/trocco"
    }
  }
}

provider "trocco" {
  region = "japan"
}

module "datamart" {
  source = "./modules/datamart"
}

次にmodule側のディレクトリにも設定情報を記載します。

modules/datamart/_provider.tf
terraform {
  required_providers {
    trocco = {
      source = "registry.terraform.io/trocco-io/trocco"
    }
  }
}

provider "trocco" {
  region = "japan"
}

この設定はいらないはず?なのですが、これがないとregistry.terraform.io/hashicorp/troccoを取りにいくような挙動をしてしまうようでして、もしどう直せばよいかわかる方がいればご指摘いただけると助かります!

そして、.envには事前に取得したAPI KEYを記載します。

.env
TROCCO_API_KEY={TROCCO_API_KEY}

事前準備はこれで完了なので、bulk_import.ipynbでPythonコードを実行して取込処理を進めていきます。

データマート定義を一括取込する

先に処理の流れを概観しておきましょう。

  • APIを叩いて既存のデータマート定義の一覧を取得する
  • 取得したTROCCO側のデータマート定義IDをもとに、ルートディレクトリに取込用の.tfファイルを作成し、terraform importを実行する
  • terraform state showの実行結果をもとに、./modules/datamartのディレクトリに.tfファイルを再作成する
    • 命名はデータ転送モードでは転送先のデータセット/テーブル名、自由記述モードでは転送設定名ベースにする
    • 設定情報を.tfファイルに記載し、実行クエリについては.sqlファイルに切り出す
  • 取り込まれたstate情報はルートディレクトリになっているので、terraform state mv./modules/datamartのディレクトリに切り替える
  • 不要なファイルを削除する

Terraformでimportやapplyをする際にはTROCCO APIを叩くことになるので、大量にAPIを叩きすぎないようご注意ください。

実行コードは以下の通りです。少しずつ解説します。

クリックして実行コード全体を確認する
bulk_import.ipynb
# import required libraries and set environment variables
import urllib, os, json, subprocess, re, glob, shutil, time
from dotenv import load_dotenv
load_dotenv('.env')

def cmd_exec(cmd):
    res = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True).communicate()
    message = res[0].decode('utf-8')
    error_message = ''
    if message == '':
        error_message = res[1].decode('utf-8')
        print(error_message)
    return [message, error_message]

def terraform_init(save_dir):
    if os.path.isdir('.terraform/'):
        shutil.rmtree('.terraform/')
    for file in glob.glob(os.path.join(save_dir, '*')):
        if not file.endswith('_provider.tf'):
            os.remove(file)
    for file in glob.glob('*'):
        if 'tfstate' in file:
            os.remove(file)
        if re.search(r'_[0-9]*\.tf', file):
            os.remove(file)
    if os.path.exists('.terraform.lock.hcl'):
        os.remove('.terraform.lock.hcl')
    print(cmd_exec('terraform init')[0])
    print(cmd_exec('terraform apply')[0])

def remove_label_id(match):
    return re.sub(r'id\s*=\s*[0-9]+\s*', '', match.group())

def remove_unnecessary_id(resource_config, trocco_datamart_id):
    # delete datamart id
    resource_config = resource_config.replace(f'\n    id                       = {trocco_datamart_id}', '')
    # delete label id
    resource_config = re.sub(r'(labels\s*=\s*\[.*?\])', remove_label_id, resource_config, flags=re.DOTALL)
    return resource_config


save_dir = "./modules/datamart"
terraform_init(save_dir)


# get datamart definitions
trocco_datamart_definitions_api = 'https://trocco.io/api/datamart_definitions?limit=5'
req = urllib.request.Request(trocco_datamart_definitions_api)
req.add_header('Authorization', f"Token {os.getenv('TROCCO_API_KEY')}")
res = urllib.request.urlopen(req)
datamart_definitions = json.load(res)['items']

# list existing datamart definitions
existing_datamart_definitions = []
for file in glob.glob(os.path.join(save_dir, '*.sql')):
    existing_datamart_definitions.append(file.split('__')[-1].split('.sql')[0])

# process each datamart definitions
for definition in datamart_definitions:
    trocco_datamart_id = definition['id']
    if str(trocco_datamart_id) in existing_datamart_definitions:
        print(f'{resource_type} - {trocco_datamart_id} is already imported')
        continue

    # check data warehouse type
    if definition['data_warehouse_type'] == 'bigquery':
        resource_type = 'trocco_bigquery_datamart_definition'
    else:
        continue

    # create base config
    print(f'start processing: {resource_type} - {trocco_datamart_id}')
    terraform_resource_id = f'{resource_type}_{trocco_datamart_id}'
    add_config_base = f'''
resource "{resource_type}" "{terraform_resource_id}" {{

}}'''
    with open(f'{terraform_resource_id}.tf', 'a', encoding='utf-8', newline='\n') as f:
        f.write(add_config_base)

    # import existing resource and show state
    terraform_resource_type_and_id = f'{resource_type}.{terraform_resource_id}'
    cmd_exec(f'terraform import {terraform_resource_type_and_id} {trocco_datamart_id}')
    show_res = cmd_exec(f'terraform state show {terraform_resource_type_and_id}')

    # adjust imported config
    state_config = show_res[0].replace(f'# {terraform_resource_type_and_id}:', '')
    if re.search(r'(?<=destination_dataset      = ")[^"]+', state_config) is not None:
        destination_dataset = re.search(r'(?<=destination_dataset      = ")[^"]+', state_config).group()
    else:
        destination_dataset = 'query'
    if re.search(r'(?<=destination_table        = ")[^"]+', state_config) is not None:
        destination_table = re.search(r'(?<=destination_table        = ")[^"]+', state_config).group()
    else:
        destination_table = definition['name']
    exclude_char_pattern = r'\(|\)|/|:|\[|\]|<|>|\*|\$'
    renamed_terraform_resource_id = f'{destination_dataset}__{destination_table}__{trocco_datamart_id}'.replace(' ', '_').replace('&', 'and')
    renamed_terraform_resource_id = re.sub(exclude_char_pattern, '', renamed_terraform_resource_id)

    # create sql file
    query = re.sub(r'\n        ', '\n', re.search(r'(?<=query                    = <<-EOT\n        )(.*?)(?=\n    EOT)', state_config, re.DOTALL).group())
    with open(os.path.join(save_dir, f'{renamed_terraform_resource_id}.sql'), 'w', encoding='utf-8', newline='\n') as f:
        f.write(query)

    # write tf file config and rename tf file
    tf_config_tmp = re.sub(
        r'(?<=query                    = )(<<-EOT\n.*?EOT)(?=\n)',
        f'file("${{path.module}}/{renamed_terraform_resource_id}.sql")',
        remove_unnecessary_id(state_config, trocco_datamart_id),
        flags=re.DOTALL
    )
    tf_config = re.sub(
        f'\nresource "{resource_type}" "{terraform_resource_id}"',
        f'\nresource "{resource_type}" "{renamed_terraform_resource_id}"',
        tf_config_tmp
    )
    with open(os.path.join(save_dir, f'{terraform_resource_id}.tf'), 'w', encoding='utf-8', newline='\n') as f:
        f.write(tf_config)
    os.rename(os.path.join(save_dir, f'{terraform_resource_id}.tf'), os.path.join(save_dir, f'{renamed_terraform_resource_id}.tf'))

    # move tfstate from root file to module
    module_id = re.sub('\.*$', '', re.sub('^\.*', '', '.'.join(save_dir.split('/'))))
    cmd_exec(f'terraform state mv {terraform_resource_type_and_id} module.datamart.{resource_type}.{renamed_terraform_resource_id}')
    os.remove(f'{resource_type}_{trocco_datamart_id}.tf')
    print(f'finish processing: {renamed_terraform_resource_id}')

    # wait 3 seconds
    time.sleep(3)

# remove tfstate.backup
for file in glob.glob('*'):
    if re.search(r'terraform\.tfstate\..*backup', file):
        os.remove(file)

ライブラリのインポートとAPI KEYの設定

こちらは記載の通りです。

# import required libraries and set environment variables
import urllib, os, json, subprocess, re, glob, shutil, time
from dotenv import load_dotenv
load_dotenv('.env')

利用する関数の設定

シェルコマンドを実行して、その応答を利用するものがあるので、関数にして使いやすくしておきます。

def cmd_exec(cmd):
    res = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True).communicate()
    message = res[0].decode('utf-8')
    error_message = ''
    if message == '':
        error_message = res[1].decode('utf-8')
        print(error_message)
    return [message, error_message]

色々試しながら触ることもあると思うので、既存の設定を削除しつつ初期化するのをやりやすくしておきます。

def terraform_init(save_dir):
    if os.path.isdir('.terraform/'):
        shutil.rmtree('.terraform/')
    for file in glob.glob(os.path.join(save_dir, '*')):
        if not file.endswith('_provider.tf'):
            os.remove(file)
    for file in glob.glob('*'):
        if 'tfstate' in file:
            os.remove(file)
        if re.search(r'_[0-9]*\.tf', file):
            os.remove(file)
    if os.path.exists('.terraform.lock.hcl'):
        os.remove('.terraform.lock.hcl')
    print(cmd_exec('terraform init')[0])
    print(cmd_exec('terraform apply')[0])

stateファイルにあって、tfファイルに記載されてはいけないIDを削除する処理を作成しておきます。

def remove_label_id(match):
    return re.sub(r'id\s*=\s*[0-9]+\s*', '', match.group())

def remove_unnecessary_id(resource_config, trocco_datamart_id):
    # delete datamart id
    resource_config = resource_config.replace(f'\n    id                       = {trocco_datamart_id}', '')
    # delete label id
    resource_config = re.sub(r'(labels\s*=\s*\[.*?\])', remove_label_id, resource_config, flags=re.DOTALL)
    return resource_config

Terraformの初期化

下記を実行します。

save_dir = "./modules/datamart"
terraform_init(save_dir)

すると、何もない状態で初期化されます。

Initializing the backend...
Initializing modules...
- datamart in modules\datamart
Initializing provider plugins...
- Finding latest version of trocco-io/trocco...
- Installing trocco-io/trocco v0.1.2...
- Installed trocco-io/trocco v0.1.2 (self-signed, key ID 4AD358D3E1334E66)
Partner and community providers are signed by their developers.
If you'd like to know more about provider signing, you can read about it here:
https://www.terraform.io/docs/cli/plugins/signing.html
Terraform has created a lock file .terraform.lock.hcl to record the provider
selections it made above. Include this file in your version control repository
so that Terraform can guarantee to make the same selections by default when
you run "terraform init" in the future.

Terraform has been successfully initialized!

You may now begin working with Terraform. Try running "terraform plan" to see
any changes that are required for your infrastructure. All Terraform commands
should now work.

If you ever set or change modules or backend configuration for Terraform,
rerun this command to reinitialize your working directory. If you forget, other
commands will detect it and remind you to do so if necessary.


No changes. Your infrastructure matches the configuration.

Terraform has compared your real infrastructure against your configuration
and found no differences, so no changes are needed.

Apply complete! Resources: 0 added, 0 changed, 0 destroyed.

転送設定一覧を取得する

APIから転送設定の一覧を取得します。

# get datamart definitions
trocco_datamart_definitions_api = 'https://trocco.io/api/datamart_definitions?limit=5'
req = urllib.request.Request(trocco_datamart_definitions_api)
req.add_header('Authorization', f"Token {os.getenv('TROCCO_API_KEY')}")
res = urllib.request.urlopen(req)
datamart_definitions = json.load(res)['items']

また、取込済のデータマートIDもまとめておきます(初回は対象がありません)。

# list existing datamart definitions
existing_datamart_definitions = []
for file in glob.glob(os.path.join(save_dir, '*.sql')):
    existing_datamart_definitions.append(file.split('__')[-1].split('.sql')[0])

転送設定をループ処理で取り込む

リソースのインポートをするには、一意のリソースIDを付与したtfファイルの設定を記載しておく必要があるので、{resouce_type}_{trocco_datamart_id}で一意のIDとして利用します。

# process each datamart definitions
for definition in datamart_definitions:
    trocco_datamart_id = definition['id']
    if str(trocco_datamart_id) in existing_datamart_definitions:
        print(f'{resource_type} - {trocco_datamart_id} is already imported')
        continue

    # check data warehouse type
    if definition['data_warehouse_type'] == 'bigquery':
        resource_type = 'trocco_bigquery_datamart_definition'
    else:
        continue

    # create base config
    print(f'start processing: {resource_type} - {trocco_datamart_id}')
    terraform_resource_id = f'{resource_type}_{trocco_datamart_id}'
    add_config_base = f'''
resource "{resource_type}" "{terraform_resource_id}" {{

}}'''
    with open(f'{terraform_resource_id}.tf', 'a', encoding='utf-8', newline='\n') as f:
        f.write(add_config_base)

データ転送モードの場合は転送先のデータセット名とテーブル名、自由記述モードの場合はデータマート定義名を抽出し、取り込んだ設定を書き込む先のリソース名/ファイル名として利用します。

(書きながら、自由記述モードのデータマート定義名が日本語の場合にエラーとなる予感がしてしまったのですが、そのあたりはすみませんということで・・・)

    # import existing resource and show state
    terraform_resource_type_and_id = f'{resource_type}.{terraform_resource_id}'
    cmd_exec(f'terraform import {terraform_resource_type_and_id} {trocco_datamart_id}')
    show_res = cmd_exec(f'terraform state show {terraform_resource_type_and_id}')

    # adjust imported config
    state_config = show_res[0].replace(f'# {terraform_resource_type_and_id}:', '')
    if re.search(r'(?<=destination_dataset      = ")[^"]+', state_config) is not None:
        destination_dataset = re.search(r'(?<=destination_dataset      = ")[^"]+', state_config).group()
    else:
        destination_dataset = 'query'
    if re.search(r'(?<=destination_table        = ")[^"]+', state_config) is not None:
        destination_table = re.search(r'(?<=destination_table        = ")[^"]+', state_config).group()
    else:
        destination_table = definition['name']
    exclude_char_pattern = r'\(|\)|/|:|\[|\]|<|>|\*|\$'
    renamed_terraform_resource_id = f'{destination_dataset}__{destination_table}__{trocco_datamart_id}'.replace(' ', '_').replace('&', 'and')
    renamed_terraform_resource_id = re.sub(exclude_char_pattern, '', renamed_terraform_resource_id)

クエリの記載内容をsqlファイルに切り出し、tfファイルからfileとして参照する形に変更します。

    # create sql file
    query = re.sub(r'\n        ', '\n', re.search(r'(?<=query                    = <<-EOT\n        )(.*?)(?=\n    EOT)', state_config, re.DOTALL).group())
    with open(os.path.join(save_dir, f'{renamed_terraform_resource_id}.sql'), 'w', encoding='utf-8', newline='\n') as f:
        f.write(query)

    # write tf file config and rename tf file
    tf_config_tmp = re.sub(
        r'(?<=query                    = )(<<-EOT\n.*?EOT)(?=\n)',
        f'file("${{path.module}}/{renamed_terraform_resource_id}.sql")',
        remove_unnecessary_id(state_config, trocco_datamart_id),
        flags=re.DOTALL
    )
    tf_config = re.sub(
        f'\nresource "{resource_type}" "{terraform_resource_id}"',
        f'\nresource "{resource_type}" "{renamed_terraform_resource_id}"',
        tf_config_tmp
    )
    with open(os.path.join(save_dir, f'{terraform_resource_id}.tf'), 'w', encoding='utf-8', newline='\n') as f:
        f.write(tf_config)
    os.rename(os.path.join(save_dir, f'{terraform_resource_id}.tf'), os.path.join(save_dir, f'{renamed_terraform_resource_id}.tf'))

tfファイルは作り直しましたが、stateファイルで参照しているリソースIDとずれが発生しているので、リソースIDを変更して不要なファイルを削除します。

    # move tfstate from root file to module
    module_id = re.sub('\.*$', '', re.sub('^\.*', '', '.'.join(save_dir.split('/'))))
    cmd_exec(f'terraform state mv {terraform_resource_type_and_id} module.datamart.{resource_type}.{renamed_terraform_resource_id}')
    os.remove(f'{resource_type}_{trocco_datamart_id}.tf')
    print(f'finish processing: {renamed_terraform_resource_id}')

    # wait 3 seconds
    time.sleep(3)

# remove tfstate.backup
for file in glob.glob('*'):
    if re.search(r'terraform\.tfstate\..*backup', file):
        os.remove(file)

結果を確かめる

ここまでを実行すると、以下のような形でファイルが取り込まれます。

.
├── .terraform
│   ├── modules
│   │   └── modules.json
│   └── providers
│       └── registry.terraform.io
│           └── trocco-io
│               └── trocco
│                   └── 0.1.2
│                       └── windows_amd64
│                           ├── CHANGELOG.md
│                           ├── LICENSE
│                           ├── README.md
│                           └── terraform-provider-trocco_v0.1.2.exe
├── modules
│   └── datamart
│       ├── _provider.tf
│       ├── dataset_x__table_y__49474.sql
│       ├── dataset_x__table_y__49474.tf
│       ├── dataset_x__table_y__49475.sql
│       ├── dataset_x__table_y__49475.tf
│       ├── query__terraform_sample__49476.sql
│       └── query__terraform_sample__49476.tf
├── .env
├── .terraform.lock.hcl
├── bulk_import.ipynb
├── provider.tf
└── terraform.tfstate

terraform planを実行しても差分はありません。

module.datamart.trocco_bigquery_datamart_definition.dataset_x__table_y__49474: Refreshing state... [name=terraform_sample]
module.datamart.trocco_bigquery_datamart_definition.query__terraform_sample__49476: Refreshing state... [name=terraform_sample]
module.datamart.trocco_bigquery_datamart_definition.dataset_x__table_y__49475: Refreshing state... [name=terraform_sample]

No changes. Your infrastructure matches the configuration.

Terraform has compared your real infrastructure against your configuration
and found no differences, so no changes are needed.

取り込んだファイルは例えば以下のようになっています。クエリをSQLファイルとして切り出すことができたので、ここにSQLFluffを使ったLintをかけるというようなこともできそうですね。

dataset_x__table_y__49474.tf

resource "trocco_bigquery_datamart_definition" "dataset_x__table_y__49474" {
    bigquery_connection_id   = {bigquery_connection_id}
    destination_dataset      = "dataset_x"
    destination_table        = "table_y"
    is_runnable_concurrently = false
    name                     = "terraform_sample"
    query                    = file("${path.module}/dataset_x__table_y__49474.sql")
    query_mode               = "insert"
    write_disposition        = "truncate"
}

dataset_x__table_y__49474.sql
select
  col1,
  col2,
  col3
from
  `project-x.dataset_x.table_x`

さいごに

ということで、既存のデータマート定義を一括取り込みすることができました。今回はローカル環境の検証でもありまだできることはありそうで、運用フローやStateファイルの管理も含めて、引き続き考えていけるといいかと思っています。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2