はじめに
Lambda(Python)を使用してQiitaの特定のタグのついた記事を取得し、Notion DBへ保存するサーバーレスアプリケーションを構築したので、その内容についてまとめます。
本アプリケーションを構築しようとした背景として以下が理由となります
- Lambda(Python)を使用したことがなかったので個人で学習してみたかったため
- EKS, Kubernetes関連のQiita記事で参考になる記事を読みたいが、検索から拾うのが面倒だったため、タグから自動で取得したかったため
- EventBridge Scheduler -> Lambda -> SNSの連携を業務上使用したことがなかったため
使用した技術
- AWS Lambda
- Amazon EventBridge Scheduler
- Amazon SNS
- Python (Runtime: 3.12)
Pythonコードの全体は以下にあります。
前提
- Pythonがインストールされていること (Lambdaのサーバーレス環境を使用しますが、外部ライブラリ(
notion_client
ライブラリ)をLambdaレイヤーに追加する必要があるためOSにPythonをインストールしpip intall
が必要なためです) - 使用端末がWindowsOSの場合、Git BashをインストールもしくはWSL2環境でLinuxコマンドが使用できること
- Notion Secretsキーが発行済みで、対象のNotionページに連携していること
- Notion DBに必要カラムが準備されていること(今回の場合はTitle, Tags, URL, Date, 作成者のカラム)
- Qiita アクセストークンが発行されていること
Notion Secretsキーの発行方法
Qiita アクセストークンの発行方法
Slack Webhook URLの発行方法
上記リンクからIncoming Webhook
の管理画面へ移動し、「チャンネルを選択」から通知したいチャンネルを選択し、Incoming Webhook
インテグレーションの追加をクリック。
インテグレーションの設定項目の中に Webhook URL
が表示されるため、このURLをコピーしておく。
標準ライブラリと外部ライブラリ
Pythonには標準ライブラリと外部ライブラリが存在します。
標準ライブラリはPythonインストールとともに自動的に提供されるライブラリです。
つまりサーバーレス環境で使用する場合、LambdaのRuntimeを指定してPythonの関数を作成した場合、Pythonをインストールしたのと同じ状況であることから、標準ライブラリはimport
さえしてしまえば使用できます。
逆に外部ライブラリは標準ライブラリには含まれていない特定の機能を使う外部で開発されたライブラリのことです。
標準ライブラリには含まれていないため、ローカルでインストールしたライブラリをZip化し、Lambdaのレイヤーとして含める必要があります。
構築
AWSリソースについては全てをIaC化しても良かったのですが、Lambdaなどはコンソールからでもすぐに作成できましたので一部IaC化、一部手動で作成しています。
notion_clientライブラリをLambdaレイヤーに追加
- notion_clientライブラリのインストール
※私の場合はGit Bashを使用して以下コマンドを実行しています。
mkdir python
cd python
# 作成したディレクトリにnotion-clientをインストール
pip install notion-client -t .
cd ../
# pythonディレクトリを再帰的にzipし、Lambdaレイヤーを作成
zip -r notion_layer.zip python
外部ライブラリをレイヤーに追加する場合、外部ライブラリを含むディレクトリ名はpython
である必要があります。
ライブラリを含めたpython
ディレクトリをzip化し、Lambdaレイヤーに追加します。
2.AWSコンソールからLambda -> レイヤー -> [レイヤーを作成]を押下。
3.名前、説明等は適宜入力し、.zipファイルをアップロードから作成したnotion_layer.zipをアップロードする。
互換性のあるアーキテクチャ、互換性のあるランタイムは適宜選択してください。
AWS CLIでレイヤーを作成する場合
aws lambda publish-layer-version \
--layer-name notion-layer-v3_12 \
--zip-file fileb://notion_layer.zip \ # .zipが存在するディレクトリで実行
--compatible-runtimes python3.12
main関数の作成
Lambdaの画面から[関数を作成] -> [一から作成] を選択し、関数名、ランタイム(Python3.12)、アーキテクチャを選択し、関数を作成
requestsライブラリをARN指定でレイヤーに追加
外部レイヤーを使用するのにローカルでインストールして、Zip化してレイヤーに追加って処理がちょっと面倒くさいな?と思い、もう少し良い方法はないかな、と思って探していたら以下記事でARNを指定すればレイヤー追加できるPythonのLambdaレイヤーを公開しているGitHubリポジトリが存在するとのことでこちらを使ってみました。
この記事にもある通り有名どころのライブラリはそろっているようなので、有名どころのrequests
ライブラリは存在していました。
詳細の使い方はGitHubのREADME.mdを見ていただくと良いかと思いますが、List of ARNs
から対象のPython Runtimeを選択すると、レイヤーのリージョンごとのリストが表示されるので、ここから対象リージョンのARNsのjson
, csv
, html
から確認します。
ブラウザで見る分にはhtml
から見た方が見やすいです。
上記で確認したARNをコピーしておき、レイヤーを追加します。
指定の方法は簡単で、対象のLambda関数の画面からレイヤーを追加で[ARN を指定] -> [ARN を指定]箇所でコピーしたARNを貼り付け[検証]を押下 -> ARNに誤りが無ければ、説明、互換性のあるランタイム、互換性のあるアーキテクチャが表示されるため、[追加]を押下。
main関数のPythonコードおよびその他設定
環境変数の定義
- NOTION_DB_ID: Notion DBのIDを指定
- NOTION_TOKEN: Notion Secretsキーの値を指定
- QIITA_TOKEN: Qiitaアクセストークンの値を指定
- SNS_TOPIC_ARN: arn:aws:sns:ap-northeast-1:${AWS::AccountId}:slack-notifications-lambda-notion # SNS_TOPIC_ARNについてはSNS構築後に反映します。
main関数のPythonコード
import os
import requests
from datetime import datetime, timedelta
from urllib.parse import urlparse
from notion_client import Client
import boto3
import json
# 環境変数から認証情報取得
QIITA_TOKEN = os.getenv('QIITA_TOKEN') # Qiitaのアクセストークン
NOTION_TOKEN = os.getenv('NOTION_TOKEN') # Notion Secretキー
NOTION_DB_ID = os.getenv('NOTION_DB_ID') # Notion DBキー
SNS_TOPIC_ARN = os.getenv('SNS_TOPIC_ARN') # SNSトピックARN
# AWSクライアント初期化(関数外で定義)
sns_client = boto3.client('sns')
def lambda_handler(event, context):
try:
# 1. Qiitaから記事取得
articles = fetch_qiita_articles()
# 2. Notionクライアント初期化
notion = Client(auth=NOTION_TOKEN)
# 3. 既存記事のURLを取得
existing_urls = get_existing_article_urls(notion)
# 4. 新規記事のみフィルタリング
new_articles = [
article for article in articles
if normalize_url(article["url"]) not in existing_urls
]
# 5. Notionに書き込み
results = []
for article in new_articles:
result = create_notion_page(notion, article)
results.append(result)
# 6. SNSで成功通知 (追加)
send_sns_notification(
status="SUCCESS",
message=f"{len(new_articles)}件の新規記事を登録しました",
details={
"processed": len(new_articles),
"skipped": len(articles) - len(new_articles)
}
)
return {
"statusCode": 200,
"body": {
"message": f"{len(new_articles)}件の新規記事を登録しました(スキップ: {len(articles) - len(new_articles)}件)",
"details": results
}
}
except Exception as e:
# 7. SNSで失敗通知 (追加)
send_sns_notification(
status="FAILED",
message="処理中にエラーが発生しました",
details={"error": str(e)}
)
raise e
def send_sns_notification(status, message, details):
"""SNS通知を送信"""
notification = {
"status": status,
"message": message,
"details": details,
"timestamp": datetime.now().isoformat()
}
print(f"SNSに送信: {notification}") # send_sns_notification()内に追加
sns_client.publish(
TopicArn=SNS_TOPIC_ARN,
Message=json.dumps(notification, ensure_ascii=False),
Subject=f"Qiita-Notion Sync {status}"
)
def get_existing_article_urls(notion):
"""NotionDBから既存記事のURL一覧を取得"""
existing_pages = notion.databases.query(
database_id=NOTION_DB_ID,
filter={"property": "URL", "url": {"is_not_empty": True}}
)
return {
normalize_url(page["properties"]["URL"]["url"])
for page in existing_pages["results"]
}
def normalize_url(url):
"""URLを正規化(クエリパラメータや末尾スラッシュを無視)"""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip("/")
def fetch_qiita_articles():
"""QiitaからKubernetes/EKSタグの記事を取得"""
url = "https://qiita.com/api/v2/items"
# 検索条件(直近7日間、タグ指定)
params = {
"query": "tag:Kubernetes OR tag:EKS created:>={}".format(
(datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
),
"per_page": 20 # 最大取得件数
}
headers = {}
if QIITA_TOKEN:
headers["Authorization"] = f"Bearer {QIITA_TOKEN}"
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
def create_notion_page(notion, article):
"""Notionにページ作成"""
# タグ情報を抽出
tags = [tag["name"] for tag in article.get("tags", [])]
# Notionプロパティ形式に変換
properties = {
"Title": {
"title": [
{
"text": {
"content": article["title"]
}
}
]
},
"URL": {
"url": article["url"]
},
"Date": {
"date": {
"start": article["created_at"] # Qiitaの記事作成日を設定
}
},
"Tags": {
"multi_select": [
{"name": tag} for tag in tags
]
},
"作成者": {
"rich_text": [
{
"text": {
"content": article["user"]["id"]
}
}
]
}
}
# Notionにページ作成
response = notion.pages.create(
parent={"database_id": NOTION_DB_ID},
properties=properties
)
print(f"Qiita記事作成日: {article['created_at']}")
print(f"Notionページ作成日: {response['created_time']}")
return {
"qiita_id": article["id"],
"notion_id": response["id"],
"title": article["title"]
}
main関数のリソースベースポリシー
${AWS::Region}
, ${AWS::AccountId}
については適宜自身の環境のリージョン、AWSアカウントIDに置き換えてください。
以下のリソースベースポリシーは、EventBridge Schedulerがmain関数を呼び出す許可を与えています。
{
"Version": "2012-10-17",
"Id": "default",
"Statement": [
{
"Sid": "eventbridge-scheduler",
"Effect": "Allow",
"Principal": {
"Service": "events.amazonaws.com"
},
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:notion-connect",
"Condition": {
"ArnLike": {
"AWS:SourceArn": "arn:aws:scheduler:${AWS::Region}:${AWS::AccountId}:schedule/*"
}
}
}
]
}
main関数にアタッチしているロールのポリシー
デフォルトで作成されたロールに対して"Action": "sns:Publish"を追加し、SNSへPublishする権限を与えています。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:ap-${AWS::Region}:${AWS::AccountId}:*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/notion-connect:*"
]
},
{
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:slack-notifications-lambda-notion"
}
]
}
EventBridge SchedulerのCloudFormationデプロイ
Lambdaを定期実行するEventBridge Schedulerについてはロールと合わせて作成する必要がありますので、こちらはIac化しました。
以下ロール -> EventBridge Schedulerの順でCloudFormationテンプレートをデプロイします。
EventBridge SchedulerのCron
設定は毎週月曜日12時に定期実行されるように設定しているため、実行時間を変えたい場合は適宜ここを変更する
EventBridge Schedulerのロール
#=======================================================================
# EventBridge SchedulerがLambda関数を実行するためのロール
#=======================================================================
AWSTemplateFormatVersion: '2010-09-09'
Resources:
NotionLambdaRole:
Type: 'AWS::IAM::Role'
Properties:
RoleName: 'lambda-eventbridge-role'
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Principal:
Service: 'scheduler.amazonaws.com'
Action: 'sts:AssumeRole'
Policies:
- PolicyName: 'lambda-eventbridge-policy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Action:
- 'lambda:InvokeFunction'
Resource: '*'
EventBridge SchedulerのCloudFormationテンプレート
#=======================================================================
# Qiita APIを叩いて、NotionDBに連携するLambdaを定期実行
#=======================================================================
AWSTemplateFormatVersion: '2010-09-09'
Resources:
LambdaEventbridgeScheduler:
Type: AWS::Scheduler::Schedule
Properties:
Name: "eventbridge-scheduler-lambda-notion"
Description: "Qiita APIを叩いて対象タグの記事を取得しNotionDBに登録するLambdaを定期実行"
ScheduleExpression: "cron(0 12 ? * 2 *)" # 毎週月曜日12時に定期実行
ScheduleExpressionTimezone: "Asia/Tokyo"
Target:
Arn: !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:notion-connect
RoleArn: !Sub arn:${AWS::Partition}:iam::${AWS::AccountId}:role/lambda-eventbridge-role
FlexibleTimeWindow:
Mode: "OFF"
Slack通知用関数のPythonコードおよびその他設定
環境変数の定義
- SLACK_WEBHOOK_URL: Slack WebhookのURLを指定
レイヤー追加
main関数と同様にrequestsライブラリを追加
Slack通知用関数のPythonコード
import os
import json
import requests
SLACK_WEBHOOK_URL = os.getenv('SLACK_WEBHOOK_URL')
def lambda_handler(event, context):
try:
print(f"受信イベント: {event}") # lambda_handlerの先頭に追加
# SNSメッセージを解析
sns_message = json.loads(event['Records'][0]['Sns']['Message'])
# Slack用メッセージを生成
slack_msg = build_slack_message(sns_message)
# Slackに送信
response = requests.post(
SLACK_WEBHOOK_URL,
json=slack_msg,
timeout=3
)
response.raise_for_status()
return {"statusCode": 200}
except Exception as e:
print(f"Slack通知失敗: {str(e)}")
# 必要に応じてCloudWatch LogsやDead Letter Queueへ
raise e
def build_slack_message(sns_msg):
"""SNSメッセージからSlack用ブロックを生成"""
if sns_msg["status"] == "SUCCESS":
return {
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"✅ *同期成功*: {sns_msg['message']}"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f"*処理日時*\n{sns_msg['timestamp']}"
},
{
"type": "mrkdwn",
"text": f"*詳細*\n{sns_msg['details']}"
}
]
}
]
}
else:
return {
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"❌ *同期失敗*: {sns_msg['message']}"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"```{sns_msg['details']['error']}```"
}
}
]
}
Slack通知用Lambdaのリソースベースポリシー
${AWS::Region}
, ${AWS::AccountId}
については適宜自身の環境のリージョン、AWSアカウントIDに置き換えてください。
以下のリソースベースポリシーは、SNSがSlack通知用関数を呼び出す許可を与えています。
{
"Version": "2012-10-17",
"Id": "default",
"Statement": [
{
"Sid": "sns-${AWS::Region}-${AWS::AccountId}-slack-notifications-lambda-notion",
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:slack-notification",
"Condition": {
"ArnLike": {
"AWS:SourceArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:slack-notifications-lambda-notion"
}
}
}
]
}
SNSトピック/サブスクリプションの作成
SNSではメッセージ送信元をPublisher
, メッセージ受信側をSubscriber
と呼びますが、
今回のSNSの役目はLambdaのmain関数がQiitaから情報を取得し、何件の記事を取得できたか?を後続のSlack通知用関数に届ける橋渡しをするために使用します。
役割は以下となります。
Publisher
: main関数
Subscriber
: Slack通知用関数
SNSトピックの役割が理解できたところで、コンソールからSNSトピックを作成します。
タイプはスタンダードとし、トピックを作成します。
作成したSNSトピックのサブスクリプションを作成します。
これでnotion-connect Lambda
がメッセージをSNSへPublish
し、SNSが受け取ったメッセージをslack-notification Lambda
にSubscribe
することで、slack-notification Lambda
内で記述しているSlack Webhook URL
に向けてメッセージを送信することができます。
実際にメッセージが通知されると、以下のようにSlackに通知されます。
Notionには以下のように登録されます。
最後に
Lambda初心者がサーバーレスアプリを作成してみましたが、Pythonのコードは生成AIを活用してコード生成してもらっています。
Pythonはまだまだ勉強中のため、理解を深めていきたいと思います。
Pythonスクレイピングに関する初心者向けの本は以下が非常に優しく解説しています。