Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
10
Help us understand the problem. What is going on with this article?
@ksky

PostgreSQLのPub/Sub機能とJavaのクライアント実装

More than 1 year has passed since last update.

フューチャー Advent Calendar 2019 18日目の記事です。


Pub/Sub型のメッセージングアーキテクチャを採用するにあたっては、kafkaなどのブローカーミドルウェアや、Amazon SNS、Google Cloud Pub/Subなどのマネージドサービスを利用するケースが多いかと思いますが、実はPostgreSQLでもPub/Subができます。

すでに業務でPostgreSQLを使っていれば、新たにPub/Subブローカーを構築しなくても、疎結合なシステム間通信を簡易的に実現できます。

本記事ではこの機能の紹介と、Pub/SubクライアントをJavaで実装する場合の選択肢、考慮点を示しています。
※実行環境はPostgreSQL 11.2とJava 11です
※データベースの文字コードはUTF-8としています

NOTIFY/LISTEN

PostgreSQLのPub/Sub機能に関連するクエリは次の3つです。

  • NOTIFY(Publishを実行)
    • 構文:NOTIFY channel [ , payload ]
    • 同じ機能の関数としてpg_notifyも用意されている
  • LISTEN(Subscribeを開始)
    • 構文:LISTEN channel
  • UNLISTEN(Subscribeを終了)
    • 構文:UNLISTEN { channel | * }

基本的な使い方と挙動を見ていきます。

チャネル"foo"のSubscribeを開始
LISTEN foo;
チャネル"foo"に"hello"というデータをPublish
NOTIFY foo, 'hello';
-- または
SELECT pg_notify('foo', 'hello');

-- "foo"をSubscribe済みのセッションには次の通知が届く
-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728.
payload無しの通知も可能
NOTIFY foo;

-- Asynchronous notification "foo" received from server process with PID 14728.
チャネル"foo"のSuscribeを終了する
UNLISTEN foo;

とてもシンプルですね。
続いて、本機能の主な仕様を挙げつつ利用時の考慮点を示します。
詳細は公式ドキュメントをご覧いただければと思います。

チャネル

  • チャネルはPub/Sub通信する際のキーとなる任意の文字列です。LISTEN対象のチャネルとNOTIFYを実行するチャネルが異なるとデータのやり取りができません。
  • 一つのセッションで複数のチャネルをLISTENすることができます。
  • チャネルに指定できる文字は、ASCIIの場合英数字とアンダーバー(_)が使用できます。大文字/小文字は区別されません。なお、マルチバイト文字も使用できることを確認しています。

    マルチバイト文字のチャネルに通知
    NOTIFY こんにちは, '世界';
    
    -- Asynchronous notification "こんにちは" with payload "世界" received from server process with PID 14728.
    
  • 63バイトを超えるチャネルは登録することができません。超えた分は切り捨てて処理されます。この制限はテーブルなど他のデータベースオブジェクトとも共通しています。

スコープ

  • Pub/Subを行うDBセッションは、同一データベースに接続し、かつ同じチャネルを通知対象としなければいけません。
  • データベースが同じであれば、スキーマが異なっていても通知できます。

pgsql_notify_listen_scope.png

ペイロードのデータ型・サイズ

  • ペイロードに乗せられるデータはテキストのみで、バイナリは送受信できません。
  • バイナリデータを乗せる場合はencode関数でテキスト形式に変換したり、呼出元アプリでJSON文字列等にシリアライズしてあげる必要があります。
  • ペイロードのサイズ上限は8000バイト未満で、これを超えると次のエラーが返ります。

    ERROR:  payload string too long
    SQL state: 22023
    

トランザクション

  • トランザクション内でNOTIFYしたデータは、COMMITしたタイミングで、LISTENしたセッションに通知されます。ROLLBACKすると通知されません。
  • トランザクション内でNOTIFYしたデータの中で、ペイロードが同一のものはまとめられます。送信順序は保証されます。
まとめられた通知
BEGIN;
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'b';
NOTIFY foo, 'c';
COMMIT;

-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728.

未処理メッセージの蓄積サイズ

  • DBインスタンスには、トランザクションが未完了なメッセージをメモリ上に溜めておくことが出来るNotificationキューを持っています。標準インストールの場合サイズは8GBで、使用量が半分になると警告ログが出力されます。
  • トランザクションが終了するとキューデータがクリーンアップされます。ペイロードを目一杯使った場合およそ100万件で上限に掛かるため、適当な件数単位でCOMMITしましょう。
  • Notificationキューの使用率はpg_notification_queue_usage関数で確認できます(0から1までの小数で表現)。

JavaによるPub/Subクライアント実装

これまで記載したPub/Sub通信をJavaで実装するときのパターンを3種類紹介します。

PostgreSQL JDBCドライバ

PostgreSQL本家のJDBCドライバを使った実装例です(本家の実装例はこちら)。
Mavenを使う場合は以下の依存関係を追加します。

pom.xml
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.8</version>
</dependency>
// 事前にLISTEN用コネクションを作成しておく
private final org.postgresql.jdbc.PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);

/**
 * 通知の受信を開始します。
 *
 * @param channel チャネル
 */
