LoginSignup
7
3

【ハンズオン】FlutterとRiverpodでgRPCの双方向ストリーミングを状態管理しよう

Last updated at Posted at 2023-12-13

DeNA 24卒 Advent Calendar 2023 13日目の記事です。

DeNA 24卒の@Funobuです。よろしくお願いします!

最近は、「AI美少女がメンターとして、塾のような手厚い学習のサポートをしてくれる」モバイルアプリを趣味で開発しています。今回は、その時に得た知見の一部を共有します。

冒頭

前提知識

この記事では、Flutter, Riverpod, gRPCを用いた実践的な内容を取り扱います。
そのため、以下の3項目を前提知識とします。

  • gRPCについて、その仕組みを軽く理解していること。
  • Flutterについて、基本的な文法やWidgetの仕組みを理解していること。
  • Riverpod v2について、その仕組みを軽く理解していること。

もし前提知識に関して不安がある方は、下記の記事を参考に学習してみると良いと思います。

目標

  • gRPCのBidirectional Streamingの、Dartでの実装方法を理解する
  • FlutterのStreamについて、理解する
  • APIとFlutterで、リアルタイムにメッセージをやり取りできるようになる

最終的にはメッセージをAPIとやり取りできる機能を再現します。また、API側はOpenAIを使って、AIに会話のメッセージを考えてもらいます。

デモ動画をYouTubeにアップロードしたところ、どうやらYouTubeのアカウントがBANされたみたいです。本当はデモ動画をお見せしたかったのですが、2023.12.13現在再審査の申請中です...。🙏

動画では可愛い美少女が表示されていますが、そこまで実装するのはかなり複雑なので、今回は会話の一連の流れのみを実装します。

システムの構成

今回実装するシステムは、ChatGPTのような、クライアント・サーバでリアルタイムにメッセージをやり取りするものを想定しています。複数のクライアント同士でメッセージをやり取りするわけではありません。

DeNA1.jpg

環境

  • Dart:3.2.0
  • Flutter:3.16.0
  • freezed_annotation:^2.4.1
  • hooks_riverpod:^2.4.3
  • riverpod_annotation:^2.2.0
  • flutter_hooks:^0.20.2
  • grpc:^3.2.4

実装のための準備 (環境構築)

GitHubからリポジトリをクローン

今回の機能を実装するために、前提となるAPI側のコードとFlutterのコードを用意しました。環境構築の方法は該当リポジトリのREADME.mdに書いてあります。

記事内では、あくまでFlutter側の実装についてのみ扱います。

筆者はAPI側の実装にGo言語を利用しています。API側のコードはGitHub上に公開しているので、詳しくはそちらをご覧ください。

OpenAIのAPIキーを取得

今回はAPI側でメッセージ生成に必要なため、OpenAI APIキーを取得してください。

OpenAI APIキーを取得したら、.envファイルにAPIキーを記載してください。

.env
PORT=8080
OPENAI_API_KEY=ここにAPIキーを入れる

実装するアプリの設計方針

gRPCのスキーマ設計

今回のgRPC通信に用いるprotobufファイルは以下の通りのものとします。

protobufのコード

schema/message_service_v1.proto
/**
 * MessageServiceはAIとの会話を提供するサービスです。
 */
service MessageService {
  /**
    * RealtimeConversation
    * リアルタイムにAIと会話をするためのリクエストとレスポンスを行うストリーミングRPCです。
  * */
  rpc RealtimeConversation(stream RealtimeConversationRequest) returns (stream RealtimeConversationResponse);
}

message RealtimeConversationRequest {
  /**
    * Message
    * メッセージ(ユーザ)
   */
  string message = 1;
}

message RealtimeConversationResponse {
  /**
    * MessageID
    * メッセージのID (1文字ずつではなく、1つのメッセージで一意)
   */
  string message_id = 1;
  /**
    * MessageChar
    * メッセージ1文字(AI)
   */
  string message_char = 2;
  /**
    * IsDone
    * 完了済みかどうか
    * 1つのメッセージのストリームが完了したかどうかを表します。
   */
  bool is_done = 3;
}

RealtimeConversationの仕様

RealtimeConversationの機能と、リクエストとレスポンスの型について説明します。まず、機能として再現したいのは、まさにChat GPTの返答のように、返答を1文字ずつ返してくれるものです。

