はじめに
みなさんdbtは使ってますか?
データパイプラインを長く運用していると必ず発生する問題としてドキュメントが腐る問題があります。今回は最近リリースされたAWS Bedrockのknowledge-baseを用いて自動でdbt docsの中身が生成できないかを検証してみたいと思います。
この記事はdbt Advent Calendar 2023の3日目の記事です。
概要
大きな流れとしては以下になります
- bedrockのknowledge baseにmanifest.jsonを登録
- knowledge baseを元にbedrockのagentを構築
- pythonなどでagentに問い合わせをするスクリプトを実装
dbtのprojectはサンプル用のjaffle_shopを利用します。
Amazon Bedrock
AWSが提供するLLMサービスです。AWS保有の大規模モデルに加えてAnthropicやllama2などのモデルも使用ができます。今回はAnthropicを使用しました。また現在us-east-1,us-west-2リージョンのみで利用可能です。
knowledge base
S3に配置されたデータをAmazon OpenSerchなどのベクターDBに格納し、それを元に検索拡張生成(RAG)を行うことができます。今回はdbtのメタデータがあるmanifest.jsonを用いてdbt modelのメタデータを検索してみます。
手順
manifest.jsonの変換
まずdbtのメタデータが格納されているmanifest.jsonをbedrockのknowledge baseに登録します。knowledge baseにはjson形式のデータは読み込みができないので、今回は必要な情報だけをcsvに変換してみます。
manifest.jsonは以下のような形で生成されます。とりあえず必要そうなデータだけ適当に抽出してみます。
...
},
"nodes": {
"model.jaffle_shop.customers": {
"database": "DB_NAME",
"schema": "PUBLIC",
"name": "customers",
"resource_type": "model",
"package_name": "jaffle_shop",
"path": "customers.sql",
"original_file_path": "models/customers.sql",
"unique_id": "model.jaffle_shop.customers",
"fqn": [
"jaffle_shop",
"customers"
],
"alias": "customers",
"checksum": {
"name": "sha256",
"checksum": "..."
},
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
...
},
"tags": [],
"description": "This table has basic information about a customer, as well as some derived facts based on a customer's orders",
"columns": {
"customer_id": {
"name": "customer_id",
"description": "This is a unique identifier for a customer",
"meta": {},
"data_type": null,
"constraints": [],
"quote": null,
"tags": []
},
...
}
},
"meta": {},
...
"deferred": false,
"unrendered_config": {
"materialized": "table"
},
"created_at": 1701355701.124294,
"relation_name": "DB_NAME.PUBLIC.customers",
"raw_code": "with customers as (\n\n select * from {{ ref('stg_customers') }}\n\n),\n\norders as (\n\n select * from {{ ref('stg_orders') }}\n\n),\n\npayments as (\n\n select * from {{ ref('stg_payments') }}\n\n),\n\ncustomer_orders as (\n\n select\n customer_id,\n\n min(order_date) as first_order,\n max(order_date) as most_recent_order,\n count(order_id) as number_of_orders\n from orders\n\n group by customer_id\n\n),\n\ncustomer_payments as (\n\n select\n orders.customer_id,\n sum(amount) as total_amount\n\n from payments\n\n left join orders on\n payments.order_id = orders.order_id\n\n group by orders.customer_id\n\n),\n\nfinal as (\n\n select\n customers.customer_id,\n customers.first_name,\n customers.last_name,\n customer_orders.first_order,\n customer_orders.most_recent_order,\n customer_orders.number_of_orders,\n customer_payments.total_amount as customer_lifetime_value\n\n from customers\n\n left join customer_orders\n on customers.customer_id = customer_orders.customer_id\n\n left join customer_payments\n on customers.customer_id = customer_payments.customer_id\n\n)\n\nselect * from final",
"language": "sql",
"refs": [
{
"name": "stg_customers",
"package": null,
"version": null
},
...
],
"sources": [],
"metrics": [],
"depends_on": {
"macros": [],
"nodes": [
"model.jaffle_shop.stg_customers",
"model.jaffle_shop.stg_orders",
"model.jaffle_shop.stg_payments"
]
},
...
},
以下のpythonを実行します(chatgptに作ってもらいました)
import json
import csv
import os
def read_json_file(file_path):
"""Reads a JSON file and returns its content."""
with open(file_path, 'r', encoding='utf-8') as file:
return json.load(file)
def extract_csv_data(json_data):
"""Extracts data for CSV from the JSON data."""
return [
{
'database': node_info['database'],
'schema': node_info['schema'],
'name': node_info['name'],
'resource_type': node_info['resource_type']
}
for node_id, node_info in json_data['nodes'].items()
]
def write_to_csv(file_path, data):
"""Writes data to a CSV file."""
with open(file_path, 'w', newline='', encoding='utf-8') as file:
writer = csv.DictWriter(file, fieldnames=['database', 'schema', 'name', 'resource_type'])
writer.writeheader()
writer.writerows(data)
def main():
base_path = os.path.dirname(os.path.abspath(__file__))
json_file_path = os.path.join(base_path, '..', 'target', 'manifest.json')
csv_file_path = os.path.join(base_path, '..', 'target', 'output.csv')
json_data = read_json_file(json_file_path)
csv_data = extract_csv_data(json_data)
write_to_csv(csv_file_path, csv_data)
if __name__ == '__main__':
main()
すると以下のようなcsvが出力されます。
database,schema,name,resource_type,tags,description,raw_code
DB_NAME,PUBLIC,customers,model,[],"This table has basic information about a customer, as well as some derived facts based on a customer's orders",<長いSQL>
DB_NAME,PUBLIC,orders,model,[],"This table has basic information about orders, as well as some derived facts based on payments",<長いSQL>
DB_NAME,PUBLIC,stg_customers,model,[],,<長いSQL>
DB_NAME,PUBLIC,stg_payments,model,[],,<長いSQL>
DB_NAME,PUBLIC,stg_orders,model,[],,<長いSQL>
DB_NAME,PUBLIC,raw_customers,seed,[],,<長いSQL>
DB_NAME,PUBLIC,raw_orders,seed,[],,<長いSQL>
DB_NAME,PUBLIC,raw_payments,seed,[],,<長いSQL>
...
これをknowledge baseが読み込むS3にアップロードします。
bedrockのmodelの準備
bedrock側ではデフォルトではAWS Titanしか使えませんが、knowledge baseのテストにAnthropicが必要であることと、日本語にも対応しているそうなのでAnthropic Claudeを使いたいと思います。model accessのページからaccessを有効化します。使用用途などを聞かれますが、割と適当に書いても通りました。
knowlegde baseの構築
AWSのコンソール画面からknowledge baseの構築を行なっていきます。以下の情報を順次入力していきます。
- csvをアップロードしたS3のURL
- Embeddings model
- 現在Titan Embeddings G1 - Textのみ
- Vector database
- AWS OpenSerchを選択しました
Data Sourceをsyncした後、knowledge baseのテストが可能です。Testタブから適当なdbt modelの説明を求めてみます。
Claude v2.1だととりあえずそれっぽい返答をしていることがわかります(v2.0だとダメでした)。ymlに記載したdbt modelのdescriptionをそのまま出力しているようにも見えますが、、、とりあえずこれでよしとします。
agentの構築
OpenAPI schemaの作成
agentのapi使用を決めるためにスキーマ情報をS3にアップロードします。APIのcallはboto3で行うので特に使用しないのですが、以下のように設定します。
{
"openapi": "3.0.0",
"info": {
"title": "get model infomation API",
"version": "1.0.0",
"description": "get dbt model information from manifest.json."
},
"paths": {
"/model/{modelName}": {
"get": {
"summary": "get dbt model infomation.",
"description": "get dbt model information from manifest.json.",
"operationId": "getModelDescription",
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"type": "array",
"items": {
"type": "object",
"properties": {
"modelName": {
"type": "string"
},
"description": {
"type": "string"
}
}
}
}
}
}
}
}
}
}
}
}
lambdaの設定
Bedrockの実行時のAPI responseをカスタマイズするlambdaが必要になるのでこちらも設定します。こちらはドキュメントから特に変えていないです。
def lambda_handler(event, context):
response_body = {
'application/json': {
'body': "sample response"
}
}
action_response = {
'actionGroup': event['actionGroup'],
'apiPath': event['apiPath'],
'httpMethod': event['httpMethod'],
'httpStatusCode': 200,
'responseBody': response_body
}
session_attributes = event['sessionAttributes']
prompt_session_attributes = event['promptSessionAttributes']
api_response = {
'messageVersion': '1.0',
'response': action_response,
'sessionAttributes': session_attributes,
'promptSessionAttributes': prompt_session_attributes
}
return api_response
agentの構築
bedrockのコンソールからagentの構築を行います。今回はインタラクティブな実行は想定していないのですがuser inputをenableにした時の方が返答が返ってきたのでenableにしました。またWorking draftとしてLLMに事前情報をinputする必要があります。今回は以下のように書いてみました。
You are data steward using dbt. You assist users with their inquiries about the data pipeline.
使用modelはClaude v2です、v2.1はcoming soonでした
こちらも同じくtestが可能です。適当にプロンプトを入れてみます。日本語だとうまくいかなかったので英語にしました。
pythonでmarkdown形式で出力
これをboto3のクライアントから実行し、dbt docs用のmdファイルに書き出します。実装は以下のaws-sampleを参考にさせていただきました。
import uuid
import boto3
import logging
import os
import argparse
# Setup logging
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants and configuration
ENDPOINT_URL = None
AGENT_ID = '...'
AGENT_ALIAS_ID = '...'
REGION_NAME = 'us-east-1'
def setup_client():
""" Sets up the boto3 client for Bedrock agent. """
client = boto3.client("bedrock-agent-runtime", endpoint_url=ENDPOINT_URL, region_name=REGION_NAME)
return client
def invoke_agent(client, input_prompt):
""" Invokes the Bedrock agent with given input text. """
session_id = str(uuid.uuid1()) # random identifier
enable_trace = True
response = client.invoke_agent(
inputText=input_prompt,
agentId=AGENT_ID,
agentAliasId=AGENT_ALIAS_ID,
sessionId=session_id,
enableTrace=enable_trace
)
return response
def process_response(response):
""" Processes the response from the agent. """
event_stream = response['completion']
model_description = ""
try:
for event in event_stream:
if 'chunk' in event:
data = event['chunk']['bytes']
logger.info(f"Final answer ->\\n{data.decode('utf8')}")
model_description = data.decode('utf8')
elif 'trace' in event:
continue
else:
raise Exception("unexpected event.", event)
except Exception as e:
raise Exception("Error in processing response: ", e)
return model_description
def write_to_markdown(model_description, model_name):
""" Writes the model description to a markdown file. """
output_text = f"""
{{% docs {model_name}_bedrock %}}
以下の文章はAWS Bedrockにより自動生成されたものです。
---
{model_description}
{{% enddocs %}}
"""
file_path = f"{os.path.dirname(__file__)}/../../docs/{model_name}_bedrock.md"
with open(file_path, 'w', encoding='utf-8') as file:
file.write(output_text)
def parse_arguments():
""" Parses command line arguments. """
parser = argparse.ArgumentParser(description='Process model name.')
parser.add_argument('model_name', type=str, help='The name of the model to process')
return parser.parse_args()
def main():
""" Main function to execute the script. """
args = parse_arguments()
model_name = args.model_name
client = setup_client()
input_prompt = f"{model_name}について教えて"
response = invoke_agent(client, input_prompt)
model_description = process_response(response)
write_to_markdown(model_description, model_name)
if __name__ == "__main__":
main()
これでmodel名を引数として実行すると、以下のようなmarkdownが生成されます。
$ python test.py customers
{% docs customers_bedrock %}
以下の文章はAWS Bedrockにより自動生成されたものです。
---
すみません、customersに関する情報を提供できる機能はありません。この質問に完全に答えるための十分な情報がありません。
{% enddocs %}
customersのdbt modelは存在するのですが検索ができていませんね。Claude v2.1がリリースされるまでは正確なレスポンスは難しそうです。プロンプトが適当すぎるので調整したらうまくいくかも?とは思うのですが、自分の手に負えない気がしてきたので、ここで終わらせることにします
おわりに
Amazon Bedrockを使ってmanifest.jsonからdocsの生成を行ってみました。情報はかなり適当に与えたのですが、Claude v2.1だとそれっぽい返答が返ってきたので驚きです。精度は怪しいですが、modelの精度が上がってRAGの知見が溜まってくればいずれdbt docsの更新の一部をLLMに任せられる日が来るのではないかと期待しています。
また、LLM周りは完全に門外漢なのでもっとこうした方がいいなどのアドバイスがあれば是非いただけますと幸いです。