0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

生成AIによるDatabricks最新記事まとめと著者傾向分析

Last updated at Posted at 2025-11-16

これまで手動でまとめページを更新していましたが、ページ数が増えるとメンテも困難になってきました。

ということで自動化します。全記事を対象にするのは色々難しかったので最近1ヶ月の記事を対象にしています。もとのまとめページのメンテは一旦停止しますが、今後どうするかは少し考えます。

もともとのまとめページの更新も再開しました。

ノートブック全体の概要と処理フロー

このノートブックは、データの前処理、分析、可視化を行うための一連のステップを示しています。以下は、処理フローの概要です。

  1. データのインポート: 必要なライブラリとデータセットをインポートします。
  2. データの前処理: 欠損値の処理やデータ型の変換を行います。
  3. データの分析: データの統計的分析を行い、重要な特徴を抽出します。
  4. データの可視化: 分析結果を視覚的に表現するためのグラフやチャートを作成します。
  5. 結果の解釈: 可視化されたデータを基に、結果を解釈し、結論を導きます。

このノートブックを通じて、データ分析の基本的な流れを理解し、実践することができます。

Qiita APIから記事情報を取得する処理の概要

この処理は、Qiita APIを使用して特定のユーザーまたはタグに関連する記事情報を取得するためのものです。以下のステップで実行されます。

  1. APIエンドポイントの設定: Qiita APIのエンドポイントを指定します。例えば、特定のユーザーの投稿を取得する場合は、https://qiita.com/api/v2/users/{user_id}/itemsを使用します。

  2. HTTPリクエストの送信: 設定したエンドポイントに対してGETリクエストを送信します。この際、必要に応じて認証トークンをヘッダーに含めることが重要です。

  3. レスポンスの処理: APIからのレスポンスを受け取り、JSON形式でデータを解析します。記事情報には、タイトル、作成日、更新日、タグ、いいね数などが含まれます。

  4. データの利用: 取得した記事情報をアプリケーション内で表示したり、他の処理に利用したりします。

この処理を通じて、Qiitaの豊富な記事情報を活用することができます。

記事のフィルタリング処理概要

このセクションでは、取得した記事を既存のデータベーステーブルと比較し、特定の条件に基づいてフィルタリングする処理について説明します。主に以下の2つの条件で絞り込みを行います。

  1. Databricksタグのフィルタリング:

    • 取得した記事の中から、Databricksに関連するタグを持つ記事のみを選別します。これにより、Databricksに関連する情報を効率的に収集することができます。
  2. 日付によるフィルタリング:

    • 記事の公開日や取得日を基に、特定の期間内に公開された記事を抽出します。これにより、最新の情報や特定の期間に関連する記事を容易に見つけることができます。

このフィルタリング処理を通じて、必要な情報を迅速に取得し、データ分析やレポート作成に役立てることが可能になります。

# Qiita APIから認証ユーザーの記事一覧をページングで取得
from time import sleep
import requests

# アクセストークン(セキュアに管理することを推奨)
access_token = "<Qiita APIキー>"

item_list_base_url = "https://qiita.com/api/v2/authenticated_user/items"
headers = { "Authorization": f"Bearer {access_token}" }

page = 1
per_page = 100  # 1ページあたりの取得件数

sleep_time = 0.1  # API呼び出し間隔(秒)

page_views = []  # 記事情報を格納するリスト

while True:
    # ページごとにAPIリクエスト
    item_list_url = f"{item_list_base_url}?page={page}&per_page={per_page}"

    # APIから記事一覧を取得
    items = requests.get(item_list_url, headers = headers)
    items_body = items.json()  # レスポンスをJSON形式で解析

    if not len(items_body):
        # 取得件数が0なら終了
        break

    for entry in items_body:
        item_id = entry["id"]

        print(f"Get infos: {entry['title']}")  # 記事タイトルを表示

        # 必要な情報を抽出
        title = entry["title"]
        url = entry["url"]
        tags = entry["tags"]
        created_at = entry["created_at"]
        page_views_count = entry["page_views_count"]
        lgtm = entry["likes_count"]
        body = entry["body"]
        
        # 抽出した情報を辞書形式でリストに追加
        page_views.append({
            "id": item_id,
            "title": title,
            "url": url,
            "tags": tags,
            "created_at": created_at,
            "page_views_count": page_views_count,
            "lgtm": lgtm,
            "body": body
        })

        sleep(sleep_time)  # API制限回避のため待機

    page += 1  # 次ページへ

    sleep(sleep_time)  # API制限回避のため待機
