4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【AWS】Backlog Webhook 駆動で動くエージェントをCDKとBedrock AgentCore Runtimeで実装した

4
Last updated at Posted at 2026-02-25

どうもこんにちは。
今回はBacklogでユーザがBot宛にメンション付きコメントを送信した時のWebhook駆動でエージェントを動かし、Backlogにコメントを返す仕組みを実装しました。

構成

Backlog
  |
  | Webhook (POST)
  v
Amazon API Gateway
  |
  v
Lambda: webhook_receiver
  |  ・メンション宛先チェック → Botユーザ宛ではない場合にはスキップ
  v
Amazon SQS
  |
  v
Lambda: agent_invoker
  |  ・AgentCore Runtime を呼び出しエージェントに回答を生成させる
  |  ・生成した回答を Backlog REST API でコメント投稿
  v
Bedrock AgentCore Runtime
  |  ・Strands Agent + backlog-mcp-server
  |  ・Backlog の課題情報を参照して回答テキストを生成
  v
Backlog API(コメント投稿)

技術スタック

  • CDK: Python
  • Lambda: Python
  • Amazon SQS
  • Amazon API Gateway
  • Amazon Bedrock AgentCore Runtime
  • Strands Agents(エージェントフレームワーク)
  • backlog-mcp-server(Backlog MCP サーバ、npx で起動)

実装手順

0. 前提条件

  • Bedrock AgentCore RuntimeへのエージェントのデプロイはCDKではやっていません
    • ただし、Lambdaから呼び出しているエージェントは以下の記事で実装したエージェントです

1. CDKプロジェクトを作成

mkdir backlog-agent-cdk
cd backlog-agent-cdk
cdk init app --language python
source .venv/bin/activate
pip install aws-cdk-lib constructs

2. スタックを実装

backlog_webhook_stack/stack.py にスタックを定義します。

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_sqs as sqs,
    aws_lambda as lambda_,
    aws_apigateway as apigateway,
    aws_lambda_event_sources as event_sources,
    aws_iam as iam,
    Duration,
)
from constructs import Construct

AGENT_RUNTIME_ARN = "arn:aws:bedrock-agentcore:ap-northeast-1:123456789012:runtime/your-agent-XXXXXXXXXX"


class BacklogWebhookStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, backlog_api_key: str, backlog_user_id: str = "", backlog_domain: str = "example.backlog.jp", **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # --- SQS DLQ ---
        dlq = sqs.Queue(
            self, "BacklogWebhookDLQ",
            queue_name="BacklogWebhookDLQ",
            retention_period=Duration.days(14),
        )

        # --- SQS メインキュー ---
        queue = sqs.Queue(
            self, "BacklogWebhookQueue",
            queue_name="BacklogWebhookQueue",
            visibility_timeout=Duration.seconds(360),
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=3,
                queue=dlq,
            ),
        )

        # --- Lambda 1: Webhook Receiver ---
        webhook_fn = lambda_.Function(
            self, "WebhookReceiver",
            function_name="BacklogWebhookReceiver",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler="lambda_function.lambda_handler",
            code=lambda_.Code.from_asset("lambda/webhook_receiver"),
            timeout=Duration.seconds(30),
            environment={
                "SQS_QUEUE_URL": queue.queue_url,
                "BACKLOG_USER_ID": backlog_user_id,
            },
        )
        queue.grant_send_messages(webhook_fn)

        # --- API Gateway ---
        api = apigateway.RestApi(
            self, "BacklogWebhookApi",
            rest_api_name="BacklogWebhookApi",
            deploy_options=apigateway.StageOptions(stage_name="prod"),
        )
        api.root.add_method(
            "POST",
            apigateway.LambdaIntegration(webhook_fn, proxy=True),
        )

        # --- Lambda 2: Agent Invoker ---
        agent_invoker_fn = lambda_.Function(
            self, "AgentInvoker",
            function_name="BacklogAgentInvoker",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler="lambda_function.lambda_handler",
            code=lambda_.Code.from_asset("lambda/agent_invoker"),
            timeout=Duration.seconds(300),
            environment={
                "AGENT_RUNTIME_ARN": AGENT_RUNTIME_ARN,
                "BACKLOG_API_KEY": backlog_api_key,
                "BACKLOG_DOMAIN": backlog_domain,
            },
        )

        # IAM: bedrock-agentcore:InvokeAgentRuntime
        agent_invoker_fn.add_to_role_policy(
            iam.PolicyStatement(
                actions=["bedrock-agentcore:InvokeAgentRuntime"],
                resources=[AGENT_RUNTIME_ARN, f"{AGENT_RUNTIME_ARN}/*"],
            )
        )

        # SQS イベントソース
        agent_invoker_fn.add_event_source(
            event_sources.SqsEventSource(
                queue,
                batch_size=1,
                report_batch_item_failures=True,
            )
        )

        # --- Outputs ---
        cdk.CfnOutput(
            self, "ApiEndpoint",
            value=api.url,
            description="Backlog Webhook の登録先 URL (POST /prod/)",
        )
        cdk.CfnOutput(
            self, "DLQUrl",
            value=dlq.queue_url,
            description="Dead Letter Queue URL",
        )

