どうもこんにちは。
今回はBacklogでユーザがBot宛にメンション付きコメントを送信した時のWebhook駆動でエージェントを動かし、Backlogにコメントを返す仕組みを実装しました。
構成
Backlog
|
| Webhook (POST)
v
Amazon API Gateway
|
v
Lambda: webhook_receiver
| ・メンション宛先チェック → Botユーザ宛ではない場合にはスキップ
v
Amazon SQS
|
v
Lambda: agent_invoker
| ・AgentCore Runtime を呼び出しエージェントに回答を生成させる
| ・生成した回答を Backlog REST API でコメント投稿
v
Bedrock AgentCore Runtime
| ・Strands Agent + backlog-mcp-server
| ・Backlog の課題情報を参照して回答テキストを生成
v
Backlog API(コメント投稿)
技術スタック
- CDK: Python
- Lambda: Python
- Amazon SQS
- Amazon API Gateway
- Amazon Bedrock AgentCore Runtime
- Strands Agents(エージェントフレームワーク)
- backlog-mcp-server(Backlog MCP サーバ、npx で起動)
実装手順
0. 前提条件
- Bedrock AgentCore RuntimeへのエージェントのデプロイはCDKではやっていません
- ただし、Lambdaから呼び出しているエージェントは以下の記事で実装したエージェントです
1. CDKプロジェクトを作成
mkdir backlog-agent-cdk
cd backlog-agent-cdk
cdk init app --language python
source .venv/bin/activate
pip install aws-cdk-lib constructs
2. スタックを実装
backlog_webhook_stack/stack.py にスタックを定義します。
import aws_cdk as cdk
from aws_cdk import (
Stack,
aws_sqs as sqs,
aws_lambda as lambda_,
aws_apigateway as apigateway,
aws_lambda_event_sources as event_sources,
aws_iam as iam,
Duration,
)
from constructs import Construct
AGENT_RUNTIME_ARN = "arn:aws:bedrock-agentcore:ap-northeast-1:123456789012:runtime/your-agent-XXXXXXXXXX"
class BacklogWebhookStack(Stack):
def __init__(self, scope: Construct, construct_id: str, backlog_api_key: str, backlog_user_id: str = "", backlog_domain: str = "example.backlog.jp", **kwargs):
super().__init__(scope, construct_id, **kwargs)
# --- SQS DLQ ---
dlq = sqs.Queue(
self, "BacklogWebhookDLQ",
queue_name="BacklogWebhookDLQ",
retention_period=Duration.days(14),
)
# --- SQS メインキュー ---
queue = sqs.Queue(
self, "BacklogWebhookQueue",
queue_name="BacklogWebhookQueue",
visibility_timeout=Duration.seconds(360),
dead_letter_queue=sqs.DeadLetterQueue(
max_receive_count=3,
queue=dlq,
),
)
# --- Lambda 1: Webhook Receiver ---
webhook_fn = lambda_.Function(
self, "WebhookReceiver",
function_name="BacklogWebhookReceiver",
runtime=lambda_.Runtime.PYTHON_3_12,
handler="lambda_function.lambda_handler",
code=lambda_.Code.from_asset("lambda/webhook_receiver"),
timeout=Duration.seconds(30),
environment={
"SQS_QUEUE_URL": queue.queue_url,
"BACKLOG_USER_ID": backlog_user_id,
},
)
queue.grant_send_messages(webhook_fn)
# --- API Gateway ---
api = apigateway.RestApi(
self, "BacklogWebhookApi",
rest_api_name="BacklogWebhookApi",
deploy_options=apigateway.StageOptions(stage_name="prod"),
)
api.root.add_method(
"POST",
apigateway.LambdaIntegration(webhook_fn, proxy=True),
)
# --- Lambda 2: Agent Invoker ---
agent_invoker_fn = lambda_.Function(
self, "AgentInvoker",
function_name="BacklogAgentInvoker",
runtime=lambda_.Runtime.PYTHON_3_12,
handler="lambda_function.lambda_handler",
code=lambda_.Code.from_asset("lambda/agent_invoker"),
timeout=Duration.seconds(300),
environment={
"AGENT_RUNTIME_ARN": AGENT_RUNTIME_ARN,
"BACKLOG_API_KEY": backlog_api_key,
"BACKLOG_DOMAIN": backlog_domain,
},
)
# IAM: bedrock-agentcore:InvokeAgentRuntime
agent_invoker_fn.add_to_role_policy(
iam.PolicyStatement(
actions=["bedrock-agentcore:InvokeAgentRuntime"],
resources=[AGENT_RUNTIME_ARN, f"{AGENT_RUNTIME_ARN}/*"],
)
)
# SQS イベントソース
agent_invoker_fn.add_event_source(
event_sources.SqsEventSource(
queue,
batch_size=1,
report_batch_item_failures=True,
)
)
# --- Outputs ---
cdk.CfnOutput(
self, "ApiEndpoint",
value=api.url,
description="Backlog Webhook の登録先 URL (POST /prod/)",
)
cdk.CfnOutput(
self, "DLQUrl",
value=dlq.queue_url,
description="Dead Letter Queue URL",
)
エントリポイントの app.py からコンテキスト経由でパラメータを渡します。
import aws_cdk as cdk
from backlog_webhook_stack.stack import BacklogWebhookStack
app = cdk.App()
backlog_api_key = app.node.try_get_context("backlog_api_key") or ""
backlog_user_id = app.node.try_get_context("backlog_user_id") or ""
backlog_domain = app.node.try_get_context("backlog_domain") or "example.backlog.jp"
BacklogWebhookStack(app, "BacklogWebhookStack",
backlog_api_key=backlog_api_key,
backlog_user_id=backlog_user_id,
backlog_domain=backlog_domain,
)
app.synth()
3. Lambda関数のコードを実装
Lambda 1: Webhook Receiver
lambda/webhook_receiver/lambda_function.py
Backlog から届いた Webhook を受け取り、Bot 自身のコメントでなく、かつ Bot へのメンションが含まれる場合のみ SQS に転送します。
import json
import boto3
import os
sqs = boto3.client('sqs')
BACKLOG_USER_ID = os.environ.get('BACKLOG_USER_ID', '')
def _unique_id(user: dict) -> str:
"""Backlog ユーザーの一意な識別子を返す(nulabAccount.uniqueId 優先)"""
return user.get('nulabAccount', {}).get('uniqueId') or user.get('userId') or ''
def is_bot_comment(body: dict) -> bool:
"""コメントの投稿者がボット自身かどうかを確認する(ループ防止)"""
if not BACKLOG_USER_ID:
return False
created_user_id = _unique_id(body.get('createdUser', {}))
return created_user_id == BACKLOG_USER_ID
def is_mentioned(body: dict) -> bool:
"""APIキーのユーザ (BACKLOG_USER_ID) がコメントのメンション対象かどうかを確認する"""
if not BACKLOG_USER_ID:
return True # 未設定の場合はすべて通す
# notifications はトップレベルに存在する
notifications = body.get('notifications', [])
result = any(
_unique_id(n.get('user', {})) == BACKLOG_USER_ID and n.get('reason') == 2
for n in notifications
)
return result
def lambda_handler(event, context):
queue_url = os.environ['SQS_QUEUE_URL']
body_str = event.get('body', '{}')
try:
body = json.loads(body_str)
except json.JSONDecodeError:
return {'statusCode': 400, 'body': json.dumps({'status': 'invalid json'})}
if is_bot_comment(body):
return {'statusCode': 200, 'body': json.dumps({'status': 'skipped (bot comment)'})}
if not is_mentioned(body):
return {'statusCode': 200, 'body': json.dumps({'status': 'skipped'})}
sqs.send_message(QueueUrl=queue_url, MessageBody=body_str)
return {'statusCode': 200, 'body': json.dumps({'status': 'ok'})}
Lambda 2: Agent Invoker
lambda/agent_invoker/lambda_function.py
SQS メッセージを受け取り、AgentCore Runtime を呼び出してエージェントに回答を生成させ、Backlog REST API でコメントを投稿します。
import json
import re
import urllib.request
import boto3
import os
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
AGENT_RUNTIME_ARN = os.environ['AGENT_RUNTIME_ARN']
BACKLOG_API_KEY = os.environ['BACKLOG_API_KEY']
BACKLOG_DOMAIN = os.environ.get('BACKLOG_DOMAIN', 'example.backlog.jp')
def lambda_handler(event, context):
batch_item_failures = []
for record in event['Records']:
message_id = record['messageId']
try:
process_record(record)
except Exception as e:
logger.error(f"Failed to process message {message_id}: {e}", exc_info=True)
batch_item_failures.append({'itemIdentifier': message_id})
return {'batchItemFailures': batch_item_failures}
def invoke_agent(prompt: str, message_id: str) -> str:
"""AgentRuntime を呼び出して回答テキストを返す"""
client = boto3.client('bedrock-agentcore', region_name='ap-northeast-1')
payload = {
"input": {"prompt": prompt},
"api_data": {"backlog_api_key": BACKLOG_API_KEY},
}
response = client.invoke_agent_runtime(
agentRuntimeArn=AGENT_RUNTIME_ARN,
qualifier='DEFAULT',
runtimeSessionId=f"sqs-{message_id}",
payload=json.dumps(payload).encode('utf-8'),
)
body_text = response['response'].read().decode('utf-8')
logger.info(f"Agent raw response: {body_text}")
return json.loads(body_text)['result']
def strip_emoji(text: str) -> str:
"""Backlog API が受け付けない非BMP文字(絵文字等)を除去する"""
return re.sub(r'[\U00010000-\U0010FFFF]', '', text)
def post_backlog_comment(issue_key: str, content: str) -> None:
"""Backlog 課題にコメントを投稿する"""
url = f"https://{BACKLOG_DOMAIN}/api/v2/issues/{issue_key}/comments?apiKey={BACKLOG_API_KEY}"
# urlencode は非ASCII文字(日本語・絵文字)をパーセントエンコードするが
# Backlog API は生の UTF-8 バイト列を要求するため手動でエスケープする
safe_content = content.replace('%', '%25').replace('&', '%26').replace('=', '%3D')
data = f"content={safe_content}".encode('utf-8')
req = urllib.request.Request(url, data=data, method='POST',
headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'})
try:
with urllib.request.urlopen(req) as res:
logger.info(f"Backlog comment posted: status={res.status}")
except urllib.error.HTTPError as e:
logger.error(f"Backlog API error: status={e.code}, body={e.read().decode('utf-8')}")
raise
def process_record(record):
message_id = record['messageId']
body = json.loads(record['body'])
project_key = body['project']['projectKey']
content = body['content']
key_id = content['key_id']
summary = content.get('summary', '')
comment_content = content.get('comment', {}).get('content', '')
commenter_id = body.get('createdUser', {}).get('id')
issue_key = f"{project_key}-{key_id}"
prompt = (
f"以下のBacklog課題にコメントが届きました。\n"
f"課題キー: {issue_key}\n"
f"課題タイトル: {summary}\n"
f"コメント内容: {comment_content}\n\n"
f"上記のコメントに対して適切に回答してください。"
)
logger.info(f"Invoking agent for issue {issue_key}")
result_text = invoke_agent(prompt, message_id)
safe_text = strip_emoji(result_text)
comment = f"<@U{commenter_id}> \n{safe_text}" if commenter_id else safe_text
logger.info(f"Posting comment to Backlog issue {issue_key}")
post_backlog_comment(issue_key, comment)
logger.info(f"Completed processing for issue {issue_key}")
4. デプロイ
cdk bootstrap # 初回のみ
cdk deploy \
--context backlog_api_key=YOUR_API_KEY \
--context backlog_user_id=YOUR_NULAB_UNIQUE_ID \
--context backlog_domain=example.backlog.jp
デプロイ後、Outputs に表示される ApiEndpoint の URL を Backlog の Webhook 設定に登録します。
工夫したところ
Backlog Webhookで送信した後、応答がないと繰り返しWebhookが送信される
Backlog Webhookには以下のような仕様が存在します。
- Webhookを送信してから10秒で200HTTPステータスが返却されなかった場合、タイムアウトエラーとなる
- 初回Webhook送信に失敗 → 5分後リトライ
- 2回目Webhook送信に失敗 → 10分後リトライ
- 3回目Webhook送信に失敗 → 20分後リトライ
そのため、10秒以内に200 HTTP ステータスを返却するLambdaとエージェントを呼び出すLambdaをわけ、それらの処理の譲渡のためにSQSを挟むことで、Webhookのタイムアウト&ループを回避しています。
Backlogへのコメント送信でBacklog MCPを使用しないようにする
エージェントが複数回MCPを使用してコメントが複数個送信されないように、コメント送信ロジックはLambda側に実装しました。
ループ防止(Bot自身のコメントに反応しない)
エージェントが Backlog にコメントを投稿すると、そのコメントにも Webhook が発火します。何も対策しないと Bot が自分自身のコメントに反応し続けて無限ループになります。
webhook_receiver Lambda でコメント投稿者の ID を確認し、Bot 自身(BACKLOG_USER_ID)が投稿したコメントは SQS に転送せずスキップすることでループを防止しています。
def is_bot_comment(body: dict) -> bool:
if not BACKLOG_USER_ID:
return False
created_user_id = _unique_id(body.get('createdUser', {}))
return created_user_id == BACKLOG_USER_ID
BACKLOG_USER_ID には Backlog の nulabAccount.uniqueId を設定します。CDK の --context backlog_user_id=... で渡します。
Backlog APIの文字コード問題
Backlog の REST API(POST /api/v2/issues/{issueKey}/comments)にコメントを投稿する際、Python の urllib.parse.urlencode を使うと日本語がパーセントエンコードされてしまい、Backlog 側で文字化けが発生します。Backlog API は application/x-www-form-urlencoded の body を生の UTF-8 バイト列で受け付けるためです。
そのため、コンテンツを手動でエスケープしてから .encode('utf-8') でバイト列に変換しています。エスケープ対象は %・&・= の3文字のみ(フォームの区切り文字として解釈されるもの)です。
safe_content = content.replace('%', '%25').replace('&', '%26').replace('=', '%3D')
data = f"content={safe_content}".encode('utf-8')
また、LLM が生成した回答には絵文字(非BMP文字、U+10000以上)が含まれることがあります。Backlog API はこれらを受け付けないため、投稿前に正規表現で除去しています。
def strip_emoji(text: str) -> str:
return re.sub(r'[\U00010000-\U0010FFFF]', '', text)
まとめ
API Gateway → Lambda → SQS → Lambda の構成はWebhookからエージェントに飛ばす常套手段な気がするので、CDKでテンプレ化しておいても良いかもしれないです!
Slackにチャットボット実装した時も上記の構成でやりました!
UIの構築は必要ないので、スモールスタートという点ではやりやすいはず!
以上