既存テーブルからIDを取得しました。
Qiita APIから取得した記事数: 2190
新規記事候補数(テーブル比較後): 84
タグ(databricks)フィルタ後の記事数: 0
created_atサンプル: []
フィルタ後のデータが空です。created_at分布は表示しません。

新規記事をDatabricksテーブルに保存する処理の概要

このプロセスでは、新規記事をDatabricksのテーブルに保存するための手順を説明します。以下のステップで進めます。

  1. データの準備: 新規記事のデータを収集し、必要な形式に整形します。これには、タイトル、内容、作成日などのフィールドが含まれます。

  2. Databricksの接続: Databricksのクラスターに接続し、適切なデータベースを選択します。

  3. テーブルの確認: 記事を保存するためのテーブルが存在するか確認します。存在しない場合は、新たにテーブルを作成します。

  4. データの挿入: 準備したデータをDatabricksテーブルに挿入します。これには、Spark DataFrameを使用してデータを効率的に書き込む方法が含まれます。

  5. 確認: データが正しく挿入されたかを確認するために、テーブルの内容をクエリして結果を検証します。

このプロセスにより、新規記事がDatabricksテーブルに安全かつ効率的に保存されます。

# 新規記事があればDatabricksテーブルに保存
if not pdf.empty:
    # created_date列が存在する場合は削除
    if 'created_date' in pdf.columns:
        pdf = pdf.drop(columns=['created_date'])
    # PDFからSpark DataFrameを作成
    sdf = spark.createDataFrame(pdf)
    # テーブルの存在に応じて書き込みモードを設定
    mode = "append" if table_exists else "overwrite"
    # テーブルに書き込み(初回はoverwrite, 2回目以降はappend)
    sdf.write.mode(mode).saveAsTable("users.takaaki_yayoi.qiita_articles")

Databricksモデルサービングエンドポイント呼び出し関数の概要

この関数は、Databricksでデプロイされた機械学習モデルのサービングエンドポイントに対してリクエストを送信し、予測結果を取得するためのものです。以下の機能を提供します。

  • エンドポイントURLの指定: モデルがデプロイされているDatabricksのエンドポイントURLを指定します。
  • リクエストデータの準備: モデルに入力するデータを適切な形式に整形します。
  • HTTPリクエストの送信: 指定されたエンドポイントに対してPOSTリクエストを送信し、予測を要求します。
  • レスポンスの処理: 受け取ったレスポンスから予測結果を抽出し、必要に応じてエラーハンドリングを行います。

この関数を使用することで、Databricks上でホストされているモデルを簡単に呼び出し、リアルタイムで予測を得ることができます。

# Databricksモデルサービングエンドポイントを呼び出す関数
import os
import requests
import json

def call_claude_model(prompt, max_tokens=2000):
    """
    databricks-claude-sonnet-4-5エンドポイントを呼び出す
    Args:
        prompt: プロンプトテキスト
        max_tokens: 最大トークン数
    Returns:
        生成されたテキスト
    """
    # Databricksトークンを取得
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    
    # ワークスペースURLを取得
    workspace_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
    
    # エンドポイント名
    endpoint_name = "databricks-claude-sonnet-4-5"
    
    # エンドポイントURLを構築
    url = f"{workspace_url}/serving-endpoints/{endpoint_name}/invocations"
    
    # リクエストヘッダーを設定
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    # リクエストデータを設定
    data = {
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        "max_tokens": max_tokens
    }
    
    # サービングエンドポイントにPOSTリクエスト
    response = requests.post(url, headers=headers, json=data)
    
    # レスポンスのステータスコードを確認
    if response.status_code == 200:
        result = response.json()
        # Claude APIのレスポンス形式に対応
        if "choices" in result:
            return result["choices"][0]["message"]["content"]
        elif "content" in result:
            if isinstance(result["content"], list):
                return result["content"][0]["text"]
            return result["content"]
        else:
            return str(result)
    else:
        # エラーメッセージを表示
        raise Exception(f"モデルサービング呼び出しエラー: {response.status_code}, {response.text}")

記事要約・傾向分析・表形式生成の処理概要

