はじめに
S3+DuckDB+GrafanaでQiita記事メトリクスのダッシュボードを作成するシリーズの第2回です。
コストや制約条件からアーキテクチャを議論した前回の内容をもとに、実際に環境を構築していきます。今回はインフラ周りの構築とデータ収集の設定まで進めます。
前回の記事はこちら。
基本的な構築手順は先人の方の記事にも詳しく書かれていますので、あわせてご参照ください。本記事では要点や差分などを中心にお伝えします。
本記事の差分としては、CDKではなくマネジメントコンソールを操作している点、Grafanaを直接ではなくDocker Composeで動かしている点、認証にCognitoによるOAuthを使用している点などがあります。
構築作業はAmazon Qに支援してもらいました。一部の設定ファイルやスクリプトはAmazon Qに手伝ってもらったものです。
アーキテクチャのおさらい
再掲になりますが、アーキテクチャは以下の図の通りです。
GitHub ActionsによりQiitaのAPIから取得した記事メトリクスはS3に保管されます。GrafanaとDuckDBはEC2インスタンス上で稼働し、S3上のデータをクエリして可視化します。
その他特色としては、以下の通りです。
- Cognitoによる認証がある
- インスタンスがALBの背後に設置されている
- GitHub ActionsからのアップロードにはOIDC認証を利用している
これから構築する構成が完成すると、https://[ドメイン]/grafana/ にアクセスしてログインすることで、Grafanaのダッシュボードが表示されます。
環境の構築
それでは環境を構築していきましょう。今回はマネジメントコンソールで操作します。
特に言及していない設定項目は初期値を使用します。
インフラの構築
インフラ周りの必要なリソースを構築していきます。作るものと作らないもの(あらかじめ用意されていると想定するもの)は以下の通りです。
作るもの
- S3バケット
- IAMロール
- IAM IDプロバイダ
- セキュリティグループ
- ALB
- EC2インスタンス
- Cognitoユーザープール
作らないもの
- VPC(
vpc-grafana) - サブネット
- インターネットゲートウェイ
- ACM証明書
- ドメイン(
example.com)
S3バケット
S3バケットは、アップロードされるメトリクスデータの保管庫となります。本記事では、以下の通りにバケットを作成します。
| 項目 | 設定値 |
|---|---|
| バケット名 | article-metrics-bucket |
| ストレージクラス | 標準 |
IAMロール
EC2インスタンス用(GrafanaEC2Role)とGitHub ActionsのOIDC認証用(GitHubActionsMetricsRole)の2つのIAMロールを作成します。
EC2インスタンス用IAMロール
EC2インスタンスに割り当てるIAMロール(GrafanaEC2Role)を以下の通り作成します。EC2インスタンス上で稼働するGrafana及びDuckDBがS3上のオブジェクトにアクセスできることを考慮して権限を割り当てます。
ポリシー一覧
| ポリシー名 | 種類 |
|---|---|
AmazonSSMManagedInstanceCore |
AWSマネージドポリシー |
AmazonSSMPatchAssociation |
AWSマネージドポリシー |
S3QiitaMetricsReadOnly |
カスタマーインラインポリシー |
カスタマーインラインポリシー(S3QiitaMetricsReadOnly)の設定内容
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject*",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::article-metrics-bucket/*",
"arn:aws:s3:::article-metrics-bucket"
]
}
]
}
信頼関係の設定内容
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
最小権限の付与が望ましいですが、ここでは厳密ではありません。
GitHub Actions用IAMロール
GitHub Actions用のIAMロール(GitHubActionsMetricsRole)を以下の通り作成します。GitHub ActionsがS3にファイルをアップロードする時に、OIDC(Open ID Connect)を使用して認証することで、アクセスキーやシークレットキーを使う必要がなくなります。
GitHubとAWS間のOIDC認証に関してはこちらの記事が詳しいです。
ポリシー一覧
| ポリシー名 | 種類 |
|---|---|
GitHubActionsS3MetricsPolicy |
カスタマーインラインポリシー |
カスタマーインラインポリシー(GitHubActionsS3MetricsPolicy)の設定内容
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::article-metrics-bucket",
"arn:aws:s3:::article-metrics-bucket/*"
]
}
]
}
信頼関係の設定内容
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::<YOUR_ACCOUNT_ID>:oidc-provider/token.actions.githubusercontent.com"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"token.actions.githubusercontent.com:aud": "sts.amazonaws.com"
},
"StringLike": {
"token.actions.githubusercontent.com:sub": [
"repo:<YOUR_ACCOUNT_NAME>/<YOUR_REPOSITORY_NAME>:*"
]
}
}
}
]
}
<YOUR_ACCOUNT_ID>、<YOUR_ACCOUNT_NAME>、<YOUR_REPOSITORY_NAME>はお使いの環境に合わせてください。
IAM IDプロバイダ
OIDC認証を実現するために、IAMでIDプロバイダを作成します。
IDプロバイダの設定内容
| 項目 | 設定値 |
|---|---|
| プロバイダのタイプ | OpenID Connect |
| プロバイダのURL | https://token.actions.githubusercontent.com |
| 対象者 | sts.amazonaws.com |
セキュリティグループ
セキュリティグループは、ALB用(sg-alb)とEC2インスタンス用(sg-ec2)を作成します。社内からのアクセスに限定し、それぞれに以下の通りルールを設定します。
ALB用セキュリティグループ(sg-alb)
| 方向 | タイプ | プロトコル | ポート範囲 | 送信元/送信先 | 説明 |
|---|---|---|---|---|---|
| インバウンド | HTTPS | TCP | 443 | (当社環境のIPアドレス) | 社内からのアクセスに制限 |
| アウトバウンド | すべてのトラフィック | すべて | すべて | 0.0.0.0/0 | 全ての送信 |
EC2インスタンス用セキュリティグループ(sg-ec2)
| 方向 | タイプ | プロトコル | ポート範囲 | 送信元/送信先 | 説明 |
|---|---|---|---|---|---|
| インバウンド | カスタムTCP | TCP | 3000 | sg-alb |
ALBからのアクセスに制限 |
| アウトバウンド | すべてのトラフィック | すべて | すべて | 0.0.0.0/0 | 全ての送信 |
Grafanaは3000番ポートで稼働します。
EC2インスタンス
EC2インスタンスは、GrafanaとDuckDBが動作する基盤となります。本記事では、以下の設定でインスタンスを作成します。
| 項目 | 設定値 |
|---|---|
| 名前 | ec2-grafana |
| インスタンスタイプ | t3.small |
| OS | Amazon Linux 2023 |
| AMI名 | al2023-ami-2023.7.20250623.1-kernel-6.1-x86_64 |
| ストレージ | gp3 20GB |
| セキュリティグループ | sg-ec2 |
| IAMロール | GrafanaEC2Role |
ALB
ALBは、GrafanaでCognitoと連携してユーザー認証する際にリダイレクト先のURIにHTTPSを使用する必要があるため設置します。将来的に他の用途にも流用できるように、Grafanaへのアクセスはパス/grafanaに割り当てることにします。
なお、Cognitoによる認証は、ALBではなくGrafanaと直接連携することにします。
今回のようにサブパスを割り当てるとGrafanaで追加の設定が必要になります。
ALB基本設定
| 項目 | 設定値 |
|---|---|
| 名前 | alb-grafana |
| スキーム | Internet-facing |
| IPアドレスタイプ | お好み |
| VPC | vpc-grafana |
| セキュリティグループ | sg-alb |
リスナー設定
| プロトコル | ポート | SSL証明書 |
|---|---|---|
| HTTPS | 443 | ACM証明書 |
リスナールール設定
| 優先度 | 条件 | アクション |
|---|---|---|
| 1 | パスパターン: /grafana/*
|
認証アクションはオフ ターゲットグループ( tg-grafana)へ転送 |
ターゲットグループ設定
| 項目 | 設定値 |
|---|---|
| 名前 | tg-grafana |
| ターゲットタイプ | インスタンス |
| プロトコル | HTTP |
| ポート | 3000 |
| VPC | vpc-grafana |
| ヘルスチェックパス | /api/health |
| ターゲット | ec2-grafana |
Cognitoユーザープール
Grafanaのユーザーを管理するCognitoのユーザープールは以下の通り設定します。パスキーによる認証とセルフサインアップを利用できるようにします。
ユーザープールの作成
「ユーザープールを作成(アプリケーションのリソースを設定する)」画面で以下の項目を入力し、ユーザープールを作成します。
| 項目 | 設定値 |
|---|---|
| アプリケーションタイプ | 従来のウェブアプリケーション |
| アプリケーションに名前を付ける | grafana-app |
| サインイン識別子のオプション | メールアドレス |
| 自己登録を有効化 | 有効 |
| サインアップのための必須属性 |
ウィザードに従うと、ユーザープールとその中にアプリケーションが作成されます。ここで指定したアプリケーション名はアプリケーションクライアント名に使用されます。ユーザープール名は後から変更できます。
ユーザープール作成後の設定修正
ユーザープールが作成された後に設定を個別に変更します。
アプリケーションクライアント>アプリケーションクライアント情報を編集
| 項目 | 設定値 |
|---|---|
| 認証フロー |
ALLOW_USER_AUTH,ALLOW_USER_PASSWORD_AUTH,ALLOW_USER_SRP_AUTH, ALLOW_REFRESH_TOKEN_AUTH
|
アプリケーションクライアント>マネージドログインページの設定
| 項目 | 設定値 |
|---|---|
| 許可されているコールバックURL | https://example.com/grafana/login/generic_oauth |
| 許可されているサインアウトURL | https://example.com/grafana/login |
| OpenID Connectのスコープ |
openid, email, profile, aws.cognitosignin.user.admin
|
サインイン>選択ベースのサインインのオプションを編集
| 項目 | 設定値 |
|---|---|
| その他の選択肢 | パスキー |
サインイン>多要素認証(MFA)を編集
| 項目 | 設定値 |
|---|---|
| MFAの強制 | オプションのMFA |
| MFAの方法 | Authenticatorアプリケーション |
ユーザーグループの作成
Grafanaにおけるユーザーの権限管理はCognitoのユーザーグループを使用するので、管理者用と編集者用にユーザーグループを作成します。
管理者または編集者としたいユーザーを各ユーザーグループに追加することで、Grafana上で設定を変更することなく権限を付与できます。
| 項目 | 設定値 |
|---|---|
| グループ名 | grafana-admin |
| 説明 | Grafana管理者グループ |
| 項目 | 設定値 |
|---|---|
| グループ名 | grafana-editor |
| 説明 | Grafana編集者グループ |
閲覧者はユーザーグループに属しません。
アプリケーションの設定
Grafanaの構築とDuckDBプラグインの導入を進めます。
使用するツールのバージョンは以下の通りです。
| ツール | バージョン |
|---|---|
| Docker | 25.0.13 |
| Docker Compose | 2.40.2 |
| Grafana | Enterprise版 12.0.6
|
| DuckDBプラグイン | 0.3.1 |
一連の作業は、grafanaディレクトリの配下で作業を進めます。作業後のディレクトリ構造のイメージは以下の通りです。
├── grafana
│ ├── .env
│ ├── docker-compose.yml
│ ├── grafana.ini
│ └── plugins
│ └── motherduck-duckdb-datasource
│ ├── CHANGELOG.md
│ ├── LICENSE
│ ├── README.md
(省略)
│ ├── motherduck-duckdb-datasource
│ └── plugin.json
Grafanaの設定
本記事ではGrafanaはDocker Composeを使用して稼働させます。Docker及びDocker Composeのセットアップ手順は割愛します。
使用するコンテナイメージは公式ドキュメントで推奨されているEnterprise版を使用します。また、今後コンテナ内で作業する可能性を考慮し、ベースイメージはAlpineではなくUbuntuとします。
公式ドキュメントもご参考にお読みください。
コンテナの起動設定
コンテナの起動設定をdocker-compose.ymlに記載します。
services:
grafana:
image: grafana/grafana-enterprise:12.0.6-ubuntu
restart: always
ports:
- '3000:3000'
volumes:
- ./grafana.ini:/etc/grafana/grafana.ini
- ./plugins:/var/lib/grafana/plugins
- grafana-storage:/var/lib/grafana
environment:
GF_AUTH_GENERIC_OAUTH_CLIENT_ID: ${AUTH_GENERIC_OAUTH_CLIENT_ID}
GF_AUTH_GENERIC_OAUTH_CLIENT_SECRET: ${AUTH_GENERIC_OAUTH_CLIENT_SECRET}
GF_AUTH_GENERIC_OAUTH_AUTH_URL: ${AUTH_GENERIC_OAUTH_USER_POOL_DOMAIN}/oauth2/authorize
GF_AUTH_GENERIC_OAUTH_TOKEN_URL: ${AUTH_GENERIC_OAUTH_USER_POOL_DOMAIN}/oauth2/token
GF_AUTH_GENERIC_OAUTH_API_URL: ${AUTH_GENERIC_OAUTH_USER_POOL_DOMAIN}/oauth2/userInfo
GF_AUTH_GENERIC_OAUTH_LOGOUT_URL: ${AUTH_GENERIC_OAUTH_USER_POOL_DOMAIN}/logout
GF_DOMAIN: ${DOMAIN}
GF_SUB_PATH: ${SUB_PATH}
volumes:
grafana-storage:
driver: local
プラグイン及び設定ファイルをマウントし、Grafanaのデータを永続化しています。
また、認証情報などは環境変数として受け渡し、環境変数の設定値は.envファイルに外出ししてdocker-compose.ymlには含まないようにしています。
AUTH_GENERIC_OAUTH_CLIENT_ID=sampleoauthclientid1234567
AUTH_GENERIC_OAUTH_CLIENT_SECRET=sampleoauthclientsecretsampleoauthclientsecret123456
AUTH_GENERIC_OAUTH_USER_POOL_DOMAIN=https://ap-northeast-sample1234.auth.ap-northeast-1.amazoncognito.com
DOMAIN=example.com
SUB_PATH=/grafana
.envファイルに記載するOAuthに関する情報は、Cognitoの「アプリケーションクライアントに関する情報」と「ドメイン」から取得します。
Grafanaの環境設定
Grafanaの環境設定をgrafana.iniに記載します。
[server]
# サブパスでホストしているため、サブパスを含める
root_url = https://${GF_DOMAIN}${GF_SUB_PATH}/
# サブパスでホストしているため、trueにする
serve_from_sub_path = true
http_port = 3000
domain = ${GF_DOMAIN}
enforce_domain = true
[auth]
# Set to true to disable (hide) the login form, useful if you use OAuth, defaults to false
disable_login_form = true
# Cognitoとの連携用の設定
[auth.generic_oauth]
enabled = true
name = OAuth
scopes = openid email profile aws.cognito.signin.user.admin
auto_login = true
signout_redirect_url = ${GF_AUTH_GENERIC_OAUTH_LOGOUT_URL}?client_id=${GF_AUTH_GENERIC_OAUTH_CLIENT_ID}&logout_uri=https://${GF_DOMAIN}${GF_SUB_PATH}/login
name_attribute_path = email
# 環境変数で指定するので設定不要
# client_id = xxxxx
# client_secret = xxxxx
# auth_url = xxxxx
# token_url = xxxxx
# api_url = xxxxx
# userをcognitoのgroup(`grafana-admin`, `grafana-editor`)に所属させるとgrafanaのroleが割り当てられる
role_attribute_path = ("cognito:groups" | contains([*], 'grafana-admin') && 'Admin' || contains([*], 'grafana-editor') && 'Editor' || 'Viewer')
[plugins]
# 未署名のプラグインをロードするための設定
allow_loading_unsigned_plugins = motherduck-duckdb-datasource
サブパスでGrafanaをホストしているので、root_urlとserve_from_sub_pathは上記の通り設定します。また、auth.generic_oauthセクションはCognitoと連携するための設定、pluginsセクションは署名されていないプラグインであるDuckDBプラグインのロードを許可するための設定です。
root_urlを正しく設定しないと、Cognito認証後のリダイレクトに失敗するのでご注意ください。
本設定では、以下の記事を参考にさせていただきました。
設定内容の詳細は公式ドキュメントもご覧ください。
DuckDBプラグインの導入
DuckDBプラグインを導入します。DuckDBプラグインはGrafanaの公式プラグインリポジトリに登録されていないため、GitHubのリリースページから直接ダウンロードして導入します。その後、pluginsディレクトリに展開します。
wget https://github.com/motherduckdb/grafana-duckdb-datasource/releases/download/v0.3.1/motherduck-duckdb-datasource-0.3.1.zip
unzip motherduck-duckdb-datasource-0.3.1.zip -d plugins/
リポジトリに記載されているガイドの通りに展開すると、ディレクトリが入れ子になってしまうので、展開先のパスにmotherduck-duckdb-datasourceを含める必要はありません。
プラグインのソースコードは以下のGitHubリポジトリで公開されています。
起動
Grafanaコンテナを起動します。
docker compose up -d
無事に起動したら、https://example.com/grafana/ にアクセスしましょう。
以下のCognitoの認証画面が表示されたら、Grafanaは正常に起動しています。
下にある「New user? Create an account」からサインアップしましょう。
メールアドレスの認証を完了させ、最後まで進むと、Grafanaの画面が表示されます。
試しに新規登録したユーザーをユーザーグループgrafana-adminに追加してみましょう。Cognito側で追加操作をしておきます。
サインインしなおすと、管理者権限が付与されていることが分かります。サイドバーの項目が増え、Administrationの項目も追加されています。
Data sourcesの画面からDuckDBのプラグインが正常にインストールされているか確認しましょう。正常にインストールされている場合は、検索すると表示されるはずです。
Grafanaの起動が確認できても、サインイン後やサインアウト後の遷移でエラーが出る場合は、Cognito側のコールバックURL/サインアウトURL、またはGrafana側のOAuthの設定が正しくない可能性があります。設定を再確認してください。
メトリクス収集プログラム
API経由でQiitaのメトリクスを取得するスクリプトを実装します。前回の記事で検討した通り、GitHub Actionsで実行する構成となっています。
リポジトリは、以下のディレクトリ構造になっていると想定します。
YOUR_REPOSITORY
├── .github
│ └── workflows
│ └── stat.yml
├── metrics
│ ├── requirements.txt
│ └── qiita_metrics_collector.py
収集スクリプト
収集用のスクリプトは以下の通りです。Amazon Qを活用したら、比較的簡単に書くことができました。
いいね数やストック数の情報は執筆者以外でもAPIから取得できますが、閲覧数は執筆者本人しか確認・取得できません。そのため、各執筆者のAPIキーを発行し、GitHub Actionsのシークレットとして登録しておきます。シークレットの情報は、スクリプトには環境変数経由で渡されます。
Qiita APIの詳細は公式ドキュメントもご覧ください。
収集スクリプト(クリックして展開)
#!/usr/bin/env python3
"""
Qiita メトリクス収集ツール
MIT License
Copyright (c) 2025 Mitsubishi Electric Corporation
"""
import json
import logging
import os
import time
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Optional
import boto3
import pandas as pd
import requests
from botocore.exceptions import ClientError, NoCredentialsError
# 設定
JST = timezone(timedelta(hours=9))
QIITA_API_BASE = "https://qiita.com/api/v2"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class S3Handler:
"""S3アップロード処理クラス"""
def __init__(self, bucket_name: str, region: str = 'ap-northeast-1'):
"""
S3ハンドラーを初期化
Args:
bucket_name: S3バケット名
region: AWSリージョン
"""
self.bucket_name = bucket_name
self.region = region
self.client = None
try:
self.client = boto3.client('s3', region_name=region)
self.client.head_bucket(Bucket=bucket_name)
logger.info(f"Connected to S3 bucket '{bucket_name}' in region '{region}'")
except NoCredentialsError:
logger.error("AWS credentials not found")
raise
except ClientError as e:
logger.error(f"Failed to connect to S3 bucket: {e}")
raise
def upload_file(self, local_path: Path, data_type: str, file_format: str,
timestamp: str, use_partition: bool = False) -> bool:
"""
ファイルをS3にアップロード
Args:
local_path: ローカルファイルパス
data_type: データタイプ(metrics, master, likers, stockers)
file_format: ファイル形式(csv, parquet)
timestamp: タイムスタンプ
use_partition: パーティション構造を使用するか
Returns:
アップロード成功の場合True
"""
try:
# S3キー生成
if use_partition:
# メトリクスはパーティション構造
date_part = timestamp[:8] # YYYYMMDD
year, month, day = date_part[:4], date_part[4:6], date_part[6:8]
s3_key = f"metrics/year={year}/month={month}/day={day}/{data_type}_{timestamp}.{file_format}"
else:
# その他は上書き保存
s3_key = f"articles/{data_type}.{file_format}"
# メタデータ
metadata = {
'data-type': data_type,
'timestamp': timestamp,
'source': 'qiita-metrics-collector'
}
# アップロード実行
self.client.upload_file(
str(local_path),
self.bucket_name,
s3_key,
ExtraArgs={'Metadata': metadata}
)
logger.info(f"Uploaded: s3://{self.bucket_name}/{s3_key}")
return True
except ClientError as e:
logger.error(f"Failed to upload {local_path.name}: {e}")
return False
def upload_batch(self, files: List[Dict], timestamp: str) -> int:
"""
複数ファイルを一括アップロード
Args:
files: アップロードファイル情報のリスト
timestamp: タイムスタンプ
Returns:
成功したアップロード数
"""
success_count = 0
for file_info in files:
if self.upload_file(
file_info['path'],
file_info['data_type'],
file_info['format'],
timestamp,
file_info.get('use_partition', False)
):
success_count += 1
logger.info(f"Uploaded {success_count}/{len(files)} files to S3")
return success_count
class QiitaCollector:
"""Qiitaメトリクス収集・処理クラス"""
def __init__(self):
"""コレクターを初期化"""
self.fetch_time = datetime.now(JST).replace(microsecond=0)
self.batch_id = self.fetch_time.strftime('%Y%m%d_%H%M%S')
self.tokens = self._load_tokens()
self.local_dir = Path(os.getenv('LOCAL_DATA_DIR', './data'))
self.local_dir.mkdir(parents=True, exist_ok=True)
# S3ハンドラー初期化
self.s3_handler = None
s3_bucket = os.getenv('S3_BUCKET_NAME')
if s3_bucket:
try:
aws_region = os.getenv('AWS_REGION', 'ap-northeast-1')
self.s3_handler = S3Handler(s3_bucket, aws_region)
except (NoCredentialsError, ClientError) as e:
logger.error(f"S3 initialization failed: {e}")
logger.info("Continuing with local storage only")
else:
logger.info("No S3 bucket specified, using local storage only")
def _load_tokens(self) -> Dict[str, str]:
"""
環境変数からトークンを読み込み
KEY_で始まる環境変数とALL_SECRETS(GitHub Actions用)に対応
"""
tokens = {}
# KEY_で始まる環境変数から取得
for key, value in os.environ.items():
if key.startswith('KEY_') and value:
username = key[4:] # KEY_を除去
tokens[username] = value
# GitHub Actions用: ALL_SECRETSからKEY_で始まるもののみ取得
all_secrets = os.getenv('ALL_SECRETS')
if all_secrets:
try:
parsed_secrets = json.loads(all_secrets)
if isinstance(parsed_secrets, dict):
# KEY_で始まるキーのみを抽出
for key, value in parsed_secrets.items():
if key.startswith('KEY_') and value:
username = key[4:] # KEY_を除去
tokens[username] = value
else:
logger.warning("ALL_SECRETS should be a JSON object")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse ALL_SECRETS JSON: {e}")
if not tokens:
logger.error("No Qiita tokens found. Set KEY_* environment variables or ALL_SECRETS with KEY_* entries.")
raise ValueError("No tokens configured")
logger.info(f"Loaded {len(tokens)} token(s) for users: {list(tokens.keys())}")
return tokens
def _api_request(self, token: str, endpoint: str, max_retries: int = 3) -> Optional[List[Dict]]:
"""
APIリクエスト実行(リトライ機能付き)
Args:
token: APIトークン
endpoint: APIエンドポイント
max_retries: 最大試行回数(初回含む)
Returns:
APIレスポンス(失敗時はNone)
"""
headers = {"Authorization": f"Bearer {token}"}
params = {"per_page": "100"}
url = f"{QIITA_API_BASE}{endpoint}"
for attempt in range(1, max_retries + 1):
try:
logger.debug(f"API request attempt {attempt}/{max_retries}: {endpoint}")
response = requests.get(url, headers=headers, params=params, timeout=30)
response.raise_for_status()
logger.debug(f"API request successful on attempt {attempt}: {endpoint}")
return response.json()
except requests.exceptions.RequestException as e:
if attempt == max_retries:
logger.error(f"API request failed after {max_retries} attempts for {endpoint}: {e}")
return None
else:
wait_time = 2 ** (attempt - 1) # 指数バックオフ: 1秒, 2秒
logger.warning(f"API request failed (attempt {attempt}/{max_retries}) for {endpoint}: {e}")
logger.info(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
return None
def _collect_user_articles(self, username: str, token: str) -> Dict[str, List[Dict]]:
"""
ユーザーの記事データを収集
Args:
username: ユーザー名
token: APIトークン
Returns:
収集されたデータ(metadata, metrics, likers, stockers)
"""
logger.info(f"Collecting data for user '{username}'")
# 記事一覧取得
articles = self._api_request(token, "/authenticated_user/items")
if not articles:
logger.info(f"No articles found for user '{username}'")
return {"metadata": [], "metrics": [], "likers": [], "stockers": []}
logger.info(f"Processing {len(articles)} articles for user '{username}'")
# データ収集
collected_data = {"metadata": [], "metrics": [], "likers": [], "stockers": []}
for i, article in enumerate(articles, 1):
article_id = article['id']
title = article['title'][:50] + "..." if len(article['title']) > 50 else article['title']
logger.info(f"Processing article {i}/{len(articles)}: {title}")
# 基本データ収集
self._collect_article_data(article, collected_data)
# エンゲージメントデータ収集
self._collect_engagement_data(token, article_id, collected_data)
logger.info(f"Completed processing for user '{username}': {len(articles)} articles")
return collected_data
def _collect_article_data(self, article: Dict, collected_data: Dict[str, List[Dict]]) -> None:
"""記事の基本データを収集"""
article_id = article['id']
# メタデータ
collected_data["metadata"].append({
'user_id': article['user']['id'],
'article_id': article_id,
'title': article['title'],
'url': article['url'],
'tags': json.dumps([tag['name'] for tag in article.get('tags', [])]),
'created_at': article['created_at'],
'updated_at': article['updated_at'],
'last_updated_at': self.fetch_time.isoformat()
})
# メトリクス
collected_data["metrics"].append({
'batch_id': self.batch_id,
'user_id': article['user']['id'],
'article_id': article_id,
'views_count': article['page_views_count'],
'likes_count': article['likes_count'],
'stocks_count': article['stocks_count'],
'comments_count': article['comments_count'],
'measured_at': self.fetch_time.isoformat(),
'article_created_at': article['created_at'],
'article_updated_at': article['updated_at']
})
def _collect_engagement_data(self, token: str, article_id: str, collected_data: Dict[str, List[Dict]]) -> None:
"""記事のエンゲージメントデータを収集"""
# いいね詳細
likes = self._api_request(token, f"/items/{article_id}/likes")
if likes:
for like in likes:
collected_data["likers"].append({
'article_id': article_id,
'liker_user_id': like['user']['id'],
'liked_at': like['created_at'],
'last_collected_at': self.fetch_time.isoformat()
})
# ストック詳細
stockers = self._api_request(token, f"/items/{article_id}/stockers")
if stockers:
for stocker in stockers:
collected_data["stockers"].append({
'article_id': article_id,
'stocker_user_id': stocker['id'],
'last_collected_at': self.fetch_time.isoformat()
})
def _optimize_dataframe(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame:
"""
DataFrameを最適化
Args:
df: 対象DataFrame
data_type: データタイプ
Returns:
最適化されたDataFrame
"""
if data_type == "metrics":
df['measured_at'] = pd.to_datetime(df['measured_at'])
df = df.sort_values(['article_id', 'measured_at'])
elif data_type == "metadata":
df['created_at'] = pd.to_datetime(df['created_at'])
df['updated_at'] = pd.to_datetime(df['updated_at'])
df = df.sort_values('created_at')
elif data_type == "likers":
df['liked_at'] = pd.to_datetime(df['liked_at'])
df = df.sort_values(['article_id', 'liked_at'])
elif data_type == "stockers":
df = df.sort_values('article_id')
return df
def _save_data_locally(self, data: List[Dict], data_type: str, use_timestamp: bool = False) -> Dict[str, Path]:
"""
データをローカルに保存
Args:
data: 保存するデータ
data_type: データタイプ
use_timestamp: タイムスタンプを使用するか
Returns:
保存されたファイルパスの辞書
"""
if not data:
logger.warning(f"No data to save for {data_type}")
return {}
# DataFrame作成・最適化
df = pd.DataFrame(data)
df = self._optimize_dataframe(df, data_type)
# ファイル名決定
if use_timestamp:
base_name = f"{data_type}_{self.batch_id}"
else:
base_name = data_type
# ローカル保存
csv_path = self.local_dir / f"{base_name}.csv"
parquet_path = self.local_dir / f"{base_name}.parquet"
df.to_csv(csv_path, index=False, encoding='utf-8')
df.to_parquet(parquet_path, index=False, compression='snappy', engine='pyarrow')
logger.info(f"Saved {data_type} data: {csv_path.name}, {parquet_path.name}")
return {"csv": csv_path, "parquet": parquet_path}
def _prepare_upload_files(self, saved_files: Dict[str, Dict[str, Path]]) -> List[Dict]:
"""S3アップロード用のファイル情報を準備"""
upload_files = []
for data_type, file_paths in saved_files.items():
use_partition = (data_type == "metrics")
for file_format, file_path in file_paths.items():
upload_files.append({
'path': file_path,
'data_type': data_type,
'format': file_format,
'use_partition': use_partition
})
return upload_files
def _generate_summary(self, all_data: Dict[str, List[Dict]]) -> str:
"""
サマリーレポート生成
Args:
all_data: 収集されたすべてのデータ
Returns:
サマリーレポート文字列
"""
metrics = all_data.get("metrics", [])
if not metrics:
return "No data collected"
# 統計計算
total_articles = len(set(m['article_id'] for m in metrics))
total_views = sum(m['views_count'] for m in metrics)
total_likes = sum(m['likes_count'] for m in metrics)
total_stocks = sum(m['stocks_count'] for m in metrics)
total_comments = sum(m['comments_count'] for m in metrics)
# 上位記事
top_articles = sorted(metrics, key=lambda x: x['views_count'], reverse=True)[:5]
# S3バケット情報
s3_info = ""
if self.s3_handler:
s3_info = f"s3://{self.s3_handler.bucket_name}"
else:
s3_info = "ローカル保存のみ"
report = f"""
=== Qiita メトリクス収集サマリー ===
収集日時: {self.fetch_time.strftime('%Y-%m-%d %H:%M:%S JST')}
バッチID: {self.batch_id}
保存先: {s3_info}
【収集統計】
総記事数: {total_articles:,}
総ビュー数: {total_views:,}
総いいね数: {total_likes:,}
総ストック数: {total_stocks:,}
総コメント数: {total_comments:,}
【エンゲージメント詳細】
いいね記録数: {len(all_data.get('likers', [])):,}
ストック記録数: {len(all_data.get('stockers', [])):,}
【S3保存構造】
- 記事マスター: {s3_info}/articles/master.parquet
- メトリクス:
- 時系列用: {s3_info}/metrics/year=YYYY/month=MM/day=DD/
- 最新用: {s3_info}/articles/metrics_latest.parquet
- いいね詳細: {s3_info}/articles/likers.parquet
- ストック詳細: {s3_info}/articles/stockers.parquet
【ビュー数上位記事】"""
for i, article in enumerate(top_articles, 1):
report += f"\n{i}. {article['article_id']} - {article['views_count']:,} ビュー"
report += "\n" + "=" * 45
return report
def collect_all_users(self) -> Dict[str, List[Dict]]:
"""
全ユーザーのデータを収集
Returns:
統合されたデータ
"""
logger.info("Starting Qiita metrics collection")
# 全ユーザーのデータ収集
all_data = {"metadata": [], "metrics": [], "likers": [], "stockers": []}
for username, token in self.tokens.items():
try:
user_data = self._collect_user_articles(username, token)
# データを統合
for data_type in all_data.keys():
all_data[data_type].extend(user_data[data_type])
if user_data["metrics"]:
logger.info(f"User '{username}': Collected {len(user_data['metrics'])} articles")
else:
logger.info(f"User '{username}': No data collected")
except Exception as e:
logger.error(f"Failed to collect data for user '{username}': {e}")
continue
return all_data
def save_and_upload(self, all_data: Dict[str, List[Dict]]) -> bool:
"""
データを保存してS3にアップロード
Args:
all_data: 保存するデータ
Returns:
処理成功の場合True
"""
if not all_data["metrics"]:
logger.warning("No data to save")
return False
# ローカル保存
saved_files = {}
saved_files["master"] = self._save_data_locally(all_data["metadata"], "master", False)
saved_files["metrics"] = self._save_data_locally(all_data["metrics"], "metrics", True)
saved_files["metrics_latest"] = self._save_data_locally(all_data["metrics"], "metrics_latest", False)
saved_files["likers"] = self._save_data_locally(all_data["likers"], "likers", False)
saved_files["stockers"] = self._save_data_locally(all_data["stockers"], "stockers", False)
# S3アップロード
if self.s3_handler:
upload_files = self._prepare_upload_files(saved_files)
success_count = self.s3_handler.upload_batch(upload_files, self.batch_id)
logger.info(f"S3 upload completed: {success_count} files")
return True
def run(self) -> None:
"""メイン実行処理"""
try:
# データ収集
all_data = self.collect_all_users()
# データ保存・アップロード
if self.save_and_upload(all_data):
# サマリー表示
summary = self._generate_summary(all_data)
print(summary)
logger.info("Metrics collection completed successfully")
else:
logger.warning("No data collected from any user")
except Exception as e:
logger.error(f"Collection failed: {e}")
raise
def main():
"""エントリーポイント"""
try:
collector = QiitaCollector()
collector.run()
except Exception as e:
logger.error(f"Application failed: {e}")
raise
if __name__ == '__main__':
main()
必要なライブラリは以下の通りです。
requests>=2.31.0
pandas>=2.0.0
boto3>=1.28.0
pyarrow>=12.0.0
GitHub Actionsの設定
GitHub Actionsでスクリプトを実行できるように設定します。
ワークフローの定義
GitHub Actionsのフローの定義は以下の(stat.yml)の通りです。ここでは実行タイミングとしてcronで0時/6時/12時/18時に起動するように指定しています。
ワークフローの実行が混み合うため、実際には15分程度実行開始の遅れがあります。
name: Qiita metrics collector (S3 upload)
# Controls when the workflow will run
on:
schedule:
- cron: '0 3,9,15,21 * * *' # 1日4回実行(UTC指定)
workflow_dispatch:
env:
AWS_ACCOUNT_ID: ${{ vars.AWS_ACCOUNT_ID }}
AWS_REGION: ${{ vars.AWS_REGION }}
AWS_IAM_ROLE: ${{ vars.AWS_IAM_ROLE }}
S3_BUCKET_NAME: ${{ vars.S3_BUCKET_NAME}}
permissions:
id-token: write
contents: read
jobs:
collect-metrics:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.12'
architecture: 'x64'
- name: Install dependencies
run: |
pip install -r metrics/requirements.txt
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: arn:aws:iam::${{ env.AWS_ACCOUNT_ID }}:role/${{ env.AWS_IAM_ROLE }}
aws-region: ${{ env.AWS_REGION }}
- name: Collect metrics
run: |
python metrics/qiita_metrics_collector.py
env:
ALL_SECRETS: ${{ toJSON(secrets) }}
環境変数とシークレット
上記のワークフローの定義では、アップロード先のAWS環境の情報とQiita APIキーの情報の登録が必要です。
GitHubのSecrets and variablesのページで、それぞれ環境変数とシークレットに登録します。
環境変数
| 環境変数名 | 設定例 | 説明 |
|---|---|---|
AWS_ACCOUNT_ID |
123456789012 |
AWSアカウントID |
AWS_REGION |
ap-northeast-1 |
AWSリージョン |
AWS_IAM_ROLE |
GitHubActionsMetricsRole |
GitHub ActionsがS3にアクセスするためのIAMロール名 |
S3_BUCKET_NAME |
article-metrics-bucket |
メトリクスデータを保存するS3バケット名 |
シークレット
本スクリプトにおいて、シークレット名はKEY_<QIITA_USER_ID>というように、QiitaのユーザーIDの前にKEY_を付けて登録します。
| シークレット名 | 設定内容 |
|---|---|
KEY_ALICE |
QiitaユーザーaliceのAPIキー |
KEY_BOB |
QiitaユーザーbobのAPIキー |
APIキーを発行する際は、スコープはread_qiitaだけでよいです。
アップロードされるファイルのサンプル
上記のスクリプトが実行されるとS3にアップロードされるファイルについてご紹介します。
本スクリプトでは、CSVファイルとParquetファイルの両方をアップロードします。DuckDB向けに列指向データ形式であるParquetファイルを使用し、人間も確認できるようにCSVも用意します。
ファイルの一覧は以下の通りです。
| ファイルの種類 | 内容 | 保存先パス |
|---|---|---|
| 記事マスター情報 | 記事のID、タイトル及び投稿者のIDなど | /articles/master.{csv,parquet} |
| いいね詳細 | 記事ごとのいいねの時系列情報 | /articles/likers.{csv,parquet} |
| ストック詳細 | 記事ごとのストック数 | /articles/stockers.{csv,parquet} |
| メトリクス(最新値) | 記事ごとのメトリクス | /articles/metrics_latest.{csv,parquet} |
| メトリクス(時系列) | 記事ごとのメトリクス | /metrics/year=YYYY/month=MM/day=DD/metrics_yyyymmdd_hhmmss.{csv,parquet} |
最新のメトリクス値を簡単に取得できるように、時系列で蓄積するファイルとは別に上書き方式でアップロードします。また、時系列データはクエリ性能を向上させるために、年、月、日のパーティションに分割してアップロードします。
ファイルの中身のサンプルは以下の通りです。
記事マスター情報
記事マスター情報(master.csv)には、以下の項目が含まれています。記事IDを主キーとして記事のメタデータを取得する際に使用する想定です。
- 投稿者のID
- 記事のID
- 記事タイトル
- 記事URL
- タグ
- 記事投稿日時
- 記事更新日時
- メトリクス収集日時
user_id,article_id,title,url,tags,created_at,updated_at,last_updated_at
melknzw,563cfcdcf5ff76917d6a,KubernetesのネットワークオブザーバビリティプラットフォームRetinaを試してみた<後編:キャプチャ使いこなし編>,https://qiita.com/melknzw/items/563cfcdcf5ff76917d6a,"[""Linux"", ""Network"", ""tcpdump"", ""kubernetes"", ""observability""]",2024-10-30T10:52:01+09:00,2024-10-30T13:42:58+09:00,2025-10-27T18:25:54+09:00
melknzw,15f5931b2fca0a5b5e39,KubernetesのネットワークオブザーバビリティプラットフォームRetinaを試してみた<中編:メトリクス使いこなし編>,https://qiita.com/melknzw/items/15f5931b2fca0a5b5e39,"[""Linux"", ""Network"", ""kubernetes"", ""observability""]",2024-09-11T13:28:30+09:00,2025-01-24T20:22:24+09:00,2025-10-27T18:25:54+09:00
いいね詳細
いいね詳細(likers.csv)には、以下の項目が含まれています。
- 記事のID
- いいねしたユーザーのID
- いいねした日時
- メトリクス収集日時
article_id,liker_user_id,liked_at,last_collected_at
563cfcdcf5ff76917d6a,sample_user1,2024-11-01 15:48:55+09:00,2025-10-27T18:25:54+09:00
563cfcdcf5ff76917d6a,sample_user2,2024-11-12 11:12:23+09:00,2025-10-27T18:25:54+09:00
ストック詳細
ストック詳細(stockers.csv)には、以下の項目が含まれています。
- 記事のID
- ストックしたユーザーのID
- メトリクス収集日時
article_id,stocker_user_id,last_collected_at
563cfcdcf5ff76917d6a,sample_user1,2025-10-27T18:25:54+09:00
メトリクス(最新値)
メトリクス(最新値)(metrics_latest.csv)には、以下の項目が含まれています。
- メトリクス収集のバッチID
- 投稿者のID
- 記事のID
- 閲覧数
- いいね数
- ストック数
- コメント数
- メトリクス収集日時
- 記事投稿日時
- 記事更新日時
batch_id,user_id,article_id,views_count,likes_count,stocks_count,comments_count,measured_at,article_created_at,article_updated_at
20251027_182554,melknzw,563cfcdcf5ff76917d6a,1234,9,3,0,2025-10-27T18:25:54+09:00,2024-10-30T10:52:01+09:00,2024-10-30T13:42:58+09:00
20251027_182554,melknzw,15f5931b2fca0a5b5e39,5678,5,1,0,2025-10-27T18:25:54+09:00,2024-09-11T13:28:30+09:00,2025-01-24T20:22:24+09:00
メトリクス(時系列)
メトリクス(時系列)(例:metrics_20251027_002414.csv)の中身の構造は、「メトリクス(最新値)」と同じです。
おわりに
本記事では、前回検討したS3+DuckDB+Grafanaの構成を実際に構築しました。
GrafanaのCognito連携は少し設定が複雑ですが、一度設定してしまえば管理者側も利用者側も非常に便利に使えます。ぜひ皆さんも試してみてください。
次回は、構築したGrafana上でDuckDBのクエリを書いて、Qiita記事メトリクスのダッシュボードを作成していきたいと思います。