スクリーンショット 2023-12-12 23.27.25.png

この 1文字ずつ返す 処理を再現するために、レスポンスの型であるRealtimeConversationResponseでは、message_charという名前でAPI側から随時送られてくる文字を1文字ずつ受け取っています。また、1つのメッセージの送信がすべて完了した場合は、is_doneをtrueにすることで1メッセージの送信完了を表します。

一方リクエストの型は簡単で、RealtimeConversationRequestで送信したいメッセージを書くだけです。上のChat GPTの例だと、「人生とは何ですか?」を送るのみです。

DeNA2.jpg

Flutterのディレクトリ構成

今回使用するFlutter側のディレクトリ構成は以下の通りです。

lib
├── generated
│   ├── message_service_v1.pb.dart
│   ├── message_service_v1.pbenum.dart
│   ├── message_service_v1.pbgrpc.dart
│   └── message_service_v1.pbjson.dart
├── logics
│   ├── realtime_conversation_logic.dart
│   └── realtime_conversation_logic.g.dart
├── main.dart
├── models
│   ├── message.dart
│   └── message.freezed.dart
├── providers
│   ├── config.dart
│   ├── config.freezed.dart
│   ├── config_provider.dart
│   ├── config_provider.g.dart
│   ├── grpc_channel_provider.dart
│   └── grpc_channel_provider.g.dart
├── theme.dart
└── views
    ├── realtime_conversation_bottom_sheet_widget.dart
    ├── realtime_conversation_form_view.dart
    ├── realtime_conversation_message_view.dart
    ├── realtime_conversation_message_widget.dart
    └── realtime_conversation_screen.dart

6 directories, 21 files

各ディレクトリ及びファイルごとの役割はそれぞれ以下の通りです。

  • generated:自動生成されたgRPCのコード
  • logics:アプリケーションのロジックに関するコード (RiverpodでProvider化する)
  • main.dart:アプリの起動部分
  • providers:gRPCのチャンネルやアプリ全体の設定など、全体で使用する変数をProvider化
  • theme.dart:アプリのUIに関するテーマ
  • views:アプリのUIに関するコード (ロジックもここで呼び出して使用する)

上のディレクトリ構成の中で特に重要なのがlogics(ロジック層)とviews(View層)で、それぞれの関係性は以下の図の通りになります。
DeNA3.jpg

実装のための準備 (Flutter)

gRPCのクライアントコード生成

DartでgRPCのコードを生成するには、protocコマンドを使用します。
この工程についてはドキュメントなどの情報が充実しているので、詳しい説明は割愛します。

frontend/scripts/gen_grpc.gen
#!/usr/bin/env bash