このプロセスは、与えられた記事の内容を要約し、傾向を分析し、最終的にその結果を表形式で生成することを目的としています。以下は各ステップの概要です。

  1. 記事要約:

    • 記事の主要なポイントを抽出し、簡潔な要約を作成します。
    • 自然言語処理技術を使用して、重要な文やフレーズを特定します。
  2. 傾向分析:

    • 要約された内容を基に、記事のテーマやトピックに関する傾向を分析します。
    • データマイニング手法を用いて、頻出語や関連性の高いトピックを特定します。
  3. 表形式生成:

    • 要約と傾向分析の結果を整理し、視覚的に理解しやすい表形式で出力します。
    • 表には、要約された内容、関連するトピック、傾向の指標などが含まれます。

このプロセスにより、ユーザーは記事の重要な情報を迅速に把握し、トレンドを理解することができます。

Qiita投稿用Markdown生成処理の概要

このプロセスは、Qiitaに投稿するためのMarkdown形式のテキストを自動的に生成することを目的としています。以下のステップで構成されています。

  1. データ収集: 投稿に必要な情報を収集します。これには、タイトル、本文、タグ、画像などが含まれます。
  2. Markdownフォーマットの適用: 収集したデータをMarkdown形式に変換します。例えば、タイトルは#で始まり、リストは-*で表現されます。
  3. 出力: 最終的に生成されたMarkdownテキストを出力します。これにより、ユーザーはQiitaに簡単に投稿できるようになります。

このプロセスにより、手動でのMarkdown作成の手間を省き、効率的に情報を共有することが可能になります。

# Qiita記事の要約・傾向・Mermaid mindmap生成
from pyspark.sql import functions as F
import pandas as pd

# 過去1ヶ月の記事を取得
from datetime import datetime, timedelta

# 現在の日付から1ヶ月前の日付を計算
one_month_ago = datetime.now().replace(tzinfo=None) - timedelta(days=30)

# Qiitaの記事データをSparkから取得し、必要なカラムを選択
articles_df = (
    spark.table("users.takaaki_yayoi.qiita_articles")
    .withColumn("created_ts", F.to_timestamp("created_at"))
    .filter(F.col("created_ts") >= F.lit(one_month_ago))
    .orderBy(F.col("created_ts").desc_nulls_last())
    .select("id", "title", "url", "created_at", "body")
)

# 記事数を確認
article_count = articles_df.count()
print(f"過去1ヶ月の記事数: {article_count}")

# 記事が0件の場合は処理をスキップ
if article_count == 0:
    print("過去1ヶ月の記事が0件のため、要約・分析処理をスキップします。")
    dbutils.notebook.exit("過去1ヶ月の記事なし")

# ai_queryで要約を並列生成(Databricksモデルサービングを利用)
summary_df = articles_df.withColumn(
    "summary",
    F.expr("""
        ai_query(
            'databricks-claude-sonnet-4-5',
            concat('次のDatabricks記事本文を日本語で200文字以内で要約してください。\n\n本文: ', body, '\n\n要約:')
        )
    """)
)

# Pandasに変換し、要約生成の完了を表示
summary_pdf = summary_df.toPandas()
print(f"要約生成完了: {len(summary_pdf)}")

# --- ここから傾向抽出 ---
# 記事の要約を連結して、傾向抽出用のプロンプトを作成
all_summaries = "\n".join(summary_pdf["summary"].fillna("").tolist())

# 傾向抽出用プロンプト
trend_prompt = (
    f"以下は私が過去1ヶ月に書いたDatabricksに関する{len(summary_pdf)}件の記事の要約です。これらから読み取れる最近の著者としての傾向や注力しているテーマ、技術的な特徴を日本語で簡潔にまとめてください。\n\n" + all_summaries + "\n\n傾向まとめ:"
)

# モデルサービングで傾向を抽出
try:
    trend_str = call_claude_model(trend_prompt, max_tokens=1000)
    print("傾向抽出完了")
except Exception as e:
    print(f"傾向抽出エラー: {e}")
    trend_str = "(傾向抽出失敗)"

