0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RustでGraphQLサーバを立ててRustでGraphQL呼び出す

Posted at

やりたいこと

RustでGraphQLのサーバを立てて、RustでQuery/Mutation/Subscribeをやる。RustでGraphQLをやろうとして記事を検索すると、サーバー側だけとかクライアント側だけとかだったので、一気見できる記事が欲しかった。

使ったのは

  • サーバー側
    • async-graphql
    • async-graphql-axum
    • async-stream
  • クライアント側
    • graphql-client
    • graphql-ws-client

async-graphql-axumとかgraphql-ws-clientとかがexampleを提供しているので細かい部分はそっちを読んで欲しい。
本記事では超簡易なTodoを扱うサーバを立てる。

GraphQLの超簡易説明

GraphQLのサーバーは下記機能を提供する

  • Query: データの取得
  • Mutation: データの更新
  • Subscrpition: データの購読(データが変わったとか追加されたとかを通知する)

以上。

サーバーサイド

最終形態のコード

GraphQLで提供する下記機能を実装する

  • Query
  • Mutation
  • Subscription

async-graphqlを利用する場合下記のような流れになる

  • 提供するデータを定義する(下記main.rsのStorage)
  • Query/Mutation/Subscriptionのそれぞれに対応したRootを作成する(下記main.rsのQueryRoot, MutationRoot, SubscriptionRoot)
  • 各Rootを指定してスキーマを作成する(下記main.rsのSchema::build())
  • axumのルートにスキーマを指定してサービスを開始する
    • subscriptionは"/ws"でIFを切る
main.rs
    let schema = async_graphql::Schema::build(QueryRoot, MutationRoot, SubscriptionRoot)
        .data(Storage::default())
        .finish();
    let app = Router::new()
        .route(
            "/",
            get(graphiql).post_service(GraphQL::new(schema.clone())),
        )
        .route_service("/ws", GraphQLSubscription::new(schema));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();

勉強用にQueryRootだけとか作ると後々面倒なので一気に作った方が楽。

提供するデータを定義する

今回はDB接続とか無し。メモリ上に超簡易なToDoリストを持つ。
とりあえずTodoを定義して

todo.rs
#[derive(Clone)]
pub struct Todo {
    pub id: u32,
    pub text: String,
}

#[async_graphql::Object]
impl Todo {
    pub async fn id(&self) -> u32 {
        self.id
    }

    pub async fn text(&self) -> String {
        self.text.clone()
    }
}

メモリ上に保持するデータを定義する

todo_service_data.rs
use crate::models::todo::Todo;

#[derive(Default)]
pub struct TodoServiceData {
    pub todos: Vec<Todo>,
}

impl TodoServiceData {
    #[allow(clippy::new_without_default, dead_code)]
    pub fn new() -> Self {
        Self { todos: vec![] }
    }

    pub fn todos(&self) -> Vec<Todo> {
        self.todos.clone()
    }
}

各Rootから呼び出せる場所でデータを定義する

roots.rs
pub type Storage = Arc<Mutex<TodoServiceData>>;

QueryRoot

一覧取得を提供する。async_graphql::Objectを指定するだけ。簡単。

roots.rs
pub struct QueryRoot;

#[async_graphql::Object]
impl QueryRoot {
    async fn todos<'a>(&self, ctx: &async_graphql::Context<'a>) -> Vec<Todo> {
        log::info!("QueryRoot::todos()");
        let todo_service_data = &ctx.data_unchecked::<Storage>().lock().await;
        todo_service_data.todos()
    }
}

MutationRoot

追加と削除のIFを提供する。TodoBroker::publish()はsubscriptionへ通知を飛ばしてるだけ(後述)。

roots.rs
pub struct MutationRoot;

#[async_graphql::Object]
impl MutationRoot {
    async fn add_todo(
        &self,
        ctx: &async_graphql::Context<'_>,
        text: String,
    ) -> async_graphql::Result<u32> {
        log::info!("MutationRoot::add_todo()");
        let todo_service_data = &mut ctx.data_unchecked::<Storage>().lock().await;
        let todos = &mut todo_service_data.todos;
        let id = match todos.iter().max_by_key(|todo| todo.id) {
            Some(todo) => todo.id + 1,
            None => 1,
        };
        todos.push(Todo {
            id: id,
            text: text.clone(),
        });
        TodoBroker::publish(TodoChanged {
            mutation_type: MutationType::Created,
            id: id,
        });
        Ok(id)
    }