エントリポイントの app.py からコンテキスト経由でパラメータを渡します。

import aws_cdk as cdk
from backlog_webhook_stack.stack import BacklogWebhookStack

app = cdk.App()

backlog_api_key = app.node.try_get_context("backlog_api_key") or ""
backlog_user_id = app.node.try_get_context("backlog_user_id") or ""
backlog_domain  = app.node.try_get_context("backlog_domain") or "example.backlog.jp"

BacklogWebhookStack(app, "BacklogWebhookStack",
    backlog_api_key=backlog_api_key,
    backlog_user_id=backlog_user_id,
    backlog_domain=backlog_domain,
)

app.synth()

3. Lambda関数のコードを実装

Lambda 1: Webhook Receiver

lambda/webhook_receiver/lambda_function.py

Backlog から届いた Webhook を受け取り、Bot 自身のコメントでなく、かつ Bot へのメンションが含まれる場合のみ SQS に転送します。

import json
import boto3
import os

sqs = boto3.client('sqs')

BACKLOG_USER_ID = os.environ.get('BACKLOG_USER_ID', '')


def _unique_id(user: dict) -> str:
    """Backlog ユーザーの一意な識別子を返す(nulabAccount.uniqueId 優先)"""
    return user.get('nulabAccount', {}).get('uniqueId') or user.get('userId') or ''


def is_bot_comment(body: dict) -> bool:
    """コメントの投稿者がボット自身かどうかを確認する(ループ防止)"""
    if not BACKLOG_USER_ID:
        return False
    created_user_id = _unique_id(body.get('createdUser', {}))
    return created_user_id == BACKLOG_USER_ID


def is_mentioned(body: dict) -> bool:
    """APIキーのユーザ (BACKLOG_USER_ID) がコメントのメンション対象かどうかを確認する"""
    if not BACKLOG_USER_ID:
        return True  # 未設定の場合はすべて通す

    # notifications はトップレベルに存在する
    notifications = body.get('notifications', [])
    result = any(
        _unique_id(n.get('user', {})) == BACKLOG_USER_ID and n.get('reason') == 2
        for n in notifications
    )
    return result


def lambda_handler(event, context):
    queue_url = os.environ['SQS_QUEUE_URL']
    body_str = event.get('body', '{}')

    try:
        body = json.loads(body_str)
    except json.JSONDecodeError:
        return {'statusCode': 400, 'body': json.dumps({'status': 'invalid json'})}

    if is_bot_comment(body):
        return {'statusCode': 200, 'body': json.dumps({'status': 'skipped (bot comment)'})}

    if not is_mentioned(body):
        return {'statusCode': 200, 'body': json.dumps({'status': 'skipped'})}

    sqs.send_message(QueueUrl=queue_url, MessageBody=body_str)
    return {'statusCode': 200, 'body': json.dumps({'status': 'ok'})}