private void startListen(final String channel) {
    try {
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
            while (true) {
                var notifications = pgconn.getNotifications(10 * 1000);
                if (this.terminated) {
                    return;
                }
                if (notifications == null) {
                    continue;
                }
                for (var n : notifications) {
                    LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter());
                }
            }
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

/**
 * PostgreSQLサーバに通知を行います。
 *
 * @param channel チャネル
 * @param payload メッセージペイロード
 */
private void notify(final String channel, final String payload) {
    try {
        var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
        var pstmt = conn.prepareStatement("select pg_notify(?, ?)");
        try (conn; pstmt) {
            pstmt.setString(1, channel);
            pstmt.setString(2, payload);
            pstmt.execute();
        }
        LOG.info("Notified: pid={}, channel={}, payload={}", pgconn.getBackendPID(), channel, payload);
    } catch (SQLException e) {
        LOG.error("exception thrown", e);
    }
}
  • PgConnection#getNotifications(int timeoutMillis)を使うと、通知が来るまで指定の時間ここでブロックするため、ループで囲えばロングポーリング的なロジックになります。
  • なおNOTIFYクエリでペイロードのパラメータバインドを試みるとorg.postgresql.util.PSQLExceptionが出てしまい、対処法がどうにも見つからなかったのでpg_notifyを実行するようにしています。

PGJDBC-NG

  • PGJDBC-NGはJDBC4.2に準拠し、PostgreSQLの機能をより高度に使うことをめざして開発されているOSSです。
pom.xml
<dependency>
    <groupId>com.impossibl.pgjdbc-ng</groupId>
    <artifactId>pgjdbc-ng</artifactId>
    <version>0.8.3</version>
</dependency>
// 事前にLISTEN用コネクションを作成しておく
private final com.impossibl.postgres.api.jdbc.PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class);

/**
 * 通知の受信を開始します。
 *
 * @param channel チャネル
 */
private void startListen(final String channel) {
    try {
        this.listenerConn.addNotificationListener(new PGNotificationListener() {
            @Override
            public void notification(final int pid, final String channel, final String payload) {
                LOG.info("Received Notification: {}, {}, {}", pid, channel, payload);
            }
        });
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

// notify()は、PostgreSQL JDBCドライバと同様

ご覧の通り、こちらは通知受信時の動作をイベントリスナーの形で実装できます。
チャネルを指定してリスナーを登録することも可能です。

R2DBC

R2DBCは、リアクティブプログラミングの観点から新たに開発されたJDBCドライバです。
Reactive Streamsに完全準拠し、I/Oは完全にノンブロッキングであると謳っています。

pom.xml
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
// 事前に送受信用のコネクションを設定しておく
private Mono<PostgresqlConnection> receiver;
private Mono<PostgresqlConnection> sender;

var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
        .host("...")
        .port(5432)
        .username("...")
        .password("...")
        .database("...")
        .build());
this.receiver = connFactory.create();
this.sender = connFactory.create();

/**
 * 通知の受信を開始します。
 *
 * @param channel チャネル
 */
private void startListen(final String channel) {
    this.receiver.map(pgconn -> {
        return pgconn.createStatement("LISTEN " + channel)
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .thenMany(pgconn.getNotifications())
                .doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter()))
                .doOnSubscribe(s -> LOG.info("listen start"))
                .subscribe();
    }).subscribe();
}

/**
 * PostgreSQLサーバに通知を行います。
 *
 * @param channel チャネル
 * @param payload メッセージペイロード
 */
private void notify(final String channel, final String payload) {
    this.sender.map(pgconn -> {
        return pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'")
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .then()
                .doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload))
                .subscribe();
    }).subscribe();
}

R2DBCを使う際は、依存するProject ReactorのAPIを全面的に使うことになります。
今回は簡単な説明にとどめますが、クエリを実行、結果をハンドリングし、指定のタイミングで動く付帯的なアクションを設定する、という一連のフローを構築して、最後にこのフローが動き出すようにsubscribe()を呼び出しています。
doOnNext()で通知が届いたときのアクションを、doOnSubscribe()でsubscribeしたとき(クエリを実行するタイミング)のアクションを設定しており、ここでは単純にログ出力しています。
JavaのStream APIと似たスタイルで非同期・ストリーム処理を作る感じで、私も初見は面食らったのですが、こちらのページがとても勉強になりました。

おわりに

PostgreSQLのNOTIFY/LISTENはリリース9.0で、待ち状態イベントの格納先が従来のシステムテーブルからメモリキューに代わり、通知と一緒にペイロードを送信できるようになったことで、性能と利便性が向上しおよそ現在の形になりました。
機能自体はかなり古くから搭載されているようですがQiitaではこれまで記事化されていないため、社内の技術検証で得た情報整理も兼ねて記事化してみました。

ちなみにR2DBCは現在も鋭意開発中で私も今年からウォッチしているプロジェクトなのですが、折よく今年9月にNOTIFY/LISTENの機能をサポートするようになったため本記事でも少し取り上げました。
R2DBCをフィーチャーした記事もそのうち書こうかと思います。

10
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ksky
future
ITを武器とした課題解決型のコンサルティングサービスを提供します

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
10
Help us understand the problem. What is going on with this article?