はじめに
近年、クラウドネイティブな分散システムの構築・運用が一般化されつつあり、それに伴い可観測性(Observability)というワードが注目を集めていると思います。
分散システムに限らずですが、特にシステムが巨大になるにつれてパフォーマンスの問題やエラーが発生した際にどこが原因なのかを特定するのが困難になりがちです。
それらの問題が発生した際に迅速に原因を特定し対処するためには、アプリケーションやシステムの内部動作を可視化することが極めて重要です。
具体的には、Observabilityの3つの柱と呼ばれている以下のテレメトリデータを収集し、可視化することが重要です。
- ログ
- アプリケーションの実行結果やエラーの詳細を記録するもの
- メトリクス
- システムのパフォーマンスやリソース使用状況を統計情報として記録するもの
- トレース
- システム内での処理の流れ(いつ、どこで、なにが起きたのか)を時系列で記録するもの
本記事の内容について
先日、Pydanticのリポジトリを眺めていたところ、Pythonアプリケーションのテレメトリデータを収集・可視化するためのLogfireという可観測性プラットフォームの存在を知りました。
少し触ってみた感じですが、Logfireはシンプルで且つ簡単に導入できるライブラリです。
本記事では、Logfireの概要と、Logfireを使ってどのようにアプリケーションの内部動作を可視化できるかを紹介しようと思います。
また、Logfireの具体的な実装例を通じてObservabilityを体験できる内容となっているので、興味のある方は実際に手を動かしてみてください!
対象読者
- Logfireの概要を知りたい方
- LogfireでObservabilityを体験したい方
- Pythonアプリケーションのパフォーマンスやエラー情報を詳細に把握したい方
Logfireとは🔥
現在、Logfireはβ版として公開されており、無償で使える状態ですが、将来的には有償プランを提供する予定のようです。
触るなら今がチャンスです!
https://pydantic.dev/pricing
LogfireはPydanticチームが開発している可観測性プラットフォームです。
PydanticはPythonのデータバリデーションと設定管理のためのライブラリとして人気ですが、Logfireもそのシンプルさと使いやすさを継承していることが特徴のようです。
ドキュメントに詳しく載っているのですが、Logfireの主な特徴を以下に掻い摘んで列挙します。
シンプルな設定
- Logfireは設定が非常にシンプルです。
Pythonのアプリケーション内にたった数行のコードで導入できるので、複雑な設定に悩まされることなく簡単にObservabilityを実装できます。
OpenTelemetry統合
- LogfireはOpenTelemetryをラップして実装されており、一般的なPythonパッケージ(例えば、FastAPI、Django、SQLAlchemyなど)に対して自動instrumentingを提供してくれます。
そのため、これらの主要ライブラリを使用する際に追加の設定なしでテレメトリデータの収集が可能になります。
WebUIによるテレメトリデータの検索・可視化
Logfireが収集したテレメトリデータはLogfireのプラットフォーム上に収集されます。
Logfireプラットフォームでは、収集したテレメトリデータを可視化するための直感的なWebUIを利用することができます。↓
Logfireプラットフォームはインターネット経由でのみアクセス可能
Logfireプラットフォームはインターネット上にホストされており、現時点ではセルフホスト型のWebUIは提供されていません。
そのため、クローズドなオンプレミス環境等では利用できないですし、セキュリティやプライバシーに敏感な環境で使用する際には注意が必要です。
利用規約: https://docs.pydantic.dev/logfire/legal/terms_of_service/
PostgreSQLの標準SQLをサポート
Logfireプラットフォームに収集したデータはSQLクエリで照会できます。
また、LogfireはPostgreSQLで実装されているため、JSON演算子(->
や->>
)などの構文も使えます。
Logfireのセットアップ
ではここからLogfireの使い方を実例を交えて紹介します。
Logfireプラットフォームにアカウントを作成するためにはGithubアカウントが必要になります。
インストールおよびサインイン
まずはpipやpoetryなどの管理ツールを使ってlogfireをインストールします。
pip install logfire
インストール後、以下のコマンドを実行します。
logfire auth
上記を実行するとブラウザウィンドウが開くので、Logfireプラットフォームにサインインおよびログインします。
Welcome to Logfire! 🔥
Before you can send data to Logfire, we need to authenticate you.
Press Enter to open logfire.pydantic.dev in your browser...
Please open https://logfire.pydantic.dev/auth/device/XXXX in your browser to authenticate if it hasn't already.
Waiting for you to authenticate with Logfire...
Successfully authenticated!
Your Logfire credentials are stored in /Users/<user-name>/.logfire/default.toml
サインインすると、Logfireプラットフォーム上にGihubのアカウント名でOrganizationが作成されます。
※Organizationは、後述するプロジェクトやユーザーアカウントを管理するための基本的な単位です。
Logfireにプロジェクトを作る
新しい環境で初めてLogfireを使用するときは、プロジェクトというものを作成する必要があります。
プロジェクトは、データを収集する先の名前空間であり、箱のようなイメージです。
例えば、商用環境のデータはプロジェクトAに、検証環境のデータはプロジェクトBに送信するといった使い分けが可能になります。
なのでまずは、アプリケーション内で収集したデータをどのプロジェクトに送信するのかを指定する必要があります。
プロジェクトを作成する手順としては、適当にpythonファイルを作り(ここではsrc/app.py
)、以下のようにlogfireの処理を実装します。
import logfire
# Logfireライブラリを使用する際の初期設定を行うための関数を実行する
logfire.configure()
# 一般的なログ種別をサポートしている
logfire.info('Hello, {name}!', name='world')
logfire.warn("warn")
logfire.error("error")
logfire.exception("exception")
ちなみに上記の処理ですが、前述した3つのテレメトリデータのうち「ログ」を取得する処理に該当します。
その後、以下のように上記のコードを実行すると、新しいプロジェクトを作成することが求められます。
❯ python src/app.py
No Logfire project credentials found.
All data sent to Logfire must be associated with a project.
# まだプロジェクトがないので "n" を入力してEnter
Do you want to use one of your existing projects? [y/n] (y):
# プロジェクトを作るOrganizationを選択
To create and use a new project, please provide the following information:
Select the organization to create the project in [<organization-name>] (<organization-name>):
# 任意の名前を入力してプロジェクトを作る
Enter the project name (logfiretest): logfire-test
Project initialized successfully. You will be able to view it at: https://logfire.pydantic.dev/<organization-name>/logfire-test
Press Enter to continue:
Logfire project URL: https://logfire.pydantic.dev/<organization-name>/logfire-test
# ログが出力される。このログデータがlogfireプラットフォームの指定したプロジェクトに送信される
08:25:37.327 Hello, world!
08:25:37.328 warn
08:25:37.329 error
08:25:37.329 exception
ここで、上記の Logfire project URL: https://logfire.pydantic.dev/<organization-name>/logfire-test
にアクセスすると、 収集されたログデータがWebUIから閲覧できます🚀
以上でLogfireを使う準備が整いました。
Logfireでテレメトリデータを取得してみる
上記で実装したコードは単なるログデータを収集する処理なので、もう少し実用的な使い方を紹介します。
ですがその前に、Opentelemetryの重要な概念について理解が必要なので簡単に触れておきます。
OpentelemetryのTraceとSpanについてざっくり理解する
LogfireはOpenTelemetryをベースに実装されています。
よってLogfire(OpenTelemetry)を効果的に活用するためには、Trace(トレース)とSpan(スパン)という2つの概念を理解することが必要不可欠です。
※すでに知っている方はスキップしてください。
https://www.redhat.com/ja/blog/using-opentelemetry-and-jaeger-with-your-own-services/application
Trace
Traceは、実行された処理や操作がシステムを通過する際の全体的な経路を表すものです。
例えば、ユーザーがWebアプリケーションにリクエストを送信してから最終的にレスポンスがユーザーに返されるまでの一連の処理の流れを記録します。
Traceは1つ以上のSpanで構成されます。
Span
Spanは、Trace内の単一の操作を表します。
Traceは複数のSpanで構成されており、各Spanが特定の操作や処理を表します。Spanには一般的に以下のような情報が含まれます。
- 開始時間
- 終了時間
- 属性(例えば、データベースクエリの内容やHTTPリクエストの詳細など)
Logfireは以下のようにTraceをツリーで表示し、Trace内に含まれるSpanを可視化することができます。
Spanを収集する
Spanデータを収集するための簡単な例として、ユーザーが入力した番号をもとにリストから要素を取得する処理を用意しました。
import logfire
logfire.configure()
# Spanを計装する
with logfire.span('Accessing list element'): # 引数でSpan名を定義できる
numbers = [1, 2, 3, 4, 5]
# ユーザー入力
index = int(input('Enter the index of the element you want to access (0-4):'))
element = None
try:
element = numbers[index]
except IndexError:
logfire.exception("Index out of range", index=index, list_length=len(numbers))
logfire.info(
'Accessed element: {element} at index {index}',
element=element, index=index
)
Spanデータを収集するには、logfire.span()
コンテキストマネージャー(with句)を使用して対象の処理を囲みます。
これによって、with句内で発生したイベントや変数の情報をWebUIで閲覧することができます。
正常値を渡した場合
上記を実行し、リストのインデックス内の要素番号を指定すると、以下のようにLogfireのWebUIにSpan('Accessing list element'
)が表示されます。
また、Spanの +
をクリックすると、内部の logfire.info()
の情報が表示されます。
上記の情報を見ることで以下のことがわかります。
- 処理の開始〜終了までの所要時間(22.6msかかっている)
- 処理の詳細情報(ファイルパス/コード内で処理が実行された行数)
- 変数の値
異常値を渡した場合
リストのインデックス内に存在しない要素番号を指定すると、以下のようにExceptionログがWebUIに表示されます。
Tracebackも表示されるので、いつ、どこで、なにが起きたのかが一目瞭然ですね。
Spanをネストする
Spanはネスト構造にすることができます。これによって特定の操作内でさらに詳細な操作を追跡することができます。
import logfire
logfire.configure()
# 外側のSpanを計装
with logfire.span('Process user input'):
input_data = input('Enter a string to process:')
processed_data = None
# 内側のSpanを計装
with logfire.span('Reverse string'):
try:
processed_data = input_data[::-1]
except Exception as e:
logfire.exception("Error while reversing string", error=str(e))
logfire.info(
'Processed data: {processed_data}',
processed_data=processed_data
)
上記を実行すると、WebUIには以下のように外側のSpan内に内側のSpanがネスト構造になって表示されます。
関数にSpanを定義する
関数にlogfire.instrument()
デコレータを実装すると、関数内の処理をまるっとSpan対象にすることができます。
以下はmain()
から実行される各関数にlogfire.instrument()
を実装した場合の例です。
import logfire
import requests
import urllib.request, urllib.error
logfire.configure()
# 渡されたURLにGETリクエストを投げる
@logfire.instrument() # 引数なしの場合、関数名がSpan名になる
def fetch_data_from_api(api_url):
response = requests.get(api_url)
response.raise_for_status()
response_data = response.json()
result = json.dumps(response_data, indent=2)
logfire.info(f"Data fetched successfully: {result}")
return result
# 存在するURLかどうかチェックする
@logfire.instrument()
def check_api_url(url):
f = urllib.request.urlopen(url)
logfire.info("OK:" + url )
f.close()
@logfire.instrument("start main")
def main():
try:
input_url = input("Enter a API URL: ")
check_api_url(input_url)
result = fetch_data_from_api(input_url)
logfire.info("Main function completed successfully")
except Exception as e:
logfire.exception("An error occurred in main", error=str(e))
if __name__ == "__main__":
main()
上の処理を実行すると、WebUIには以下のとおり記録されます。
上記を言葉で説明すると、
処理の起点となる main
関数が親Spanとなり、main
関数から呼び出される check_api_url
と fetch_data_from_api
の関数が子Spanとして記録されます。
これは、複数の関連するSpanが時系列で繋がり、1つのTraceを形成している状態を表しています。
Metricsを収集する
Metricsは、システムやアプリケーションのパフォーマンスや状態を定量的に測定したデータです。
特定の時点での測定値やタイムスタンプ、関連するメタデータ(イベント名、イベント値など)で構成されます。
Logfireで取得可能なメトリクスタイプは以下のとおりです。
メトリクスタイプ | サマリー |
---|---|
Counter | 増加のみを記録するメトリクス 例:リクエスト数、エラー数 |
Histogram | 一連の値の分布を記録するメトリクス 例:リクエストの遅延時間、データの項目数 |
Gauge | 現在の状態を記録するメトリクス 例:メモリ使用量、アクティブな接続数 |
UpDownCounter | 増減を記録するメトリクス 例:アクティブユーザー数の増減、カートに保存された個数の増減 |
Callback Metrics | 時間間隔に基づいて自動的に更新されるメトリクス |
System Metrics | CPU,メモリ,ディスク,ネットワークなどのメトリクス ※別途、ライブラリのインストールが必要 |
詳細な使い方は以下の公式ドキュメントを参照してください。
ここでは、上記の中からいくつか抜粋してMetricsを収集し、WebUI上で可視化してみます。
UpDownCounter
ここでは、ある時点における測定対象の増減を記録するUpDownCounter
を使ってみます。
以下の例は、ユーザーがオンラインストアでカートに商品を追加および削除する操作を模擬したのものです。
import logfire
logfire.configure()
# UpDownCounterの設定
cart_items_counter = logfire.metric_up_down_counter(
'cart_items_counter',
unit='1',
description='Number of items in the cart'
)
def add_item_to_cart():
cart_items_counter.add(1)
def remove_item_from_cart():
cart_items_counter.add(-1)
add_item_to_cart()
add_item_to_cart()
add_item_to_cart()
remove_item_from_cart()
add_item_to_cart()
上記実行後、WebUIの Explorer
を開くと、画面下にSQLエディタの欄があります。
そこで以下画面のようにSQL文を書き、 Run Query
を実行すると、収集したUpDownCounterのメトリクスが表示されます。
select
recorded_timestamp, metric_name, metric_type, unit, start_timestamp, metric_description, scalar_value
from metrics
where metric_name = 'cart_items_counter'
order by recorded_timestamp;
※以下画面は、処理を何回か実行した例です。
scalar_value
カラムにカート内のデータ数が表示されています。
Histogram
次に、Histogramを使ってリクエストの応答時間を収集してみます。
以下はユーザー「user123」が、3つのリクエストを実行することを想定した例になります。
import logfire
import time
import random
logfire.configure()
# Histogramの設定
response_time_histogram = logfire.metric_histogram(
'response_time_histogram',
unit='milliseconds',
description='Response time of requests'
)
def handle_request(user_id):
start_time = time.time()
logfire.info("start request", user_id=user_id)
time.sleep(random.uniform(0.1, 0.9)) # 0.1〜0.9秒のランダムな遅延
logfire.info("end request", user_id=user_id)
end_time = time.time()
# Histogramの収集
# attributesを使うと任意の属性を追加することができる(ここではuser_id)
response_time_histogram.record(
(end_time - start_time) * 1000, attributes={'user_id': user_id}
)
user_id = "user123"
handle_request(user_id)
handle_request(user_id)
handle_request(user_id)
上記実行後、WebUIで以下のようにSQLクエリを実行します。
select
recorded_timestamp, metric_name, metric_type, unit, start_timestamp, metric_description,
histogram_count, histogram_min, histogram_max
from metrics
where metric_name = 'response_time_histogram'
order by recoreded_timestamp;
上記結果を見ると以下のようなことがわかります。
-
histogram_count
-
user123
から3つのリクエストが実行された
-
-
histogram_min
- 記録されたすべてのリクエスト応答時間の中で最も短い時間は「334.39016342163086ミリ秒」
-
histogram_max
- 記録されたすべてのリクエスト応答時間の中で最も長い時間は「874.6030330657959ミリ秒」
OpenTelemetry統合
次に、Logfireが提供しているOpenTelemetry統合の機能を簡単に紹介しようと思います。
ここでは、FastAPI + Pydantic + OpenAIのOpentelemetry統合機能を使ってテレメトリデータを収集してみます。
まず、必要な以下のパッケージをインストールします。
pip install fastapi openai 'logfire[fastapi]'
また、環境変数にOpenAIのAPIキーを定義し、openaiライブラリがAPIキーを読み込める状態にしておく必要があります。
準備できたら以下のコードを実装します。
import openai
import logfire
from pydantic import BaseModel
from fastapi import FastAPI
class RequestData(BaseModel):
query: str
# pydanticの計装
class ResponseData(
# PydanticPluginでResponseDataモデルを通過するすべての処理を収集する
# tagsをつけるとWebUIでタグが表示されてわかりやすくなる
BaseModel,
plugin_settings={"logfire": {"record": "all", "tags": ("ResponseData")}}
):
model: str
id: str
content: str
app = FastAPI()
client = openai.Client()
logfire.configure()
logfire.instrument_fastapi(app) # FastAPIの計装
logfire.instrument_openai(client) # OpenAIの計装
@app.post("/questions", response_model=ResponseData)
def handle_question(message: RequestData) -> ResponseData:
response = client.chat.completions.create(
model = "gpt-3.5-turbo",
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": message.query},
],
)
result = ResponseData(
model=response.model,
id=response.id,
content=response.choices[0].message.content
)
return result
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
ポイントは、以下の部分です。この数行をコード内に実装するだけでテレメトリデータを収集することができます。
# pydanticの計装
class ResponseData(
BaseModel,
plugin_settings={"logfire": {"record": "all", "tags": ("ResponseData")}}
):
model: str
id: str
content: str
logfire.instrument_fastapi(app) # FastAPIの計装
logfire.instrument_openai(client) # OpenAIの計装
上記を実行し、curlなどでFastAPIにリクエストを投げます。
❯ curl -X 'POST' \
'http://localhost:8000/questions' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"query": "Open AIの 使い方を日本語で100文字以内で回答してください"
}'
# レスポンス
{"model":"gpt-3.5-turbo-0125","id":"chatcmpl-923H397CvFW0fUeaWeUv7b9L4rHOh1","content":"OpenAIを使う際は、APIにアクセスし、プログラムに組み込んで利用します。AIモデルを選択し、入力データを提供し、出力結果を取得することができます。ドキュメントやサンプルコードを参考に、効果的に活用しましょう。"}%
その後、LogfireのWebUIに移動し、Live
-> Custom levels
から debug
にチェックをつけます。
この状態で該当の各SpanをクリックするとFastAPI、Pydantic、OpenAIの詳細なテレメトリデータが閲覧できます。
また、以下のように Live
画面の上部にもSQLエディタが備わっています。
収集したテレメトリデータから特定の値を持つLogやSpanをSQLを使って抽出することができます。
以下は/questions
エンドポイントに関するデータを抽出しています。
Dashboard機能
最後に、LogfireのDashboard機能を紹介します。
Logfireでは、以下のようなWeb Service Dashbordを標準機能として提供しています。
他にもSystem Metrics用のDashboardや、ユーザー独自のカスタムDashboardも作ることができます。
カスタムDashboardでは、SQLで抽出したテレメトリデータを円グラフやテーブル、時系列チャートなどにプロットできます。
Dashboard機能の詳細は以下を参照してください。
さいごに
以上、Logfireを使ってPythonアプリケーションでObservabilityを体験してみようの紹介記事でした。
Logfireは、OpenTelemetryの複雑な設定や処理を簡素化し、Pythonアプリケーションに対して簡単に計測機能を組み込むことができる優れたツールだと思いました。
なお、本記事では紹介していない機能もまだまだあるので、興味のある方は公式ドキュメントを参考に色々試してみてください🔥
参考