前置き
前提
- prisma2インストール済み
- docker環境インストール済み
- 記事内のgqlgenはエイリアスに登録して使っています。
- gqlgenとprisma2のliftをある程度使っている人向け
サンプル
https://github.com/graphql-lab/subscription-with-postgres-notify
こんな感じの作っていきます
最終的にpythonのスクリプトを叩くと
異なる端末の画面がリアルタイムに更新されるというもの
これを応用すればスクレイピングした結果を定期的にwebサイトにつぶやかせる、というbotが
作れるかと

下準備
gqlgen initからビルドし、必要なディレクトリを作って...とやっても良いのですが
今回はgqlgenの使い方の記事という事ではないので割愛します。
という事で今回は
gqlkitというdockerベースのgraphqlサービスフレームワークを使っていきます。
gqlkitの内容としては
マイグレーションツールにprisma2のliftを、
サーバー構築にgqlgenを使っていて
あとは、handlerやmiddlewareなど、
よく使うであろう機能のディレクトリを詰め合わせただけの他愛もないシンプルなフレームワークです。
任意のディレクトリで下記のリポジトリをクローンします。
git clone git@github.com:gqlkit-lab/gqlkit.git subscription-with-postgres-notify
クローンできたらPostgreSQLとDB確認用のpgwebをdocker-composeで立ち上げましょう。
cd subscription-with-postgres-notify
docker-compose up -d
schema.prisma
PostgreSQLサーバーが立ち上がったら
早速、DBへのマイグレーション作業をやっていきます。
datasource db {
    provider = "postgresql"
    url      = "postgresql://postgres:postgres@localhost:5432/postgres?schema=public"
}
model messages {
    id   String @default(cuid()) @id
    text String
    created_at DateTime @default(now())
    updated_at DateTime @updatedAt
}
cd gqlkit-server/lift
prisma2 lift save
prisma2 lift up
これでpgwebで確認するとTablesのところにmessagesというテーブルが追加されているはずです。
ちなみに今回、ORMはgormを使うのでテーブル名の命名規則は複数形にしなければならない点に注意です。
schema.graphql
次に、schema.graphqlを書きgqlgenでresolver等をビルドしていきます。
ここまで来れば、もうサーバー側の実装は半分は終わった感じです。
type Query {
    readMessages: [Message!]!
}
type Subscription {
    messageCreated: Message!
}
type Message {
    id: ID!
    text: String!
    created_at: String
    updated_at: String!
}
cd ../ # gqlkit-serverのルート
gqlgen
PostgreSQLのテーブルを監視するためのSQL関数を用意する
※ここからが今回の本題となります。
PostgreSQLの監視、通知の仕組みを有効にするためには以下のようなSQLを実行する必要があるようです。
下記のSQLは簡単に言えば
INSERTやUPDATE文が実行された際の
イベントを検知するトリガーと、イベントをキャッチして何らかの処理を行うハンドラを
PostgreSQLデータベースに組み込む為のSQLといったところです。
参考にさせて頂いたのはこちらです。
begin;
create or replace function 「イベント名」_handler ()
	returns trigger
	language plpgsql
as $$
declare
	channel text := TG_ARGV[0];
	payload_json json;
begin
	payload_json = json_build_object(「ペイロード」);
	PERFORM pg_notify(channel, payload_json::text);
	RETURN NULL;
end;
$$;
CREATE TRIGGER 「イベント名」_trigger
AFTER 「SQLのメソッド名(INSERT、UPDATE、DELETEなど)」
ON 「テーブル名」
FOR EACH ROW
    EXECUTE PROCEDURE 「イベント名」_handler('「テーブル名」');