protoc --dart_out=grpc:./lib/generated \
    -I ../schema \
    ../schema/*.proto

参考:

ストリーミング処理でやり取りするためのモデルの作成

まずは、freezedを使ってストリーミング処理でUI側とやり取りするためのモデルを作成します。
以下のコードにあるMessageRequestStreamModelFlutter→APIに使用するモデル、MessageResponseStreamModelAPI→Flutterに使用するモデルとします。

frontend/lib/models/message.dart
import 'package:freezed_annotation/freezed_annotation.dart';

part 'message.freezed.dart';

@freezed
class MessageRequestStreamModel with _$MessageRequestStreamModel {
  const factory MessageRequestStreamModel({
    required String message,
  }) = _ConversationMessageRequestStreamModel;
}

@freezed
class MessageResponseStreamModel with _$MessageResponseStreamModel {
  const factory MessageResponseStreamModel({
    required String messageId,
    required String message,
    required bool isDone,
  }) = _ConversationMessageResponseStreamModel;
}

2つのモデルの使い方は、gRPCのRealtimeConversationに使用するリクエストの型とほぼ変わりありませんが、レスポンスの型だけgRPCと変更点があります。それは、messageChar → messageになっていることです。

WidgetでSteamを購読する際、今回のようにメッセージを1文字ずつ受信する場合は注意が必要です。今回の場合、文字が送られる度にWidgetが再レンダリングされてしまうのが原因で、Streamとのコネクションが途切れ途切れになる現象に遭遇しました。

そのため、今回はgRPCのAPIから送られてきた文字をロジックの部分で結合していき、その時点のすべての文字をStreamで送るようにしています。

Riverpodを使ったgRPCの双方向通信の実装

ここからはRiverpodを使って、gRPCとのメッセージ送受信を管理するロジックを作ります。
今回は1文字ずつ送られてくるメッセージをリアルタイムに表示する要件を満たすため、StreamProviderを使用します。

StreamProviderとは、DartにおけるStreamを状態管理するためのRiverpodのProviderです。Streamとは、特定の型の値を、リアルタイムに相手に送信するための技術のことです。ゲームに例えると以下のような図になります。

DeNA4.jpg

上の図のように、Streamでは出版(Publish)と購読(Subscribe)の2つの役割が存在します。Publishは最新の情報をStreamを介して発信し続ける役割で、SubscribeはPublishから送られた最新の情報をリアルタイムに受け取る役割のことです。

今回の場合、Flutter ↔︎ APIで、メッセージの送信時と受信時ではそれぞれ以下の関係になります。

  • メッセージの送信:
    • Publish:Flutter
    • Subscribe:API
  • メッセージの受信:
    • Publish:API
    • Subscribe:Flutter

Flutterの部分をさらに深掘りすると、ロジックを責務を持つLogic層と、UIの描画に責務を持つView層は以下の関係になります。

  • メッセージの送信:
    • Publish:View層のフォーム
    • Subscribe:Logic層のStreamProvider
  • メッセージの受信:
    • Publish:Logic層のStreamProvider
    • Subscribe:View層のメッセージを表示するWidget

全体を踏まえると、Streamを使ったデータフローは以下の通りです。

  • メッセージの送信:View層 → Logic層 → API
  • メッセージの受信:API → Logic層 → View層

言葉だけでは理解するのは難しいと思うので、実際のコードをデバッグしながらStreamの流れを追っていくと、より深い理解に繋がります。実践あるのみです!

メッセージを管理するStreamProviderの実装 (Logic層)

ここからは実際に、gRPCとUIの間に立って、メッセージをリアルタイムに送受信するためのロジックを実装します。まずはコードの全体像とコード内のコメントを見て、ロジックの流れを理解します。

frontend/lib/logics/realtime_conversation_logic.dart
import 'dart:async';

import 'package:flutter/material.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:sample_app/models/message.dart';
import 'package:sample_app/generated/message_service_v1.pbgrpc.dart'
    as grpc_message_service;
import 'package:sample_app/providers/grpc_channel_provider.dart';

part 'realtime_conversation_logic.g.dart';

// UI側から送られたメッセージを受信し、realtimeConversationStreamLogicに渡すためのStreamController (Provider)
@riverpod
StreamController<MessageRequestStreamModel> conversationRequestStreamController(
  ConversationRequestStreamControllerRef ref,
) {
  // 複数の購読を想定して、broadcastにしておく
  final ctrl = StreamController<MessageRequestStreamModel>.broadcast();
  ref.onDispose(() {
    ctrl.close();
  });
  return ctrl;
}

// gRPCとのメッセージのやり取りを行うロジックのStreamProvider
// リクエスト側:conversationRequestStreamControllerを購読し、UI側からのメッセージを受け取り次第gRPC側にそのメッセージを送信
// レスポンス側:MessageResponseStreamModel型のストリームをブロードキャストで返す
@riverpod
Stream<MessageResponseStreamModel> realtimeConversationStreamLogic(
  RealtimeConversationStreamLogicRef ref,
) async* {
  // API側へリクエストを送る用のStreamController
  final grpcReqController =
      StreamController<grpc_message_service.RealtimeConversationRequest>();

  // UIからメッセージを送信する用に作成したStreamControllerのProviderを監視
  // メッセージを受信し次第、gRPCのリクエストを送信する
  ref.watch(conversationRequestStreamControllerProvider).stream.listen((event) {
    debugPrint('message: ${event.message}');
    grpcReqController.add(
      grpc_message_service.RealtimeConversationRequest(
        message: event.message,
      ),
    );
  });

  // StreamProviderの戻り値に使うStreamController (複数の購読で受け取れるようブロードキャスト)
  final resController =
      StreamController<MessageResponseStreamModel>.broadcast();

  // gRPCのサービスに接続するためのstub
  final channel = ref.read(grpcChannelProvider);
  final stub = grpc_message_service.MessageServiceClient(channel);
  // gRPCからストリームを受け取るためのStream
  final res = stub.realtimeConversation(
    grpcReqController.stream,
  );

  // 現在のメッセージを表す変数 (文字が追加されるたびに更新する)
  String currentMessage = '';
  try {
    res.listen(
      (event) async {
        // currentMessageでgRPCから受け取った文字を追加する
        currentMessage = currentMessage + event.messageChar;
        debugPrint('currentMessage: $currentMessage');
        // 新しいレスポンスのモデルを作成し、resControllerに渡す
        final res = MessageResponseStreamModel(
            messageId: event.messageId,
            message: currentMessage,
            isDone: event.isDone);
        resController.sink.add(res);

        // isDoneがtrueの場合は1メッセージが完了のため、currentMessageを空にする
        if (event.isDone) {
          // TODO: 本来はここで端末内にメッセージを保存する処理を行う
          debugPrint('messageId: ${event.messageId}, message: $currentMessage');
          currentMessage = '';
        }
      },
    );
  } catch (err) {
    // ここにエラーハンドリングを記述する
    rethrow;
  }

  // resControllerのストリームをStreamProviderの戻り値にする
  yield* resController.stream;
}

上記コードの流れを理解した上で、StreamProviderを使ったロジックを書く上で重要となるポイントを3つ紹介します。

1. StreamProviderは、v2からある程度自動生成できる

RiverpodのProviderは、バージョン2から最低限のコードを自動生成できるようになりました。
主にProviderの自動破棄処理や、Providerを参照するためのコードを自動生成してくれます。そのため、開発者はロジックの実装に集中できるようになりました。

今回使用するStreamProviderも、コードをある程度自動生成することができます。
手順は、以下の数行を書いて、dart run build_runner buildをターミナルで実行するだけです。

// riverpod_annotationのインポートを忘れない
import 'package:riverpod_annotation/riverpod_annotation.dart';

// 生成するコードのファイル名
part 'realtime_conversation_logic.g.dart';

//@riverpodをProvider化したい関数の上に付ける
@riverpod
Stream<MessageResponseStreamModel> realtimeConversationStreamLogic(
  RealtimeConversationStreamLogicRef ref,
) async* {}

コードを自動生成した後、そのProviderは関数名Providerという名前の変数を呼び出すことで、参照することができます。

final stream = ref.watch(realtimeConversationStreamLogicProvider);

return stream.when(
  data: (data) {
    return Text(data.message);
  },
  error: (err, _) => Text('Error: $err'),
  loading: () => const CircularProgressIndicator(),
);

今回はStreamProviderを自動生成しましたが、他の種類のProviderも自動生成することができます。詳しい生成方法は、下記ドキュメントを参考にすると良いでしょう。

2. PublishするStreamControllerにはbroadcast()を付けることを検討する

StreamをPublishする際には、StreamControllerを利用します。
このStreamControllerを使うことで、Publishする側は簡単に送りたい値をストリームを介して送ることができます。

// streamContrllerを作成する
final resController =
      StreamController<MessageResponseStreamModel>.broadcast();
      
// streamControllerのaddメソッドで、送りたい値をストリームを介して送信する
resController.sink.add("明けましておめでとうございます");

// streamControllerのstreamを購読することで、値を受け取ることができる
resController.stream.listen((event) {
  // 「明けましておめでとうございます」と表示される
  debugPrint(event)
});

以上のコードでStreamのPublishとSubscribeを実装できるのですが、StreamControllerを定義する上で一点だけ注意が必要です。それは、StreamControllerがbroadcastであるか否かです。

StreamControllerの後ろにbroadcastメソッドを付与すると、そのStreamControllerはbroadcast扱いとなります。通常のStreamControllerと何が違うかというと、それは一度に購読できるSubscribeの数が異なることです。通常のStreamControllerの場合、一度に購読できるのは1つのみです。

// このStreamControllerのstreamを購読できるのは1つのみ
final resController =
      StreamController<MessageResponseStreamModel>();

// このStreamControllerのstreamは複数が購読することができる
final resBroadcastController =
      StreamController<MessageResponseStreamModel>.broadcast();

メッセージを購読するWidgetの実装 (View層)

ロジックが完成したので、次はView層のUI部分を実装します。
View層については、コードに書いてあるコメント以上の解説がないので、コメントを確認しながら一連の処理について理解を深めてください。

下のコードでは、APIから送られてきたメッセージを表示するためのUIを実装しています。

スクリーンショット 2023-12-13 14.00.43.png

frontend/lib/views/realtime_conversation_view.dart
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:sample_app/logics/realtime_conversation_logic.dart';

class RealtimeConversationMessageView extends HookConsumerWidget {
  const RealtimeConversationMessageView({
    super.key,
  });

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    // StreamProviderを購読
    final stream = ref.watch(realtimeConversationStreamLogicProvider);

    return stream.when(
      // メッセージが送られてきた際、そのメッセージを表示する
      data: (data) {
        return Text(data.message);
      },
      // エラーが発生した際、エラー文を表示する
      error: (err, _) => Text('Error: $err'),
      // 最初のAPIからのメッセージが送られるまではローディングを表示する
      loading: () => const CircularProgressIndicator(),
    );
  }
}

新規メッセージを送信するWidgetの実装 (View層)

下のコードでは、メッセージを新規に送信するためのフォームのUIを実装しています。
スクリーンショット 2023-12-13 14.02.01.png

views/realtime_conversation_view.dart

class RealtimeConversationForm extends HookConsumerWidget {
  RealtimeConversationForm({
    super.key,
    required this.formFocusNode,
  });

  // キーボードのフォーカスを操作するFocusNode
  final FocusNode formFocusNode;
  final _formKey = GlobalKey<FormState>();

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    // 入力可否状態を管理するステート
    final isInputEnabled = useState<bool>(true);

    // フォームの状態を管理するステート
    final formCtrl = useTextEditingController();
    // メッセージを送信するためのStreamControllerを受け取る
    final streamCtrl = ref.read(conversationRequestStreamControllerProvider);

    ref.listen(curriculumConversationStreamLogicProvider, (_, currentValue) {
      final data = currentValue.valueOrNull;
      // 1メッセージの受信がすべて完了したら、入力を許可する
      if (data?.isDone == true) {
        isInputEnabled.value = true;
        return;
      }
      // 1メッセージの受信がすべて完了まで入力を拒否する
      isInputEnabled.value = false;
    });

    onSend() {
      // フォームのバリデーションの後、StreanControllerを使ってメッセージを送信する
      if (_formKey.currentState!.validate()) {
        isInputEnabled.value = false;
        final req = ConversationMessageRequestStreamModel(
          message: formCtrl.text,
        );
        streamCtrl.add(req);
        formCtrl.value = TextEditingValue.empty;
        formFocusNode.unfocus();
      }
    }

    return Column(
      mainAxisSize: MainAxisSize.min,
      children: [
        Form(
          key: _formKey,
          child: TextFormField(
            keyboardType: TextInputType.multiline,
            maxLines: 2,
            controller: formCtrl,
            focusNode: formFocusNode,
            decoration: const InputDecoration(
              border: InputBorder.none,
              hintText: '質問への回答を記入する',
            ),
            style: TextStyle(
              fontSize: 16,
              color: Colors.blueGrey[800],
            ),
            // フォームのバリデーション処理 (1文字以上, 100文字以内)
            validator: (value) {
              if (value == null || value.isEmpty) {
                return '質問への回答を記入してください';
              }
              if (value.length > 100) {
                return '質問への回答は100文字以内で記入してください';
              }
              return null;
            },
          ),
        ),
        Row(
          mainAxisAlignment: MainAxisAlignment.spaceBetween,
          children: [
            ViewConversationHistoryButton(onPressed: () {}),
            SendMessageButton(onPressed: onSend),
          ],
        ),
      ],
    );
  }
}

最後に

お疲れ様でした。
Flutter × Riverpod × gRPCについて解説している記事はあまり見たことがないので、私自身も実装の際にとても学びになりました。この記事が皆さんと未来のChat GPTさんの参考になると幸いです。

アドベントカレンダーもいよいよ折り返し地点となりました。
DeNA 24卒のカレンダーは明日以降も続く!!!!!

7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3