6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Supabaseにpgmqを組み込んで使ってみた

Last updated at Posted at 2024-11-13

本記事の概要

AWSやGCPなどのインフラサービスを使わずに、Supabase だけでバックエンドを構築できれば、開発がより簡単になります。

しかし、少し高度なサービスを検討する際、SQSのようなメッセージキューイングが必要になるケースがあります。
実際、SupabaseのコミュニティのDiscussionでも、Amazon Simple Queue Serviceに相当する機能への要望が挙げられています。
https://github.com/orgs/supabase/discussions/8508

このDiscussionで提案されていた pgmq(Postgres Message Queue) というサービスが、
Supabase上で利用できる可能性があることがわかったため、実際に検証してみました。

image.png

2024/12/5に正式にextensionで利用できるようになりました!
https://supabase.com/blog/supabase-queues

ということで、そちらを試した記事も執筆したので良ければこちらの方もご参照ください
https://qiita.com/komishinnn/items/30aa947fbd9bc2163490

pgmqとは?

pgmqは、その名の通りPostgreSQLをベースにしたメッセージキューイングシステムです。
主な特徴は以下の通りです:

  • PostgreSQLのテーブルを使ってメッセージを管理されるので、マネージドに寄せられる
  • AWS SQSやRSMQと同等のAPI
  • メッセージは明示的に消されない限り残り続ける

Supabaseを活用する環境において、他のインフラサービスへの依存を避けたいケースでは、pgmqが有力な選択肢となり得ます。

Supabaseにpgmqを組み込むには?

執筆時点はExtensionにまだpgmqは含まれておりません。
https://supabase.com/docs/guides/database/extensions

しかし、Supabaseのコミュニティが開発しているdbdevを利用することで、pgmqを導入できます。
以下の手順に従って、まずdbdevをインストールします
https://supabase.github.io/dbdev/install-in-db-client/

続いて、dbdevを使ってpgmqをinstallします。
https://database.dev/plpgsql/pgmq

install
true

インストール後、schemaにpgmqが正常に登録されたことを確認できます

SELECT EXISTS (
    SELECT 1
    FROM information_schema.schemata
    WHERE schema_name = 'pgmq'
);

-- true

使ってみる

公式のSQL Examplesをやってみます
https://github.com/tembo-io/pgmq?tab=readme-ov-file#sql-examples

-- creates the queue
SELECT pgmq.create('my_queue');

キューが作られたので、このキューにメッセージを送ってみます

-- messages are sent as JSON
SELECT * from pgmq.send(
  queue_name  => 'my_queue',
  msg         => '{"foo": "bar1"}'
);

delay optionを付けてもう一個送ります。

SELECT * from pgmq.send(
  queue_name => 'my_queue',
  msg        => '{"foo": "bar2"}',
  delay      => 5
);
send
2

送信した2つのメッセージを、可視性タイムアウトを30秒に設定して読み取ります。

SELECT * FROM pgmq.read(
  queue_name => 'my_queue',
  vt         => 30,
  qty        => 2
);
msg_id read_ct enqueued_at vt message
1 1 2024-11-12 11:17:02.01372+00 2024-11-12 11:19:14.209527+00 {"foo":"bar1"}
2 1 2024-11-12 11:17:38.942936+00 2024-11-12 11:19:14.209554+00 {"foo":"bar2"}

即座にメッセージを読み込み直してみると

SELECT * FROM pgmq.read(
  queue_name => 'my_queue',
  vt         => 30,
  qty        => 1
);

Success. No rows returned というように、メッセージは取得できません。
ただし、30秒経過後に再度試すと、read_ctが1増加したメッセージを取得できました

msg_id read_ct enqueued_at vt message
1 2 2024-11-12 11:17:02.01372+00 2024-11-12 11:21:18.159732+00 {"foo":"bar1"}

その他にも、Dead Letter Queueに類似したメッセージのArchive機能なども備えており、シンプルで使いやすい実装となっています:thumbsup:

Edge Functionと組み合わせて使う

pgmqをEdge Functionと組み合わせることで、効率的な非同期処理を実現できます。
ここでは、メッセージキューを活用してEdge Functionを呼び出す実装例を紹介します。

処理の流れ

  1. クライアントサイドからのメッセージ送信をトリガーに、APIエンドポイントを介してpgmqにメッセージを格納
  2. サーバーサイドで定期的にキューを監視し、メッセージ存在時にEdge Functionを起動
  3. Edge Functionがメッセージ内容に応じた処理を実行し、完了後にメッセージを削除

今回はとりあえずsetIntervalで一定間隔置きにキュー確認を行うようにします

Page.tsx
'use client'

import { useState, useEffect } from 'react';