commit;
これをpgwebのqueryのところで叩いてしまっても良いとは思うのですが
それでは、少々不格好ですのでgormで叩いてやるというふうにしてみます。
servantというディレクトリを作りPGNotifyBuilderというパッケージを作ります
dropTriggerという関数はこれによって
graphqlサーバーを起動した際に一旦、古いトリガーを全て削除し
throwNotificationSQLに書いてあるSQLで再度トリガーを作り直します。
dropTriggerが無いとトリガーの重複でエラーが発生してしまうのでここがポイントです。
package PGNotifyBuilder
import (
	"fmt"
	"log"
	"github.com/jinzhu/gorm"
	_ "github.com/lib/pq"
)
type Receive struct {
	DBConnect string
	EventName string
	Table string
	SqlMethod string
	Payload string
}
type PgTrigger struct {
	Tgname string
}
func dropTrigger(db *gorm.DB, tableName string) {
	var triggers []*PgTrigger
	db.Table("pg_trigger").Select("tgname").Scan(&triggers)
	for _, trigger := range triggers {
		sql := fmt.Sprintf(`DROP TRIGGER IF EXISTS %s ON %s CASCADE;`, trigger.Tgname, tableName)
		db.Exec(sql)
	}
}
func throwNotificationSQL(eventName string, table string, sqlMethod string, payload string) string {
	re := fmt.Sprintf(
		`
begin;
create or replace function %s_handler ()
	returns trigger
	language plpgsql
as $$
declare
	channel text := TG_ARGV[0];
	payload_json json;
begin
	payload_json = json_build_object(%s);
	PERFORM pg_notify(channel, payload_json::text);
	RETURN NULL;
end;
$$;
CREATE TRIGGER %s_trigger
AFTER %s
ON %s
FOR EACH ROW
    EXECUTE PROCEDURE %s_handler('%s');
commit;
`, eventName, payload, eventName, sqlMethod, table, eventName, table)
	return re
}
func Serve(r *Receive) {
	db, err := gorm.Open("postgres", r.DBConnect)
	defer db.Close()
	if err != nil {
		log.Fatal(err)
	}
	dropTrigger(db, r.Table)
	db.Exec(throwNotificationSQL(r.EventName, r.Table, r.SqlMethod, r.Payload))
}
続いて、servant/PGNotifyBuilder/PGNotifyBuilder.goを
server.goに読み込みます。
package main
import (
	"gqlkit/env"
	"gqlkit/handler"
	pgnb "gqlkit/servant/PGNotifyBuilder"
	"log"
	"net/http"
	"github.com/go-chi/chi"
	"github.com/go-chi/chi/middleware"
	"github.com/rs/cors"
)
func main() {
	// Payloadはschema.prismaのmessagesモデルを参考に
	pgnb.Serve(&pgnb.Receive{
		DBConnect: env.DB_CONNECT,
		EventName: "message_created",
		Table:     "messages",
		SqlMethod: "INSERT",
		Payload: `
		'id', NEW.id,
		'text', NEW.text,
		'created_at', NEW.created_at,
		'updated_at', NEW.updated_at
		`,
	})
	r := chi.NewRouter()
	cors := cors.New(cors.Options{
		AllowedOrigins: []string{env.GQL_SERVER_ALLOW_ORIGIN},
		AllowedMethods: []string{"GET", "POST", "OPTIONS"},
		AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "X-CSRF-Token"},
	})
	r.Use(middleware.SetHeader("Content-Type", "application/json"))
	r.Use(cors.Handler)
	r.Handle("/", handler.Playground())
	r.Handle("/query", handler.Graphql())
	log.Printf("connect to http://localhost:%s/ for GraphQL playground", env.GQL_SERVER_PORT)
	log.Fatal(http.ListenAndServe(":"+env.GQL_SERVER_PORT, r))
}
GraphQLサーバーのresolverを書く
まずは、resolver.go
package graph
import "gqlkit/graph/model"
// This file will not be regenerated automatically.
//
// It serves as dependency injection for your app, add any dependencies you require here.
type Resolver struct {
	messages []*model.Message
	message  *model.Message
}
続いてschema.resolvers.go
ReadMessages
func (r *queryResolver) ReadMessages(ctx context.Context) ([]*model.Message, error) {
	db, err := gorm.Open("postgres", env.DB_CONNECT)
	defer db.Close()
	if err != nil {
		return nil, fmt.Errorf(err.Error())
	}
	db.Order("created_at desc").Find(&r.messages)
	return r.messages, nil
}
MessageCreated
func (r *subscriptionResolver) MessageCreated(ctx context.Context) (<-chan *model.Message, error) {
	event := make(chan *model.Message)
	reportProblem := func(ev pq.ListenerEventType, err error) {
		if err != nil {
			fmt.Println(err.Error())
		}
	}
	listener := pq.NewListener(env.DB_CONNECT, 10*time.Second, time.Minute, reportProblem)
	err := listener.Listen("messages")
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			select {
			case n := <-listener.Notify:
				err = json.Unmarshal([]byte(n.Extra), &r.message)
				if err != nil {
					fmt.Println(err)
				}
				event <- r.message
			}
		}
	}()
	return event, nil
}
テスト用のPython scriptを用意する
以上でGraphQLサーバーの構築はできましたので
GraphQLサーバーがちゃんと、Subscriptionの通知結果を返すのかテストする為のスクリプトを書きます。
別にpythonでなくてもいいですし
GraphQLサーバーにCreateMessageというmutationを作ってテストしてもいいのですが
今回は、pythonでスクレイピングした内容をPostgreSQLにINSERTした際に
通知をするという今後の想定もあるという事で敢えてpythonを使います。
docker-compose.ymlファイルがある階層にcreate-messageというディレクトリを作ります。
mkdir create-message
cd create-message
touch main.py
import psycopg2
import uuid
from datetime import datetime, timedelta, timezone
def db_connect():
    return psycopg2.connect("host=localhost port=5432 user=postgres dbname=postgres password=postgres sslmode=disable")
def main():
    with db_connect() as conn:
        with conn.cursor() as db:
            _id = uuid.uuid4()
            jst = timezone(timedelta(hours=+9), 'JST')
            now = datetime.now(jst)
            now = now.isoformat(timespec='seconds')
            db.execute("""
                INSERT 
                INTO messages (
                    id,
                    text,
                    created_at,
                    updated_at
                ) VALUES (%s,%s,%s,%s);
            """,(
                str(_id),
                "test from python script",
                now,
                now
            ))
if __name__ == '__main__':
    main()
GraphQLサーバーを起動する
GraphQLサーバーを起動する前にenv.goファイルの
godotenvのコメントアウトを外します。
ここまでのソースコード内で登場していたenv.〇〇がこれで読み込めるようになります。
gqlkit-serverのルートで下記を実行します。
go run server.go
http://localhost:8080 でGraphQL Playgroundを開きます。
Playgroundで下記Queryを実行します。
ローダーが回り出し
通知待ち状態になります。
subscription{
  messageCreated{
  id
  text
  created_at
  updated_at
}
}
実際にpythonのscriptを実行してみます。
textにはtest from python scriptとでも入れておきましょう。
python main.py
まとめ
以上が、PostgreSQLのNotifyを使ってGraphQL Subscriptionを実装する方法の
バックエンド実装部分でした。
次回はnuxt.jsでのフロントエンド実装例をご紹介します。