    async fn delete_todo(
        &self,
        ctx: &async_graphql::Context<'_>,
        id: u32,
    ) -> async_graphql::Result<u32> {
        log::info!("MutationRoot::delete_todo()");
        let todo_service_data = &mut ctx.data_unchecked::<Storage>().lock().await;
        let todos = &mut todo_service_data.todos;
        let index = todos.iter().position(|todo| todo.id == id).unwrap();
        todos.remove(index);
        TodoBroker::publish(TodoChanged {
            mutation_type: MutationType::Deleted,
            id: id,
        });
        Ok(id)
    }
}

SubscriptionRoot

まずはsubscribeするデータを定義する。MutationRootで飛ばしてる内容もコレ。

roots.rs
#[derive(async_graphql::Enum, Eq, PartialEq, Copy, Clone)]
enum MutationType {
    Created,
    Deleted,
}

#[derive(Clone)]
pub struct TodoChanged {
    mutation_type: MutationType,
    id: u32,
}

#[async_graphql::Object]
impl TodoChanged {
    async fn mutation_type(&self) -> MutationType {
        self.mutation_type
    }

    async fn id(&self) -> u32 {
        *(&self.id)
    }

    async fn todo(&self, ctx: &async_graphql::Context<'_>) -> async_graphql::Result<Option<Todo>> {
        log::info!("TodoChanged::todo()");
        let todo_service_data = &ctx.data_unchecked::<Storage>().lock().await;
        let todos = todo_service_data.todos().clone();
        let todo = todos.iter().filter(|todo| todo.id == self.id).next();
        let todo = if let Some(todo) = todo {
            Some(todo.clone())
        } else {
            None
        };
        Ok(todo)
    }
}

次にブローカーを定義する。ざっくり説明すると下記2点を提供してる。

  • subscriberの保持(SUBSCRIBERS + TodoBroker::subscribe()) <- SubscriptionRootから呼ばれる
  • データ流し込み(TodoBroker::publish()) <- MutationRootから呼ばれる
TodoBroker
todo_broker.rs
use futures_util::StreamExt;
use once_cell::sync::Lazy;
use slab::Slab;
use std::{
    any::{Any, TypeId},
    collections::HashMap,
    sync::Mutex,
};

static SUBSCRIBERS: Lazy<Mutex<HashMap<TypeId, Box<dyn Any + Send>>>> = Lazy::new(Default::default);

struct Senders<T>(Slab<futures_channel::mpsc::UnboundedSender<T>>);

struct BrokerStream<T: Sync + Send + Clone + 'static>(
    usize,
    futures_channel::mpsc::UnboundedReceiver<T>,
);

fn with_senders<T, F, R>(f: F) -> R
where
    T: Sync + Send + Clone + 'static,
    F: FnOnce(&mut Senders<T>) -> R,
{
    log::debug!("call with_senders");
    let mut map = SUBSCRIBERS.lock().unwrap();
    let senders = map
        .entry(TypeId::of::<Senders<T>>())
        .or_insert_with(|| Box::new(Senders::<T>(Default::default())));
    f(senders.downcast_mut::<Senders<T>>().unwrap())
}

impl<T: Sync + Send + Clone + 'static> Drop for BrokerStream<T> {
    fn drop(&mut self) {
        log::debug!("BrokerStream::drop()");
        with_senders::<T, _, _>(|senders| senders.0.remove(self.0));
    }
}

impl<T: Sync + Send + Clone + 'static> futures_util::Stream for BrokerStream<T> {
    type Item = T;
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        log::debug!("BrokerStream::poll_next()");
        self.1.poll_next_unpin(cx)
    }
}

pub struct TodoBroker<T>(std::marker::PhantomData<T>);