export default function MessageSender() {
  const [isPolling, setIsPolling] = useState(false);

  useEffect(() => {
    const POLLING_INTERVAL = 60000;
    let intervalId: NodeJS.Timeout;

    if (isPolling) {
      // 初回実行
      handleGetMessage();

      // 定期実行の設定
      intervalId = setInterval(() => {
        handleGetMessage();
      }, POLLING_INTERVAL);
    }

    // クリーンアップ関数
    return () => {
      if (intervalId) {
        clearInterval(intervalId);
      }
    };
  }, [isPolling]); // isPollingの変更を監視

  const handleSendMessage = async () => {
    try {
      const response = await fetch('/api/send-message', {
        method: 'POST'
      });

      if (!response.ok) {
        throw new Error('Failed to send message');
      }

      const data = await response.json();
      console.log('Message sent with ID:', data.messageId);
      alert('Message sent successfully!');
    } catch (error) {
      console.error('Error:', error);
      alert('Failed to send message');
    }
  };

  const handleGetMessage = async () => {
    try {
      const response = await fetch('/api/send-message', { method: 'GET' });
      if (!response.ok) {
        throw new Error('Failed to get message');
      }
      console.log('Message retrieved at:', new Date().toLocaleString());
    } catch (error) {
      console.error('Error getting message:', error);
    }
  };

  return (
    <div className="space-y-4">
      <button
        onClick={handleSendMessage}
        className="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600 disabled:bg-gray-400"
      >
        Send Message
      </button>
      <button
        onClick={() => setIsPolling(!isPolling)}
        className={`px-4 py-2 text-white rounded ${
          isPolling ? 'bg-red-500 hover:bg-red-600' : 'bg-blue-500 hover:bg-blue-600'
        }`}
      >
        {isPolling ? 'Stop Polling' : 'Start Polling'}
      </button>
    </div>
  );
}

app/api/send-message/route.ts
import { Pgmq } from 'pgmq-js'
import { NextResponse } from 'next/server'

interface QueueMessage {
  content: string
  timestamp: string
}

export async function GET() {
  try {
    const pgmq = await Pgmq.new({
      host: process.env.SUPABASE_HOST!,
      database: 'postgres',
      password: process.env.DB_PASSWORD!,
      port: parseInt(process.env.DB_PORT!) || 5432,
      user: process.env.DB_USER!,
      ssl: process.env.DB_SSL === 'true',
    }, { skipExtensionCreation: true })

    const queueName = 'send_message'
    const vt = 30;
    const msg = await pgmq.msg.read(queueName, vt)

    if (msg) {
      const response = await fetch('http://127.0.0.1:54321/functions/v1/process-message', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json'
        },
        body: JSON.stringify(msg.message)
      })

      if (!response.ok) {
        throw new Error('Edge Functionの呼び出しに失敗しました')
      }

      await pgmq.msg.delete(queueName, msg.msgId)
      return NextResponse.json({
        success: true,
        message: msg
      })
    }

    console.log("No message found")
    return NextResponse.json({
      success: false,
      message: 'No message found'
    })
  } catch (error) {
    return NextResponse.json(
      { error: 'メッセージの送信に失敗しました' },
      { status: 500 }
    )
  }
}

export async function POST() {
  try {
    const pgmq = await Pgmq.new({
      host: process.env.SUPABASE_HOST!,
      database: 'postgres',
      password: process.env.DB_PASSWORD!,
      port: parseInt(process.env.DB_PORT!) || 5432,
      user: process.env.DB_USER!,
      ssl: process.env.DB_SSL === 'true',
    }, { skipExtensionCreation: true })

    const queueName = 'send_message'
    const message: QueueMessage = {
      content: `Hello from browser! ${new Date().toISOString()}`,
      timestamp: new Date().toISOString()
    }

    const msgId = await pgmq.msg.send(queueName, message)

    return NextResponse.json({
      success: true,
      messageId: msgId
    })
  } catch (error) {
    console.error('Error:', error)
    return NextResponse.json(
      { error: 'Failed to send message' },
      { status: 500 }
    )
  }
}

メールを送信するEdge Functionの実装は以下のドキュメントを参考にしました
https://supabase.com/docs/guides/functions/examples/send-emails#2-edit-the-handler-function

supabase/functions/process-message/index.ts
const handler = async (req: Request) => {
  try {
    const body = await req.json()
    console.log('Processing message:', body)

    const res = await fetch('https://api.resend.com/emails', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${Deno.env.get('RESEND_API_KEY')}`,
      },
      body: JSON.stringify({
        from: 'onboarding@resend.dev',
        to: '実際のメールアドレス',
        subject: body.content,
        html: '<strong>it works!</strong>',
      }),
    })

    const data = await res.json()

    if (res.ok) {
      return new Response(JSON.stringify(data), {
        status: 200,
        headers: { 'Content-Type': 'application/json' },
      })
    } else {
      return new Response(JSON.stringify({ error: 'Failed to process message' }), {
        status: 500,
        headers: { 'Content-Type': 'application/json' },
      })
    }
  } catch (error) {
    return new Response(JSON.stringify({ error: 'Failed to process message' }), {
      status: 500,
      headers: { 'Content-Type': 'application/json' },
    })
  }
}

Deno.serve(handler)

動作を以下の手順で確かめました

  1. 「Send Message」ボタンを複数回押下し、連続的なメッセージ送信
  2. 「Start Polling」による定期的なキュー監視の開始

image.png

検証結果

Message sent with ID: 1
page.tsx:55 Message retrieved at: 2024/11/13 21:25:21
page.tsx:41 Message sent with ID: 2
page.tsx:41 Message sent with ID: 3
page.tsx:55 Message retrieved at: 2024/11/13 21:26:21
page.tsx:55 Message retrieved at: 2024/11/13 21:27:21

上記ログから、メッセージが正常にキューイングされ、1分間隔で順次処理されていることが確認できました。
また、各メッセージに対応するメール送信も正常に完了し、意図した非同期処理が実現できています。

image.png

6
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?