3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

目的

PostgreSQLにはLISTENのNOTIFYを使うことで非同期通信ができます。

LISTEN channel;
NOTIFY channel;
PID 1249988のサーバープロセスから非同期通知"channel"を受信しました。

Rustのtokio-postgresというドライバーを使って、受信してみたいと思います。

コード

テストコード

使い方はドライバーのソースの中のテストコードにありました。

#[tokio::test]
async fn notifications() {
    let (client, mut connection) = connect_raw("user=postgres").await.unwrap();

    let (tx, rx) = mpsc::unbounded();
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);

    client
        .batch_execute(
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",
        )
        .await
        .unwrap();

    drop(client);

    let notifications = rx
        .filter_map(|m| match m {
            AsyncMessage::Notification(n) => future::ready(Some(n)),
            _ => future::ready(None),
        })
        .collect::<Vec<_>>()
        .await;
    assert_eq!(notifications.len(), 2);
    assert_eq!(notifications[0].channel(), "test_notifications");
    assert_eq!(notifications[0].payload(), "hello");
    assert_eq!(notifications[1].channel(), "test_notifications");
    assert_eq!(notifications[1].payload(), "world");
}

ループで受け取る

上記コードを参考にしてメッセージをループで受け取るプログラムを書きました。

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pg_config = std::env::var("PG_CONFIG")?;
    let (client, mut connection) = connect(&pg_config, NoTls).await?;

    let (tx, mut rx) = mpsc::unbounded();
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);

    client
        .batch_execute(
            "LISTEN test_notifications",
        )
        .await?;

    loop {
        match rx.try_next() {
            Ok(m) => match m {
                Some(m) => println!("{:?}", m),
                None => {
                    println!("GOT MESSAGE None");
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                    continue;
                },
            },
            Err(err) => {
                println!("{:?}", err);
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                continue;
            }
        }
    }
}

実行するとTryRecvErrorが続きます。

TryRecvError
TryRecvError
TryRecvError
TryRecvError
TryRecvError
...

psqlでNOTIFYします。

psql -h localhost -U user test -c "select pg_notify('test_notifications', 'bbb')"

受け取れました!

TryRecvError
TryRecvError
Notification(Notification { process_id: 1250055, channel: "test_notifications", payload: "bbb" })
TryRecvError
TryRecvError
...

受信されるまではTryRecvErrorを繰り返すようです。sleepを入れないともっとすごい勢いでながれていきます。

サーバーが落ちた時

さて、このプログラムを常駐して使いたいです。サーバーが落ちたり、ネットワークが不調になった時はリトライして再接続みたいなことがやりたいです。ではこの状態でサーバーを落としたらどうなるか試してみました。

TryRecvError
TryRecvError
thread 'tokio-runtime-worker' panicked at 'db error: FATAL: 管理者コマンドにより接続を終了しています', src/main.rs:83:76
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
GOT MESSAGE None
GOT MESSAGE None

connection.poll_messageのmap_errのpanic!でエラーが通知されました。
また以降は「GOT MESSAGE None」が続くようになりました。
よってどちらかでエラーを捕まえてリトライできるようにループ処理を停止したいと思います。
「GOT MESSAGE None」は他のタイミングでもでそうな気がするので、panicの方で検知します。

リトライ版

完全に動く形のコードは以下のようになります。

Cargo.toml
[package]
name = "pg"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0"
futures-channel = "0.3.25"
futures-util = "0.3.25"
tokio-postgres = "0.7.7"
tokio = { version = "1.23.0", features = ["full"] }
main.rs
use futures_channel::mpsc;
use futures_util::{stream, FutureExt, StreamExt};
use tokio_postgres::{connect, AsyncMessage, NoTls};

pub struct Listener {
    config: String,
}

impl Listener {
    pub fn new(config: &str) -> Self {
        Self {
            config: config.to_owned(),
        }
    }

    pub async fn execute<F>(&self, f: F) -> anyhow::Result<()>
    where
        F: Fn(AsyncMessage),
    {
        let (client, mut connection) = connect(&self.config, NoTls).await?;
        // 停止用のチャンネル
        let (tx2, mut rx2) = mpsc::unbounded::<tokio_postgres::Error>();

        let (tx, mut rx) = mpsc::unbounded();
        let stream = stream::poll_fn(move |cx| {
            connection.poll_message(cx).map_err(|e| {
                // 停止用にエラーを通知
                tx2.unbounded_send(e).unwrap();
                panic!()
            })
        });

        let connection = stream.forward(tx).map(|r| r.unwrap());
        tokio::spawn(connection);

        client.batch_execute("LISTEN test_notifications").await?;

        loop {
            // ループの最初にエラーが通知されているか確認する
            if let Ok(Some(m)) = rx2.try_next() {
                println!("{}", m);
                // エラーがあったらループ停止
                break;
            }
            match rx.try_next() {
                Ok(m) => match m {
                    Some(m) => f(m),
                    None => {
                        println!("GOT MESSAGE None");
                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                        continue;
                    }
                },
                Err(_err) => {
                    //println!("{:?}", err);
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                    continue;
                }
            }
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pg_config = std::env::var("PG_CONFIG")?;
    let listener = Listener::new(&pg_config);
    loop {
        let _ = listener
            .execute(|m| {
                println!("{:?}", m);
            })
            .await;
        println!("connection stop");
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }

    //Ok(())
}

処理を行う構造体としてListenerを定義しました。
executeメソッドにはメッセージが来た時に呼び出されるクロージャーを渡せるようにしました。
通信が切れるとexecuteメソッドが終わって5秒sleepしてから再接続を試みます。

止まっているところから接続を開始してNOTIFYを送り、サーバーを止めて再び起こした後にNOTIFYを送りました。

connection stop
connection stop
connection stop
Notification(Notification { process_id: 1250132, channel: "test_notifications", payload: "bbb" })
thread 'tokio-runtime-worker' panicked at 'explicit panic', src/main.rs:28:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
db error: FATAL: 管理者コマンドにより接続を終了しています
connection stop
connection stop
Notification(Notification { process_id: 1250183, channel: "test_notifications", payload: "bbb" })

まとめ

PostgreSQLのLISTENとNOTIFYを使ってRustで非同期通信ができました。
常駐プログラムとして、接続のリトライができることを確認しました。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?