impl<T: Sync + Send + Clone + 'static> TodoBroker<T> {
    pub fn publish(msg: T) {
        log::info!("TodoBroker::publish()");
        with_senders::<T, _, _>(|senders| {
            for (_, sender) in senders.0.iter_mut() {
                sender.start_send(msg.clone()).ok();
            }
        });
    }

    pub fn subscribe() -> impl futures_util::Stream<Item = T> {
        log::info!("TodoBroker::subscribe()");
        with_senders::<T, _, _>(|senders| {
            let (tx, rx) = futures_channel::mpsc::unbounded();
            let id = senders.0.insert(tx);
            BrokerStream(id, rx)
        })
    }
}

最後にSubscriptionRootを作る。SubscriptionRootで定義してあるtodoがクライアントから呼び出すsubscriptionの定義となる。

roots.rs
pub struct SubscriptionRoot;

#[async_graphql::Subscription]
impl SubscriptionRoot {
    async fn interval(
        &self,
        #[graphql(default = 1)] n: i32,
    ) -> impl futures_util::Stream<Item = i32> {
        log::info!("SubscriptionRoot::interval()");
        let mut value = 0;
        async_stream::stream! {
            loop {
                futures_timer::Delay::new(std::time::Duration::from_secs(1)).await;
                value += n;
                yield value;
            }
        }
    }

    async fn todos(
        &self,
        mutation_type: Option<MutationType>,
    ) -> impl futures_util::Stream<Item = TodoChanged> {
        log::info!("SubscriptionRoot::todos()");
        TodoBroker::<TodoChanged>::subscribe().filter(move |event| {
            let res = if let Some(mutation_type) = mutation_type {
                event.mutation_type == mutation_type
            } else {
                true
            };
            async move { res }
        })
    }
}

挙動確認

一通り実装が済んだら実行して確認。特にエンドポイントを変えていない場合、http://localhost:3000でアクセスするとクエリとかを投げられる画面が表示される。

image.png

Todoを追加したり

mutation ($text: String){
  addTodo(text: $text) 
}

image.png

Todoリストを見たり

query{
  todos {
    id,
    text,
  }
}

image.png

できてればOK。

クライアント側

最終形コード

下記流れで実装してゆく

  • コンソールアプリのベース作る
  • graphql定義 & コード作成
  • Queryたたく
  • Mutationたたく
  • Subscriptionたたく

コンソールアプリのベース作る

コンソールアプリとして実装。パラメタで挙動を切り替える。

main.rs
#[derive(Debug, clap::Parser)]
#[command(version, about, long_about = None)]
struct Config {
    #[clap(subcommand)]
    subcommand: SubCommands,
}

#[derive(Debug, clap::Subcommand)]
enum SubCommands {
    Query,
    Add {
        #[clap(short = 't', long = "text", required = true, ignore_case = true)]
        text: String,
    },
    Delete {
        #[clap(short = 'i', long = "id", required = true, ignore_case = true)]
        id: u32,
    },
    Subscribe,
}

#[async_std::main]
async fn main() {
    let config = Config::parse();
    match config.subcommand {
        SubCommands::Query => {
            query();
        }
        SubCommands::Add { text } => {
            add_todo(text);
        }
        SubCommands::Delete { id } => {
            delete_todo(id);
        }
        SubCommands::Subscribe => {
            subscribe().await;
        }
    }
}

graphql定義 & コード作成

Query, Mutation, Subscriptionを実行するには対応するコードを作成する必要がある。作る方法は

  1. graphqlマクロで作る
  2. graphql-client generateコマンドで作る

の2つ。どちらにしてもschema.jsonと対応するgraphqlファイルが必要になる。exampleなどでもgraphqlマクロでコードが作成されているのでマクロ利用で合わせたほうがいいかもしれない。

schema.jsonの作成

graphql-clientコマンドを利用するので、インストール

cargo install graphql_client_cli

作ったGraphQLサーバーを起動した状態で下記コマンドを実行する。

graphql-client introspect-schema http://localhost:3000 --output ./src/schema.json

./src/schema.jsonができているのを確認する(抜粋)