# --- ここから表形式の傾向分析生成 ---
# 表形式で傾向を整理するためのプロンプトを作成
table_prompt = (
    "次の要約をもとに、著者の最近の傾向や注力テーマ、技術的特徴を表形式(Markdown table)で整理してください。\n\n"
    "重要なルール:\n"
    "1. 3-5行程度の表にする\n"
    "2. 列は「カテゴリ」「主要技術・テーマ」「具体的な取り組み」の3列\n"
    "3. Markdown tableの形式で出力する\n"
    "4. ヘッダー行と区切り行を含める\n\n"
    "正しい例:\n"
    "| カテゴリ | 主要技術・テーマ | 具体的な取り組み |\n"
    "|---------|----------------|----------------|\n"
    "| AI/MLエージェント開発 | Model Context Protocol, Claude Sonnet | 感情分析、会話履歴管理、企業データ連携 |\n"
    "| データ管理基盤 | Delta Lake, Unity Catalog | タイムトラベル、VACUUM/RESTORE、ガバナンス |\n"
    "| MLOpsとデプロイ | MLflow, モデルサービング | モデル管理、トレーシング、推論エンドポイント |\n\n"
    "要約: " + (trend_str or "")
)

# モデルサービングで表を生成
try:
    table_code = call_claude_model(table_prompt, max_tokens=1500)
    print("表形式生成完了")
except Exception as e:
    print(f"表形式生成エラー: {e}")
    table_code = None

# summary_pdf, trend_str, table_code を出力
print(summary_pdf)
print(trend_str)
print(table_code)

# --- summary_pdfをSpark DataFrameに変換 ---
summary_sdf = spark.createDataFrame(summary_pdf[['id', 'summary']])
from delta.tables import DeltaTable

# DeltaTableオブジェクトを取得
delta_table = DeltaTable.forName(spark, "users.takaaki_yayoi.qiita_articles")

# summary_sdf: id, summary のみを持つDataFrame
# idで一致する行のsummaryを更新、なければ追加
(
    delta_table.alias("target")
    .merge(
        summary_sdf.alias("source"),
        "target.id = source.id"
    )
    .whenMatchedUpdate(set={"summary": "source.summary"})
    .whenNotMatchedInsert(values={"id": "source.id", "summary": "source.summary"})
    .execute()
)

print("summary列のupsertが完了しました。")

Qiita APIで既存記事を更新する処理の概要

Qiita APIを使用して既存の記事を更新するには、以下の手順を踏む必要があります。

  1. APIトークンの取得: Qiita APIを利用するためには、まずAPIトークンを取得する必要があります。これはQiitaのアカウント設定から取得できます。

  2. HTTPリクエストの準備: 記事を更新するためには、HTTP PATCHリクエストを使用します。このリクエストには、更新したい記事のIDと、更新内容(タイトル、本文、タグなど)を含める必要があります。

  3. リクエストの送信: 準備したリクエストをQiita APIのエンドポイントに送信します。エンドポイントはhttps://qiita.com/api/v2/items/{item_id}です。

  4. レスポンスの確認: APIからのレスポンスを確認し、更新が成功したかどうかを判断します。成功した場合は、更新された記事の情報が返されます。

  5. エラーハンドリング: 更新処理中にエラーが発生した場合は、適切にエラーハンドリングを行い、ユーザーにエラーメッセージを表示します。

このプロセスを通じて、Qiita上の既存の記事をプログラムから簡単に更新することができます。

# Qiita APIを使って既存記事を更新
import requests
import pprint

# 更新する記事のURL
url_items = 'https://qiita.com/api/v2/items/2af61d3fa992a589ec26'
# 認証ヘッダーの設定
headers = {'Authorization': 'Bearer <Qiita APIキー>'}

# 必要な変数から投稿本文を生成
default_summary_pdf = summary_pdf if 'summary_pdf' in locals() else None
default_trend_str = trend_str if 'trend_str' in locals() else None
default_table_code = table_code if 'table_code' in locals() else None
post_str = generate_qiita_post(default_summary_pdf, default_trend_str, default_table_code)

# 更新する記事のデータ
item_data = {
    'title': '生成AIによるDatabricks最新記事まとめと著者傾向分析(自動生成)',
    'body': post_str,
    'private': False,
    'tags': [{'name': 'Databricks'}],
    'coediting': False,
}

# Qiita APIにPATCHリクエストを送信
r_post = requests.patch(url_items, headers=headers, json=item_data)
# レスポンスの内容を表示
pprint.pprint(r_post.text)

毎日更新処理が実行されます。
Screenshot 2025-11-17 at 7.41.22.png

表形式でのまとめ。
Screenshot 2025-11-17 at 7.41.35.png

記事ごとの要約。
Screenshot 2025-11-17 at 7.41.49.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?