やりたいこと
RustでGraphQLのサーバを立てて、RustでQuery/Mutation/Subscribeをやる。RustでGraphQLをやろうとして記事を検索すると、サーバー側だけとかクライアント側だけとかだったので、一気見できる記事が欲しかった。
使ったのは
- サーバー側
- async-graphql
- async-graphql-axum
- async-stream
- クライアント側
- graphql-client
- graphql-ws-client
async-graphql-axumとかgraphql-ws-clientとかがexampleを提供しているので細かい部分はそっちを読んで欲しい。
本記事では超簡易なTodoを扱うサーバを立てる。
GraphQLの超簡易説明
GraphQLのサーバーは下記機能を提供する
- Query: データの取得
- Mutation: データの更新
- Subscrpition: データの購読(データが変わったとか追加されたとかを通知する)
以上。
サーバーサイド
GraphQLで提供する下記機能を実装する
- Query
- Mutation
- Subscription
async-graphqlを利用する場合下記のような流れになる
- 提供するデータを定義する(下記main.rsのStorage)
- Query/Mutation/Subscriptionのそれぞれに対応したRootを作成する(下記main.rsのQueryRoot, MutationRoot, SubscriptionRoot)
- 各Rootを指定してスキーマを作成する(下記main.rsのSchema::build())
- axumのルートにスキーマを指定してサービスを開始する
- subscriptionは"/ws"でIFを切る
let schema = async_graphql::Schema::build(QueryRoot, MutationRoot, SubscriptionRoot)
.data(Storage::default())
.finish();
let app = Router::new()
.route(
"/",
get(graphiql).post_service(GraphQL::new(schema.clone())),
)
.route_service("/ws", GraphQLSubscription::new(schema));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
勉強用にQueryRootだけとか作ると後々面倒なので一気に作った方が楽。
提供するデータを定義する
今回はDB接続とか無し。メモリ上に超簡易なToDoリストを持つ。
とりあえずTodoを定義して
#[derive(Clone)]
pub struct Todo {
pub id: u32,
pub text: String,
}
#[async_graphql::Object]
impl Todo {
pub async fn id(&self) -> u32 {
self.id
}
pub async fn text(&self) -> String {
self.text.clone()
}
}
メモリ上に保持するデータを定義する
use crate::models::todo::Todo;
#[derive(Default)]
pub struct TodoServiceData {
pub todos: Vec<Todo>,
}
impl TodoServiceData {
#[allow(clippy::new_without_default, dead_code)]
pub fn new() -> Self {
Self { todos: vec![] }
}
pub fn todos(&self) -> Vec<Todo> {
self.todos.clone()
}
}
各Rootから呼び出せる場所でデータを定義する
pub type Storage = Arc<Mutex<TodoServiceData>>;
QueryRoot
一覧取得を提供する。async_graphql::Objectを指定するだけ。簡単。
pub struct QueryRoot;
#[async_graphql::Object]
impl QueryRoot {
async fn todos<'a>(&self, ctx: &async_graphql::Context<'a>) -> Vec<Todo> {
log::info!("QueryRoot::todos()");
let todo_service_data = &ctx.data_unchecked::<Storage>().lock().await;
todo_service_data.todos()
}
}
MutationRoot
追加と削除のIFを提供する。TodoBroker::publish()はsubscriptionへ通知を飛ばしてるだけ(後述)。
pub struct MutationRoot;
#[async_graphql::Object]
impl MutationRoot {
async fn add_todo(
&self,
ctx: &async_graphql::Context<'_>,
text: String,
) -> async_graphql::Result<u32> {
log::info!("MutationRoot::add_todo()");
let todo_service_data = &mut ctx.data_unchecked::<Storage>().lock().await;
let todos = &mut todo_service_data.todos;
let id = match todos.iter().max_by_key(|todo| todo.id) {
Some(todo) => todo.id + 1,
None => 1,
};
todos.push(Todo {
id: id,
text: text.clone(),
});
TodoBroker::publish(TodoChanged {
mutation_type: MutationType::Created,
id: id,
});
Ok(id)
}
async fn delete_todo(
&self,
ctx: &async_graphql::Context<'_>,
id: u32,
) -> async_graphql::Result<u32> {
log::info!("MutationRoot::delete_todo()");
let todo_service_data = &mut ctx.data_unchecked::<Storage>().lock().await;
let todos = &mut todo_service_data.todos;
let index = todos.iter().position(|todo| todo.id == id).unwrap();
todos.remove(index);
TodoBroker::publish(TodoChanged {
mutation_type: MutationType::Deleted,
id: id,
});
Ok(id)
}
}
SubscriptionRoot
まずはsubscribeするデータを定義する。MutationRootで飛ばしてる内容もコレ。
#[derive(async_graphql::Enum, Eq, PartialEq, Copy, Clone)]
enum MutationType {
Created,
Deleted,
}
#[derive(Clone)]
pub struct TodoChanged {
mutation_type: MutationType,
id: u32,
}
#[async_graphql::Object]
impl TodoChanged {
async fn mutation_type(&self) -> MutationType {
self.mutation_type
}
async fn id(&self) -> u32 {
*(&self.id)
}
async fn todo(&self, ctx: &async_graphql::Context<'_>) -> async_graphql::Result<Option<Todo>> {
log::info!("TodoChanged::todo()");
let todo_service_data = &ctx.data_unchecked::<Storage>().lock().await;
let todos = todo_service_data.todos().clone();
let todo = todos.iter().filter(|todo| todo.id == self.id).next();
let todo = if let Some(todo) = todo {
Some(todo.clone())
} else {
None
};
Ok(todo)
}
}
次にブローカーを定義する。ざっくり説明すると下記2点を提供してる。
- subscriberの保持(SUBSCRIBERS + TodoBroker::subscribe()) <- SubscriptionRootから呼ばれる
- データ流し込み(TodoBroker::publish()) <- MutationRootから呼ばれる
TodoBroker
use futures_util::StreamExt;
use once_cell::sync::Lazy;
use slab::Slab;
use std::{
any::{Any, TypeId},
collections::HashMap,
sync::Mutex,
};
static SUBSCRIBERS: Lazy<Mutex<HashMap<TypeId, Box<dyn Any + Send>>>> = Lazy::new(Default::default);
struct Senders<T>(Slab<futures_channel::mpsc::UnboundedSender<T>>);
struct BrokerStream<T: Sync + Send + Clone + 'static>(
usize,
futures_channel::mpsc::UnboundedReceiver<T>,
);
fn with_senders<T, F, R>(f: F) -> R
where
T: Sync + Send + Clone + 'static,
F: FnOnce(&mut Senders<T>) -> R,
{
log::debug!("call with_senders");
let mut map = SUBSCRIBERS.lock().unwrap();
let senders = map
.entry(TypeId::of::<Senders<T>>())
.or_insert_with(|| Box::new(Senders::<T>(Default::default())));
f(senders.downcast_mut::<Senders<T>>().unwrap())
}
impl<T: Sync + Send + Clone + 'static> Drop for BrokerStream<T> {
fn drop(&mut self) {
log::debug!("BrokerStream::drop()");
with_senders::<T, _, _>(|senders| senders.0.remove(self.0));
}
}
impl<T: Sync + Send + Clone + 'static> futures_util::Stream for BrokerStream<T> {
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
log::debug!("BrokerStream::poll_next()");
self.1.poll_next_unpin(cx)
}
}
pub struct TodoBroker<T>(std::marker::PhantomData<T>);
impl<T: Sync + Send + Clone + 'static> TodoBroker<T> {
pub fn publish(msg: T) {
log::info!("TodoBroker::publish()");
with_senders::<T, _, _>(|senders| {
for (_, sender) in senders.0.iter_mut() {
sender.start_send(msg.clone()).ok();
}
});
}
pub fn subscribe() -> impl futures_util::Stream<Item = T> {
log::info!("TodoBroker::subscribe()");
with_senders::<T, _, _>(|senders| {
let (tx, rx) = futures_channel::mpsc::unbounded();
let id = senders.0.insert(tx);
BrokerStream(id, rx)
})
}
}
最後にSubscriptionRootを作る。SubscriptionRootで定義してあるtodoがクライアントから呼び出すsubscriptionの定義となる。
pub struct SubscriptionRoot;
#[async_graphql::Subscription]
impl SubscriptionRoot {
async fn interval(
&self,
#[graphql(default = 1)] n: i32,
) -> impl futures_util::Stream<Item = i32> {
log::info!("SubscriptionRoot::interval()");
let mut value = 0;
async_stream::stream! {
loop {
futures_timer::Delay::new(std::time::Duration::from_secs(1)).await;
value += n;
yield value;
}
}
}
async fn todos(
&self,
mutation_type: Option<MutationType>,
) -> impl futures_util::Stream<Item = TodoChanged> {
log::info!("SubscriptionRoot::todos()");
TodoBroker::<TodoChanged>::subscribe().filter(move |event| {
let res = if let Some(mutation_type) = mutation_type {
event.mutation_type == mutation_type
} else {
true
};
async move { res }
})
}
}
挙動確認
一通り実装が済んだら実行して確認。特にエンドポイントを変えていない場合、http://localhost:3000
でアクセスするとクエリとかを投げられる画面が表示される。
Todoを追加したり
mutation ($text: String){
addTodo(text: $text)
}
Todoリストを見たり
query{
todos {
id,
text,
}
}
できてればOK。
クライアント側
下記流れで実装してゆく
- コンソールアプリのベース作る
- graphql定義 & コード作成
- Queryたたく
- Mutationたたく
- Subscriptionたたく
コンソールアプリのベース作る
コンソールアプリとして実装。パラメタで挙動を切り替える。
#[derive(Debug, clap::Parser)]
#[command(version, about, long_about = None)]
struct Config {
#[clap(subcommand)]
subcommand: SubCommands,
}
#[derive(Debug, clap::Subcommand)]
enum SubCommands {
Query,
Add {
#[clap(short = 't', long = "text", required = true, ignore_case = true)]
text: String,
},
Delete {
#[clap(short = 'i', long = "id", required = true, ignore_case = true)]
id: u32,
},
Subscribe,
}
#[async_std::main]
async fn main() {
let config = Config::parse();
match config.subcommand {
SubCommands::Query => {
query();
}
SubCommands::Add { text } => {
add_todo(text);
}
SubCommands::Delete { id } => {
delete_todo(id);
}
SubCommands::Subscribe => {
subscribe().await;
}
}
}
graphql定義 & コード作成
Query, Mutation, Subscriptionを実行するには対応するコードを作成する必要がある。作る方法は
- graphqlマクロで作る
- graphql-client generateコマンドで作る
の2つ。どちらにしてもschema.jsonと対応するgraphqlファイルが必要になる。exampleなどでもgraphqlマクロでコードが作成されているのでマクロ利用で合わせたほうがいいかもしれない。
schema.jsonの作成
graphql-clientコマンドを利用するので、インストール
cargo install graphql_client_cli
作ったGraphQLサーバーを起動した状態で下記コマンドを実行する。
graphql-client introspect-schema http://localhost:3000 --output ./src/schema.json
./src/schema.json
ができているのを確認する(抜粋)
{
"data": {
"__schema": {
"directives": [
{
"args": [
{
"defaultValue": "\"No longer supported\"",
"description": "A reason for why it is deprecated, formatted using Markdown syntax",
"name": "reason",
"type": {
"kind": "SCALAR",
"name": "String",
"ofType": null
}
対応するgraphqlファイルの作成
Query, Mutation, Subscriptionそれぞれで作成する
query
, mutation
, subscription
に続く名称がそのままGraphQL呼び出し時のstruct名になる(TodoRepositoriesとか)。
query TodoRepositories {
todos {
id,
text
}
}
mutation TodoAddRepositories($text: String) {
addTodo(text: $text)
}
mutation TodoDeleteRepositories($id: Int) {
deleteTodo(id: $id)
}
subscription TodoChanged ($mutationType: MutationType) {
todos(mutationType: $mutationType) {
mutationType,
id,
todo {
id,
text,
}
}
}
コードの作成 - graphqlマクロで作る
graphql-clientで提供されているgraphqlマクロを指定するだけ。超お手軽。
#[derive(graphql_client::GraphQLQuery)]
#[graphql(
schema_path = "./src/schema.json",
query_path = "./src/query.graphql",
normalization = "rust"
)]
pub struct TodoRepositories;
#[derive(graphql_client::GraphQLQuery)]
#[graphql(
schema_path = "./src/schema.json",
query_path = "./src/mutation_add_todo.graphql",
normalization = "rust"
)]
pub struct TodoAddRepositories;
#[derive(graphql_client::GraphQLQuery)]
#[graphql(
schema_path = "./src/schema.json",
query_path = "./src/mutation_delete_todo.graphql",
normalization = "rust"
)]
pub struct TodoDeleteRepositories;
#[derive(graphql_client::GraphQLQuery)]
#[graphql(
schema_path = "./src/schema.json",
query_path = "./src/subscription.graphql"
)]
struct TodoChanged;
コードの作成 - graphql-client generateコマンドで作る
下記コマンドを実行するだけでgraphqlに対応するコードが作成される。超簡単。
graphql-client generate ./src/query.graphql --output-directory ./src --schema-path ./src/schema.json
graphql-client generate ./src/mutation_add_todo.graphql --output-directory ./src --schema-path ./src/schema.json
graphql-client generate ./src/mutation_delete_todo.graphql --output-directory ./src --schema-path ./src/schema.json
graphql-client generate ./src/subscription.graphql --output-directory ./src --schema-path ./src/schema.json
マクロ展開された後の状態が知りたいときにも使える。
Queryたたく
クライアント作ってリクエスト投げるだけ
fn query() {
log::info!("start query");
let client = reqwest::blocking::Client::new();
let variables = todo_repositories::Variables {};
let response_body = graphql_client::reqwest::post_graphql_blocking::<TodoRepositories, _>(
&client,
"http://localhost:3000/",
variables,
);
match response_body {
Ok(response) => match response.data {
Some(data) => {
let data = data as <TodoRepositories as graphql_client::GraphQLQuery>::ResponseData;
data.todos.iter().for_each(|todo| {
log::info!("id: {}, text: {}", todo.id, todo.text);
});
}
None => {
log::info!("data is none");
}
},
Err(err) => {
log::error!("{:?}", err);
}
}
}
Mutationたたく
クライアントつくって(略)
fn add_todo(text: String) {
let client = reqwest::blocking::Client::new();
let variables = todo_add_repositories::Variables { text: Some(text) };
let response_body = graphql_client::reqwest::post_graphql_blocking::<TodoAddRepositories, _>(
&client,
"http://localhost:3000/",
variables,
);
match response_body {
Ok(response) => match response.data {
Some(data) => {
let data =
data as <TodoAddRepositories as graphql_client::GraphQLQuery>::ResponseData;
log::info!("add todo id: {}", data.add_todo);
}
None => {
log::info!("data is none");
}
},
Err(err) => {
log::error!("{:?}", err);
}
}
}
fn delete_todo(id: u32) {
let client = reqwest::blocking::Client::new();
let variables = todo_delete_repositories::Variables {
id: Some(id as i64),
};
let response_body = graphql_client::reqwest::post_graphql_blocking::<TodoDeleteRepositories, _>(
&client,
"http://localhost:3000/",
variables,
);
match response_body {
Ok(response) => match response.data {
Some(data) => {
let data =
data as <TodoDeleteRepositories as graphql_client::GraphQLQuery>::ResponseData;
log::info!("delete todo id: {}", data.delete_todo);
}
None => {
log::info!("delete todo: data is none");
}
},
Err(err) => {
log::error!("{:?}", err);
}
}
}
Subscriptionたたく
ストリーム作って読み込むだけ
async fn subscribe() {
use async_tungstenite::tungstenite::{client::IntoClientRequest, http::HeaderValue};
let mut request = "ws://localhost:3000/ws".into_client_request().unwrap();
request.headers_mut().insert(
"Sec-WebSocket-Protocol",
HeaderValue::from_str("graphql-transport-ws").unwrap(),
);
use async_tungstenite::async_std::connect_async;
let (connection, response) = connect_async(request).await.unwrap();
log::info!("connected");
log::info!("connect_async response: {:?}", response);
println!("Connected");
let stream = graphql_ws_client::Client::build(connection)
.subscribe(StreamingOperation::<TodoChanged>::new(
todo_changed::Variables {
mutation_type: Some(todo_changed::MutationType::CREATED),
},
))
.await;
let mut subscription = match stream {
Ok(stream) => {
log::info!("created subscribe");
stream
}
Err(err) => {
log::error!("create stream error: {:?}", err);
std::process::exit(1);
}
};
log::info!("start subscribe");
while let Some(response) = subscription.next().await {
log::info!("recieve response");
match response {
Ok(response) => match response.data {
Some(data) => {
let mutation_type = match data.todos.mutation_type {
todo_changed::MutationType::CREATED => "created".to_string(),
todo_changed::MutationType::DELETED => "deleted".to_string(),
todo_changed::MutationType::Other(value) => value,
};
log::info!("subscribe response mutation_type: {:?}", mutation_type);
log::info!("subscribe response id: {:?}", data.todos.id);
match data.todos.todo {
Some(todo) => {
log::info!(
"subscribe response todo: (id: {:?}, text: {})",
todo.id,
todo.text
);
}
None => {
log::info!("subscribe response doto: None");
}
};
}
None => {
log::info!("subscribe data is none");
}
},
Err(err) => {
log::error!("subscribe error: {:?}", err);
}
}
}
log::info!("end subscribe");
}
挙動確認
コンソールを新規で2つ立ち上げて片方をsubscription指定
cargo run -- subscribe
もう片方で適当にaddとかする
cargo run -- add --text "this is cool"
subscription呼んでるコンソール側にデータが流れる
[INFO]: start graphql client
[DEBUG]: Client handshake done.
[INFO]: connected
[INFO]: connect_async response: Response { status: 101, version: HTTP/1.1, headers: {"connection": "upgrade", "upgrade": "websocket", "sec-websocket-accept": "KuFcO3MIdpU1aqjhaYG/VIHQ7kM=", "sec-websocket-protocol": "graphql-transport-ws", "date": "Sun, 09 Feb 2025 06:40:28 GMT"}, body: None }
Connected
[INFO]: created subscribe
[INFO]: start subscribe
[INFO]: recieve response
[INFO]: subscribe response mutation_type: "created"
[INFO]: subscribe response id: 2
[INFO]: subscribe response todo: (id: 2, text: this is cool)
振り返り
記事にまとめてしまうと簡単な感じになったけど、graphql-clientとかasync-graphqlのexample見ながら実装しても結構つまづいた。
-
cargo install graphql_client
がエラーになる - feature指定しないといけないけどドキュメント読み込まないと気づけない
- subscriptionでコネクション確立まで行くけどデータが流れてこない(Client作成まわり)
- サーバ側の挙動確認を行うためのクライアント作成でてこずる
- etc
つまづいた分、知見が溜まりまくった感ある。