{
  "data": {
    "__schema": {
      "directives": [
        {
          "args": [
            {
              "defaultValue": "\"No longer supported\"",
              "description": "A reason for why it is deprecated, formatted using Markdown syntax",
              "name": "reason",
              "type": {
                "kind": "SCALAR",
                "name": "String",
                "ofType": null
              }

対応するgraphqlファイルの作成

Query, Mutation, Subscriptionそれぞれで作成する

query, mutation, subscriptionに続く名称がそのままGraphQL呼び出し時のstruct名になる(TodoRepositoriesとか)。

query.graphql
query TodoRepositories {
  todos {
    id,
    text
  }
}
mutation_add_todo.graphql
mutation TodoAddRepositories($text: String) {
  addTodo(text: $text)
}
mutation_delete_todo.graphql
mutation TodoDeleteRepositories($id: Int) {
  deleteTodo(id: $id)
}
subscription.graphql
subscription TodoChanged ($mutationType: MutationType) {
  todos(mutationType: $mutationType) {
    mutationType,
    id,
    todo {
      id,
      text,
    } 
  }
}

コードの作成 - graphqlマクロで作る

graphql-clientで提供されているgraphqlマクロを指定するだけ。超お手軽。

main.rs
#[derive(graphql_client::GraphQLQuery)]
#[graphql(
    schema_path = "./src/schema.json",
    query_path = "./src/query.graphql",
    normalization = "rust"
)]
pub struct TodoRepositories;

#[derive(graphql_client::GraphQLQuery)]
#[graphql(
    schema_path = "./src/schema.json",
    query_path = "./src/mutation_add_todo.graphql",
    normalization = "rust"
)]
pub struct TodoAddRepositories;

#[derive(graphql_client::GraphQLQuery)]
#[graphql(
    schema_path = "./src/schema.json",
    query_path = "./src/mutation_delete_todo.graphql",
    normalization = "rust"
)]
pub struct TodoDeleteRepositories;

#[derive(graphql_client::GraphQLQuery)]
#[graphql(
    schema_path = "./src/schema.json",
    query_path = "./src/subscription.graphql"
)]
struct TodoChanged;

コードの作成 - graphql-client generateコマンドで作る

下記コマンドを実行するだけでgraphqlに対応するコードが作成される。超簡単。

graphql-client generate ./src/query.graphql --output-directory ./src --schema-path ./src/schema.json
graphql-client generate ./src/mutation_add_todo.graphql --output-directory ./src --schema-path ./src/schema.json
graphql-client generate ./src/mutation_delete_todo.graphql --output-directory ./src --schema-path ./src/schema.json
graphql-client generate ./src/subscription.graphql --output-directory ./src --schema-path ./src/schema.json

マクロ展開された後の状態が知りたいときにも使える。

Queryたたく

クライアント作ってリクエスト投げるだけ

main.rs
fn query() {
    log::info!("start query");
    let client = reqwest::blocking::Client::new();
    let variables = todo_repositories::Variables {};

    let response_body = graphql_client::reqwest::post_graphql_blocking::<TodoRepositories, _>(
        &client,
        "http://localhost:3000/",
        variables,
    );
    match response_body {
        Ok(response) => match response.data {
            Some(data) => {
                let data = data as <TodoRepositories as graphql_client::GraphQLQuery>::ResponseData;
                data.todos.iter().for_each(|todo| {
                    log::info!("id: {}, text: {}", todo.id, todo.text);
                });
            }
            None => {
                log::info!("data is none");
            }
        },
        Err(err) => {
            log::error!("{:?}", err);
        }
    }
}

Mutationたたく

クライアントつくって(略)

main.rs
fn add_todo(text: String) {
    let client = reqwest::blocking::Client::new();
    let variables = todo_add_repositories::Variables { text: Some(text) };

    let response_body = graphql_client::reqwest::post_graphql_blocking::<TodoAddRepositories, _>(
        &client,
        "http://localhost:3000/",
        variables,
    );
    match response_body {
        Ok(response) => match response.data {
            Some(data) => {
                let data =
                    data as <TodoAddRepositories as graphql_client::GraphQLQuery>::ResponseData;
                log::info!("add todo id: {}", data.add_todo);
            }
            None => {
                log::info!("data is none");
            }
        },
        Err(err) => {
            log::error!("{:?}", err);
        }
    }
}

