本記事の概要
AWSやGCPなどのインフラサービスを使わずに、Supabase だけでバックエンドを構築できれば、開発がより簡単になります。
しかし、少し高度なサービスを検討する際、SQSのようなメッセージキューイングが必要になるケースがあります。
実際、SupabaseのコミュニティのDiscussionでも、Amazon Simple Queue Serviceに相当する機能への要望が挙げられています。
https://github.com/orgs/supabase/discussions/8508
このDiscussionで提案されていた pgmq(Postgres Message Queue) というサービスが、
Supabase上で利用できる可能性があることがわかったため、実際に検証してみました。
2024/12/5に正式にextensionで利用できるようになりました!
https://supabase.com/blog/supabase-queues
ということで、そちらを試した記事も執筆したので良ければこちらの方もご参照ください
https://qiita.com/komishinnn/items/30aa947fbd9bc2163490
pgmqとは?
pgmqは、その名の通りPostgreSQLをベースにしたメッセージキューイングシステムです。
主な特徴は以下の通りです:
- PostgreSQLのテーブルを使ってメッセージを管理されるので、マネージドに寄せられる
- AWS SQSやRSMQと同等のAPI
- メッセージは明示的に消されない限り残り続ける
Supabaseを活用する環境において、他のインフラサービスへの依存を避けたいケースでは、pgmqが有力な選択肢となり得ます。
Supabaseにpgmqを組み込むには?
執筆時点はExtensionにまだpgmqは含まれておりません。
https://supabase.com/docs/guides/database/extensions
しかし、Supabaseのコミュニティが開発しているdbdevを利用することで、pgmqを導入できます。
以下の手順に従って、まずdbdevをインストールします
https://supabase.github.io/dbdev/install-in-db-client/
続いて、dbdevを使ってpgmqをinstallします。
https://database.dev/plpgsql/pgmq
install |
---|
true |
インストール後、schemaにpgmqが正常に登録されたことを確認できます
SELECT EXISTS (
SELECT 1
FROM information_schema.schemata
WHERE schema_name = 'pgmq'
);
-- true
使ってみる
公式のSQL Examplesをやってみます
https://github.com/tembo-io/pgmq?tab=readme-ov-file#sql-examples
-- creates the queue
SELECT pgmq.create('my_queue');
キューが作られたので、このキューにメッセージを送ってみます
-- messages are sent as JSON
SELECT * from pgmq.send(
queue_name => 'my_queue',
msg => '{"foo": "bar1"}'
);
delay optionを付けてもう一個送ります。
SELECT * from pgmq.send(
queue_name => 'my_queue',
msg => '{"foo": "bar2"}',
delay => 5
);
send |
---|
2 |
送信した2つのメッセージを、可視性タイムアウトを30秒に設定して読み取ります。
SELECT * FROM pgmq.read(
queue_name => 'my_queue',
vt => 30,
qty => 2
);
msg_id | read_ct | enqueued_at | vt | message |
---|---|---|---|---|
1 | 1 | 2024-11-12 11:17:02.01372+00 | 2024-11-12 11:19:14.209527+00 | {"foo":"bar1"} |
2 | 1 | 2024-11-12 11:17:38.942936+00 | 2024-11-12 11:19:14.209554+00 | {"foo":"bar2"} |
即座にメッセージを読み込み直してみると
SELECT * FROM pgmq.read(
queue_name => 'my_queue',
vt => 30,
qty => 1
);
Success. No rows returned
というように、メッセージは取得できません。
ただし、30秒経過後に再度試すと、read_ctが1増加したメッセージを取得できました
msg_id | read_ct | enqueued_at | vt | message |
---|---|---|---|---|
1 | 2 | 2024-11-12 11:17:02.01372+00 | 2024-11-12 11:21:18.159732+00 | {"foo":"bar1"} |
その他にも、Dead Letter Queueに類似したメッセージのArchive機能なども備えており、シンプルで使いやすい実装となっています
Edge Functionと組み合わせて使う
pgmqをEdge Functionと組み合わせることで、効率的な非同期処理を実現できます。
ここでは、メッセージキューを活用してEdge Functionを呼び出す実装例を紹介します。
処理の流れ
- クライアントサイドからのメッセージ送信をトリガーに、APIエンドポイントを介してpgmqにメッセージを格納
- サーバーサイドで定期的にキューを監視し、メッセージ存在時にEdge Functionを起動
- Edge Functionがメッセージ内容に応じた処理を実行し、完了後にメッセージを削除
今回はとりあえずsetIntervalで一定間隔置きにキュー確認を行うようにします
'use client'
import { useState, useEffect } from 'react';
export default function MessageSender() {
const [isPolling, setIsPolling] = useState(false);
useEffect(() => {
const POLLING_INTERVAL = 60000;
let intervalId: NodeJS.Timeout;
if (isPolling) {
// 初回実行
handleGetMessage();
// 定期実行の設定
intervalId = setInterval(() => {
handleGetMessage();
}, POLLING_INTERVAL);
}
// クリーンアップ関数
return () => {
if (intervalId) {
clearInterval(intervalId);
}
};
}, [isPolling]); // isPollingの変更を監視
const handleSendMessage = async () => {
try {
const response = await fetch('/api/send-message', {
method: 'POST'
});
if (!response.ok) {
throw new Error('Failed to send message');
}
const data = await response.json();
console.log('Message sent with ID:', data.messageId);
alert('Message sent successfully!');
} catch (error) {
console.error('Error:', error);
alert('Failed to send message');
}
};
const handleGetMessage = async () => {
try {
const response = await fetch('/api/send-message', { method: 'GET' });
if (!response.ok) {
throw new Error('Failed to get message');
}
console.log('Message retrieved at:', new Date().toLocaleString());
} catch (error) {
console.error('Error getting message:', error);
}
};
return (
<div className="space-y-4">
<button
onClick={handleSendMessage}
className="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600 disabled:bg-gray-400"
>
Send Message
</button>
<button
onClick={() => setIsPolling(!isPolling)}
className={`px-4 py-2 text-white rounded ${
isPolling ? 'bg-red-500 hover:bg-red-600' : 'bg-blue-500 hover:bg-blue-600'
}`}
>
{isPolling ? 'Stop Polling' : 'Start Polling'}
</button>
</div>
);
}
import { Pgmq } from 'pgmq-js'
import { NextResponse } from 'next/server'
interface QueueMessage {
content: string
timestamp: string
}
export async function GET() {
try {
const pgmq = await Pgmq.new({
host: process.env.SUPABASE_HOST!,
database: 'postgres',
password: process.env.DB_PASSWORD!,
port: parseInt(process.env.DB_PORT!) || 5432,
user: process.env.DB_USER!,
ssl: process.env.DB_SSL === 'true',
}, { skipExtensionCreation: true })
const queueName = 'send_message'
const vt = 30;
const msg = await pgmq.msg.read(queueName, vt)
if (msg) {
const response = await fetch('http://127.0.0.1:54321/functions/v1/process-message', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(msg.message)
})
if (!response.ok) {
throw new Error('Edge Functionの呼び出しに失敗しました')
}
await pgmq.msg.delete(queueName, msg.msgId)
return NextResponse.json({
success: true,
message: msg
})
}
console.log("No message found")
return NextResponse.json({
success: false,
message: 'No message found'
})
} catch (error) {
return NextResponse.json(
{ error: 'メッセージの送信に失敗しました' },
{ status: 500 }
)
}
}
export async function POST() {
try {
const pgmq = await Pgmq.new({
host: process.env.SUPABASE_HOST!,
database: 'postgres',
password: process.env.DB_PASSWORD!,
port: parseInt(process.env.DB_PORT!) || 5432,
user: process.env.DB_USER!,
ssl: process.env.DB_SSL === 'true',
}, { skipExtensionCreation: true })
const queueName = 'send_message'
const message: QueueMessage = {
content: `Hello from browser! ${new Date().toISOString()}`,
timestamp: new Date().toISOString()
}
const msgId = await pgmq.msg.send(queueName, message)
return NextResponse.json({
success: true,
messageId: msgId
})
} catch (error) {
console.error('Error:', error)
return NextResponse.json(
{ error: 'Failed to send message' },
{ status: 500 }
)
}
}
メールを送信するEdge Functionの実装は以下のドキュメントを参考にしました
https://supabase.com/docs/guides/functions/examples/send-emails#2-edit-the-handler-function
const handler = async (req: Request) => {
try {
const body = await req.json()
console.log('Processing message:', body)
const res = await fetch('https://api.resend.com/emails', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${Deno.env.get('RESEND_API_KEY')}`,
},
body: JSON.stringify({
from: 'onboarding@resend.dev',
to: '実際のメールアドレス',
subject: body.content,
html: '<strong>it works!</strong>',
}),
})
const data = await res.json()
if (res.ok) {
return new Response(JSON.stringify(data), {
status: 200,
headers: { 'Content-Type': 'application/json' },
})
} else {
return new Response(JSON.stringify({ error: 'Failed to process message' }), {
status: 500,
headers: { 'Content-Type': 'application/json' },
})
}
} catch (error) {
return new Response(JSON.stringify({ error: 'Failed to process message' }), {
status: 500,
headers: { 'Content-Type': 'application/json' },
})
}
}
Deno.serve(handler)
動作を以下の手順で確かめました
- 「Send Message」ボタンを複数回押下し、連続的なメッセージ送信
- 「Start Polling」による定期的なキュー監視の開始
検証結果
Message sent with ID: 1
page.tsx:55 Message retrieved at: 2024/11/13 21:25:21
page.tsx:41 Message sent with ID: 2
page.tsx:41 Message sent with ID: 3
page.tsx:55 Message retrieved at: 2024/11/13 21:26:21
page.tsx:55 Message retrieved at: 2024/11/13 21:27:21
上記ログから、メッセージが正常にキューイングされ、1分間隔で順次処理されていることが確認できました。
また、各メッセージに対応するメール送信も正常に完了し、意図した非同期処理が実現できています。