Lambda 2: Agent Invoker

lambda/agent_invoker/lambda_function.py

SQS メッセージを受け取り、AgentCore Runtime を呼び出してエージェントに回答を生成させ、Backlog REST API でコメントを投稿します。

import json
import re
import urllib.request
import boto3
import os
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

AGENT_RUNTIME_ARN = os.environ['AGENT_RUNTIME_ARN']
BACKLOG_API_KEY = os.environ['BACKLOG_API_KEY']
BACKLOG_DOMAIN = os.environ.get('BACKLOG_DOMAIN', 'example.backlog.jp')


def lambda_handler(event, context):
    batch_item_failures = []

    for record in event['Records']:
        message_id = record['messageId']
        try:
            process_record(record)
        except Exception as e:
            logger.error(f"Failed to process message {message_id}: {e}", exc_info=True)
            batch_item_failures.append({'itemIdentifier': message_id})

    return {'batchItemFailures': batch_item_failures}


def invoke_agent(prompt: str, message_id: str) -> str:
    """AgentRuntime を呼び出して回答テキストを返す"""
    client = boto3.client('bedrock-agentcore', region_name='ap-northeast-1')

    payload = {
        "input": {"prompt": prompt},
        "api_data": {"backlog_api_key": BACKLOG_API_KEY},
    }

    response = client.invoke_agent_runtime(
        agentRuntimeArn=AGENT_RUNTIME_ARN,
        qualifier='DEFAULT',
        runtimeSessionId=f"sqs-{message_id}",
        payload=json.dumps(payload).encode('utf-8'),
    )

    body_text = response['response'].read().decode('utf-8')
    logger.info(f"Agent raw response: {body_text}")

    return json.loads(body_text)['result']


def strip_emoji(text: str) -> str:
    """Backlog API が受け付けない非BMP文字(絵文字等)を除去する"""
    return re.sub(r'[\U00010000-\U0010FFFF]', '', text)