fn delete_todo(id: u32) {
    let client = reqwest::blocking::Client::new();
    let variables = todo_delete_repositories::Variables {
        id: Some(id as i64),
    };

    let response_body = graphql_client::reqwest::post_graphql_blocking::<TodoDeleteRepositories, _>(
        &client,
        "http://localhost:3000/",
        variables,
    );
    match response_body {
        Ok(response) => match response.data {
            Some(data) => {
                let data =
                    data as <TodoDeleteRepositories as graphql_client::GraphQLQuery>::ResponseData;
                log::info!("delete todo id: {}", data.delete_todo);
            }
            None => {
                log::info!("delete todo: data is none");
            }
        },
        Err(err) => {
            log::error!("{:?}", err);
        }
    }
}

Subscriptionたたく

ストリーム作って読み込むだけ

main.rs
async fn subscribe() {
    use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};

    let mut request = "ws://localhost:3000/ws".into_client_request().unwrap();
    request.headers_mut().insert(
        "Sec-WebSocket-Protocol",
        HeaderValue::from_str("graphql-transport-ws").unwrap(),
    );

    use async_tungstenite::async_std::connect_async;
    let (connection, response) = connect_async(request).await.unwrap();
    log::info!("connected");
    log::info!("connect_async response: {:?}", response);

    println!("Connected");

    let stream = graphql_ws_client::Client::build(connection)
        .subscribe(StreamingOperation::<TodoChanged>::new(
            todo_changed::Variables {
                mutation_type: Some(todo_changed::MutationType::CREATED),
            },
        ))
        .await;
    let mut subscription = match stream {
        Ok(stream) => {
            log::info!("created subscribe");
            stream
        }
        Err(err) => {
            log::error!("create stream error: {:?}", err);
            std::process::exit(1);
        }
    };

    log::info!("start subscribe");
    while let Some(response) = subscription.next().await {
        log::info!("recieve response");
        match response {
            Ok(response) => match response.data {
                Some(data) => {
                    let mutation_type = match data.todos.mutation_type {
                        todo_changed::MutationType::CREATED => "created".to_string(),
                        todo_changed::MutationType::DELETED => "deleted".to_string(),
                        todo_changed::MutationType::Other(value) => value,
                    };
                    log::info!("subscribe response mutation_type: {:?}", mutation_type);
                    log::info!("subscribe response id: {:?}", data.todos.id);
                    match data.todos.todo {
                        Some(todo) => {
                            log::info!(
                                "subscribe response todo: (id: {:?}, text: {})",
                                todo.id,
                                todo.text
                            );
                        }
                        None => {
                            log::info!("subscribe response doto: None");
                        }
                    };
                }
                None => {
                    log::info!("subscribe data is none");
                }
            },
            Err(err) => {
                log::error!("subscribe error: {:?}", err);
            }
        }
    }
    log::info!("end subscribe");
}

挙動確認

コンソールを新規で2つ立ち上げて片方をsubscription指定

cargo run -- subscribe

もう片方で適当にaddとかする

cargo run -- add --text "this is cool"

subscription呼んでるコンソール側にデータが流れる

[INFO]: start graphql client
[DEBUG]: Client handshake done.
[INFO]: connected
[INFO]: connect_async response: Response { status: 101, version: HTTP/1.1, headers: {"connection": "upgrade", "upgrade": "websocket", "sec-websocket-accept": "KuFcO3MIdpU1aqjhaYG/VIHQ7kM=", "sec-websocket-protocol": "graphql-transport-ws", "date": "Sun, 09 Feb 2025 06:40:28 GMT"}, body: None }
Connected
[INFO]: created subscribe
[INFO]: start subscribe
[INFO]: recieve response
[INFO]: subscribe response mutation_type: "created"
[INFO]: subscribe response id: 2
[INFO]: subscribe response todo: (id: 2, text: this is cool)

振り返り

記事にまとめてしまうと簡単な感じになったけど、graphql-clientとかasync-graphqlのexample見ながら実装しても結構つまづいた。

  • cargo install graphql_clientがエラーになる
  • feature指定しないといけないけどドキュメント読み込まないと気づけない
  • subscriptionでコネクション確立まで行くけどデータが流れてこない(Client作成まわり)
  • サーバ側の挙動確認を行うためのクライアント作成でてこずる
  • etc

つまづいた分、知見が溜まりまくった感ある。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?