def post_backlog_comment(issue_key: str, content: str) -> None:
    """Backlog 課題にコメントを投稿する"""
    url = f"https://{BACKLOG_DOMAIN}/api/v2/issues/{issue_key}/comments?apiKey={BACKLOG_API_KEY}"
    # urlencode は非ASCII文字(日本語・絵文字)をパーセントエンコードするが
    # Backlog API は生の UTF-8 バイト列を要求するため手動でエスケープする
    safe_content = content.replace('%', '%25').replace('&', '%26').replace('=', '%3D')
    data = f"content={safe_content}".encode('utf-8')
    req = urllib.request.Request(url, data=data, method='POST',
                                 headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'})
    try:
        with urllib.request.urlopen(req) as res:
            logger.info(f"Backlog comment posted: status={res.status}")
    except urllib.error.HTTPError as e:
        logger.error(f"Backlog API error: status={e.code}, body={e.read().decode('utf-8')}")
        raise


def process_record(record):
    message_id = record['messageId']
    body = json.loads(record['body'])

    project_key = body['project']['projectKey']
    content = body['content']
    key_id = content['key_id']
    summary = content.get('summary', '')
    comment_content = content.get('comment', {}).get('content', '')
    commenter_id = body.get('createdUser', {}).get('id')

    issue_key = f"{project_key}-{key_id}"

    prompt = (
        f"以下のBacklog課題にコメントが届きました。\n"
        f"課題キー: {issue_key}\n"
        f"課題タイトル: {summary}\n"
        f"コメント内容: {comment_content}\n\n"
        f"上記のコメントに対して適切に回答してください。"
    )

    logger.info(f"Invoking agent for issue {issue_key}")
    result_text = invoke_agent(prompt, message_id)

    safe_text = strip_emoji(result_text)
    comment = f"<@U{commenter_id}> \n{safe_text}" if commenter_id else safe_text

    logger.info(f"Posting comment to Backlog issue {issue_key}")
    post_backlog_comment(issue_key, comment)

    logger.info(f"Completed processing for issue {issue_key}")

4. デプロイ

cdk bootstrap  # 初回のみ
cdk deploy \
  --context backlog_api_key=YOUR_API_KEY \
  --context backlog_user_id=YOUR_NULAB_UNIQUE_ID \
  --context backlog_domain=example.backlog.jp

デプロイ後、Outputs に表示される ApiEndpoint の URL を Backlog の Webhook 設定に登録します。

工夫したところ

Backlog Webhookで送信した後、応答がないと繰り返しWebhookが送信される

Backlog Webhookには以下のような仕様が存在します。

  • Webhookを送信してから10秒で200HTTPステータスが返却されなかった場合、タイムアウトエラーとなる
  • 初回Webhook送信に失敗 → 5分後リトライ
  • 2回目Webhook送信に失敗 → 10分後リトライ
  • 3回目Webhook送信に失敗 → 20分後リトライ

そのため、10秒以内に200 HTTP ステータスを返却するLambdaとエージェントを呼び出すLambdaをわけ、それらの処理の譲渡のためにSQSを挟むことで、Webhookのタイムアウト&ループを回避しています。

Backlogへのコメント送信でBacklog MCPを使用しないようにする

エージェントが複数回MCPを使用してコメントが複数個送信されないように、コメント送信ロジックはLambda側に実装しました。

ループ防止(Bot自身のコメントに反応しない)

エージェントが Backlog にコメントを投稿すると、そのコメントにも Webhook が発火します。何も対策しないと Bot が自分自身のコメントに反応し続けて無限ループになります。

webhook_receiver Lambda でコメント投稿者の ID を確認し、Bot 自身(BACKLOG_USER_ID)が投稿したコメントは SQS に転送せずスキップすることでループを防止しています。

def is_bot_comment(body: dict) -> bool:
    if not BACKLOG_USER_ID:
        return False
    created_user_id = _unique_id(body.get('createdUser', {}))
    return created_user_id == BACKLOG_USER_ID

BACKLOG_USER_ID には Backlog の nulabAccount.uniqueId を設定します。CDK の --context backlog_user_id=... で渡します。

Backlog APIの文字コード問題

Backlog の REST API(POST /api/v2/issues/{issueKey}/comments)にコメントを投稿する際、Python の urllib.parse.urlencode を使うと日本語がパーセントエンコードされてしまい、Backlog 側で文字化けが発生します。Backlog API は application/x-www-form-urlencoded の body を生の UTF-8 バイト列で受け付けるためです。

そのため、コンテンツを手動でエスケープしてから .encode('utf-8') でバイト列に変換しています。エスケープ対象は %&= の3文字のみ(フォームの区切り文字として解釈されるもの)です。

safe_content = content.replace('%', '%25').replace('&', '%26').replace('=', '%3D')
data = f"content={safe_content}".encode('utf-8')

また、LLM が生成した回答には絵文字(非BMP文字、U+10000以上)が含まれることがあります。Backlog API はこれらを受け付けないため、投稿前に正規表現で除去しています。

def strip_emoji(text: str) -> str:
    return re.sub(r'[\U00010000-\U0010FFFF]', '', text)

まとめ

API Gateway → Lambda → SQS → Lambda の構成はWebhookからエージェントに飛ばす常套手段な気がするので、CDKでテンプレ化しておいても良いかもしれないです!

Slackにチャットボット実装した時も上記の構成でやりました!

UIの構築は必要ないので、スモールスタートという点ではやりやすいはず!

以上

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?