5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Lambdaを使ってLLMの推論結果を段階的に表示する

Last updated at Posted at 2024-03-15

LLMの業務利用

LLMを使ったサービスが世の中に数多く登場し、実際に活用している方も多くいらっしゃるのではないでしょうか?
個人で利用する場合は好きなサービスを自由に使えますが、業務で使う場合は情報流出のリスク等の観点から自由に使えないことがほとんどかと思います。
業務で使う場合は、情報流出防止の観点で入力データが学習データとして利用されないことが前提条件になるでしょう。
その場合は、これらのリスクをクリアしたサービスを使うか、自身で環境を構築するかの2択になってくるかと思います。

自身で環境構築を行う際に、意外なハードルになるのがLLMの推論結果を段階的に表示する部分だったりします。
利用するモデルや入出力のデータの量にもよりますが、LLMの出力が完了するまでに分単位の時間がかかることも珍しくありません。
出力が完了するまで、LLMの出力結果が一切表示されない実装をしてしまうと、利用者は何も表示されない画面を見ながら待ち続ける必要があります。これではユーザ体験が非常に悪くなります。
LLMは段階的に出力を行う仕組みになっているため、その出力結果を段階的に画面に表示することで、ユーザは出力結果を読みながら出力完了を待つことができ、ユーザ体験の向上を図ることができます。

この記事では、AWS Lambadaを使って、LLMの推論結果を段階的に表示する方法をご紹介します。
過去にLLMの業務利用について記事を書いているので、こちらも合わせてご覧ください!
参考:Amazon Bedrockを使ってClaudeを業務利用したい!

AWS Lambdaとは

既にご存知の方も多いと思いますが、AWS Lambda(以降、Lambda)はサーバレスでコードを実行できるサービスです。以下のような特徴があり、気軽に利用できることが大きなメリットです。

  • コードの実行時間に応じた課金モデルであるため、アイドル時には課金されない
  • トラフィックに合わせて自動的にスケーリングされる
  • 他のAWSサービスとの連携が容易(イベント駆動で実装しやすい)

メリットがある反面、Lambdaのサービスの制約を受けるので実装する際は注意が必要です。特によく引っかかるのは以下の部分です。(詳しい上限に関するの情報はこちら

  • リクエストとレスポンスのペイロードサイズの上限が6MB(同期処理)であること
  • デプロイパッケージのサイズが(zip圧縮の状態で)上限50MBであること
    • Lambdaレイヤーやコンテナイメージを使うことで回避可能
  • 実行時間の上限が15分間であること
    • API Gatewayと組み合わせた同期処理の場合は、上限30秒であること

LambdaからLLMを実行する際に、特に障壁となるのは実行時間の制限の部分になります。
Webシステムとして実装する際は、多くの場合でAPI Gatewayと組み合わせることになりますが、その際は実行時間の上限が30秒となります。通常の一般的なREST APIの処理であれば30秒もあれば十分ですが、LLMで推論する場合30秒では全然足りません。

実装方法

この記事では、LambdaのStreaming Responseの機能を利用した実装方法をご紹介します。
今回ご紹介する方法は、Node.jsのランタイムしか利用できませんのでご注意ください。

今回ご紹介するコードはgenerative-ai-use-cases-jp というアセットのものになります。こちらのアセットは、ベーシックなLLMチャットの機能をはじめ、ドキュメント検索と組み合わせたRAGチャット、Bedrockのエージェント機能を利用したエージェントチャットなど、様々なユースケースを簡単に試すことができます。CDKを使って簡単にデプロイできるので、試してみてください!

image.png

image.png

また、この記事では解説しませんが、WebSocketを利用した実装も可能です。bedrock-claude-chatというベーシックなLLMチャットと、タスク特化型のチャット(RAGも可能)を実現できるカスタムボットの機能が実装されているアセットでは、WebSocketで実装しています。WebSocketの場合は、Pythonでも実装することが可能なので、合わせてこちらもご確認ください!

image.png

Streaming Responseの実装方法

前置きが長くなりましたが、実際に実装方法を見ていきます。
こちらで解説するコードは、すべてTypeScriptで記載しています。また、AWSのリソースはCDKで定義し、フロントエンドはReactのSPA構成となります。

バックエンド

image.png
バックエンドは上記のような実装となっています。
LLMの推論を除く通常のAPIについては、API GatewayとLambdaを組み合わせた構成となっています(こちらは一般的な作りなので解説しません)。
しかし、LLMの推論を行うStreaming Responseの処理については、フロントエンドから直接Lambdaを実行する作りとなっています。これは、API GatewayがStreaming Responseに対応していないためです。

「直接Lambdaを実行するのは危険では?」と思う方もいらっしゃるかもしれませんが、適切に構成することで安全に実行することができます。

Lambdaの定義

以下が、CDKによるLambdaの定義部分の抜粋です。
LambdaにはあらかじめAWSのSDKがバンドルされていますが、新しい機能はこのバンドルされているSDKでサポートしていない場合があります。その場合は、bundlingの設定で明示的にバンドルするようにしましょう。

定義自体は通常のLambdaと同じですが、grantInvokeで認証済みのユーザに対してのみ実行権限を与えています。こちらは、CognitoのIdentityPoolを使って認可を行います。IdentityPoolと紐付けているUserPoolのユーザのみが、このLambdaを実行できます。

    const predictStreamFunction = new NodejsFunction(this, 'PredictStream', {
      runtime: Runtime.NODEJS_18_X,
      entry: './lambda/predictStream.ts',
      timeout: Duration.minutes(15),
      environment: {
        MODEL_REGION: modelRegion,
        MODEL_IDS: JSON.stringify(modelIds),
        IMAGE_GENERATION_MODEL_IDS: JSON.stringify(imageGenerationModelIds),
        AGENT_REGION: agentRegion,
        AGENT_MAP: JSON.stringify(agentMap),
      },
      bundling: {
        nodeModules: [
          '@aws-sdk/client-bedrock-runtime',
          '@aws-sdk/client-bedrock-agent-runtime',
        ],
      },
    });

    predictStreamFunction.grantInvoke(idPool.authenticatedRole);

Cognitoの定義

IdentityPoolとUserPoolは、以下のように定義が可能です(こちらの抜粋)。IPアドレスによるアクセス制限を行いたい場合は、Identity PoolのIAMポリシーにaws:SourceIpの定義をすることで実現可能です(LambdaのStreaming Responseを利用する場合は、AWS WAFを直接アタッチできないのでご注意ください)。

    const userPool = new UserPool(this, 'UserPool', {
      // SAML 認証を有効化する場合、UserPool を利用したセルフサインアップは利用しない。セキュリティを意識して閉じる。
      selfSignUpEnabled: props.samlAuthEnabled
        ? false
        : props.selfSignUpEnabled,
      signInAliases: {
        username: false,
        email: true,
      },
      passwordPolicy: {
        requireUppercase: true,
        requireSymbols: true,
        requireDigits: true,
        minLength: 8,
      },
    });

    const client = userPool.addClient('client', {
      idTokenValidity: Duration.days(1),
    });

    const idPool = new IdentityPool(this, 'IdentityPool', {
      authenticationProviders: {
        userPools: [
          new UserPoolAuthenticationProvider({
            userPool,
            userPoolClient: client,
          }),
        ],
      },
    });

    if (props.allowedIpV4AddressRanges || props.allowedIpV6AddressRanges) {
      const ipRanges = [
        ...(props.allowedIpV4AddressRanges
          ? props.allowedIpV4AddressRanges
          : []),
        ...(props.allowedIpV6AddressRanges
          ? props.allowedIpV6AddressRanges
          : []),
      ];

      idPool.authenticatedRole.attachInlinePolicy(
        new Policy(this, 'SourceIpPolicy', {
          statements: [
            new PolicyStatement({
              effect: Effect.DENY,
              resources: ['*'],
              actions: ['*'],
              conditions: {
                NotIpAddress: {
                  'aws:SourceIp': ipRanges,
                },
              },
            }),
          ],
        })
      );
    }

Lambdaの実装

以下は、BedrockをResponseStreamで実行するコードです(こちらから抜粋)。
ポイントは、function*というジェネレーター関数を利用することです。このジェネレーター関数は、関数の実行を一時停止したり、一時停止した位置から実行を再開することができる特殊な関数です。yieldで値を返すと同時に関数を一時停止します。そして、next()を実行すると、関数の実行が再開するという動きをします。
この実装では、for...ofを利用しているので、ループの度に自動でnext()が実行されます。つまり、BedrockクライアントのInvokeModelWithResponseStreamCommandから、断片的なレスポンスが返ってくるたびに自動で関数の実行が再開するという動きをします。

async function* invokeStream (model, messages) {
  try {
    const command = new InvokeModelWithResponseStreamCommand({
      modelId: model.modelId,
      body: createBodyText(model.modelId, messages),
      contentType: 'application/json',
    });
    const res = await client.send(command);

    if (!res.body) {
      return;
    }

    for await (const streamChunk of res.body) {
      if (!streamChunk.chunk?.bytes) {
        break;
      }
      const body = JSON.parse(
        new TextDecoder('utf-8').decode(streamChunk.chunk?.bytes)
      );
      const outputText = extractOutputText(model.modelId, body);
      if (outputText) {
        yield outputText;
      }
      if (body.stop_reason) {
        break;
      }
    }
  } catch (e) {
    if (
      e instanceof ThrottlingException ||
      e instanceof ServiceQuotaExceededException
    ) {
      yield 'ただいまアクセスが集中しているため時間をおいて試してみてください。';
    } else {
      yield 'エラーが発生しました。時間をおいて試してみてください。';
    }
  }
}

以下は実際のLambda関数のコードとなります(コードはこちら)。上記のBedrockを実行するコードを呼び出しています。
awslambda.streamifyResponseを利用することで、LambdaでStreamingResponseを実現できます。
上記のyieldで返された値をfor...ofでループしながら受け取り、reponseStream.writeを利用してクライアントにレスポンスを段階的に返します。

declare global {
  namespace awslambda {
    function streamifyResponse(
      f: (
        event: PredictRequest,
        responseStream: NodeJS.WritableStream,
        context: Context
      ) => Promise<void>
    ): Handler;
  }
}

export const handler = awslambda.streamifyResponse(
  async (event, responseStream, context) => {
    context.callbackWaitsForEmptyEventLoop = false;
    const model = event.model || defaultModel;
    for await (const token of invokeStream(
      model,
      event.messages
    )) {
      responseStream.write(token);
    }
    responseStream.end();
  }
);

フロントエンド

今回は、Lambda SDKを利用して、フロントエンドから直接Lambdaを実行する構成となっています。直接実行するためには、以下のライブラリが必要です。

フロントエンドからLambdaを実行

バックエンドの章で解説した通り、Lambdaに「認証ユーザしか利用できない」という権限設定を行なっているため、認証ユーザであることを証明するための認証トークンを取得する必要があります。

以下が実際にLambdaを実行しているコードです(こちらから抜粋)。
まず、fromCognitoIdentityPoolを利用して、Lambdaの実行に必要なCredentialを取得します。(await Auth.currentSession()).getIdToken().getJwtToken()を設定して、fromCognitoIdentityPoolを実行すると、認証済みユーザの検証とLambda実行の認可処理が自動で行われます。
バックエンドと同様に、ジェネレーター関数を使った実装となっています。

なお、ここで利用しているAuthはAmplify UIの機能となります。Amplify UIのAuthenticatorを利用すると、非常に簡単にログイン画面を実装することができるので、こちらも利用してみてください!

async function* predictStream(req: PredictRequest) {
  const region = import.meta.env.VITE_APP_REGION;
  const userPoolId = import.meta.env.VITE_APP_USER_POOL_ID;
  const idPoolId = import.meta.env.VITE_APP_IDENTITY_POOL_ID;
  const cognito = new CognitoIdentityClient({ region });
  const providerName = `cognito-idp.${region}.amazonaws.com/${userPoolId}`;
  const lambda = new LambdaClient({
    region,
    credentials: fromCognitoIdentityPool({
      client: cognito,
      identityPoolId: idPoolId,
      logins: {
        [providerName]: (await Auth.currentSession())
          .getIdToken()
          .getJwtToken(),
      },
    }),
  });

  const res = await lambda.send(
    new InvokeWithResponseStreamCommand({
      FunctionName: import.meta.env.VITE_APP_PREDICT_STREAM_FUNCTION_ARN,
      Payload: JSON.stringify(req),
    })
  );
  const events = res.EventStream!;

  for await (const event of events) {
    if (event.PayloadChunk) {
      yield new TextDecoder('utf-8').decode(event.PayloadChunk.Payload);
    }

    if (event.InvokeComplete) {
      break;
    }
  }
}

以下が上記の関数を実行する部分になります(こちらの抜粋)。
上記がジェネレーター関数で実装されているので、for...ofを利用して出力された結果をループで処理するようにしています。
出力される文字の数だけループが回るため、ループの中に状態更新の処理を入れると、処理キューがたまりすぎてReactのパフォーマンスが低下するという問題があります。そのため、バッファリングを行いながら、状態更新の処理を行うようにしています。
ちなみに、このアセットでは状態管理にzusutandを利用しています(標準のuseStateで状態管理を行うと、短い時間で何度も状態を更新すると順序性が担保されず、実行結果に一貫性を持たせられないため)。

const stream = predictStream({
  model: model,
  messages: formattedMessages,
});
// Assistant の発言を更新
let tmpChunk = '';
for await (const chunk of stream) {
  tmpChunk += chunk;
  // chunk は 10 文字以上でまとめて処理する
  // バッファリングしないと以下のエラーが出る
  // Maximum update depth exceeded
  if (tmpChunk.length >= 10) {
    addChunkToAssistantMessage(id, tmpChunk, model);
    tmpChunk = '';
  }
}
// tmpChunk に文字列が残っている場合は処理する
if (tmpChunk.length > 0) {
  addChunkToAssistantMessage(id, tmpChunk, model);
}
// メッセージの後処理(例:footnote の付与)
if (postProcessOutput) {
  set((state) => {
    const newChats = produce(state.chats, (draft) => {
      const oldAssistantMessage = draft[id].messages.pop()!;
      const newAssistantMessage: UnrecordedMessage = {
        role: 'assistant',
        content: postProcessOutput(oldAssistantMessage.content),
        llmType: model?.modelId,
      };
      draft[id].messages.push(newAssistantMessage);
    });
    return {
      chats: newChats,
    };
  });
}

Lambdaを直接実行する際の注意点

今回は、Lambdaを直接実行する際にCognitoを使って安全性を担保しています。しかしこれは、Cognitoの制約を受けてしまうというデメリットもあります。
特にCognitoの処理の呼び出し回数の上限が結構厳しいので、大規模利用の際はご注意ください。
注意すべきクォータは以下になります(参考)。

  • GetId: 25 RPS(ブラウザにキャッシュがない状態の場合に実行される)
  • GetCredentialsForIdentity: 200 RPS (毎回実行される)

さいごに

解説は以上になります。
本日紹介したコード(generative-ai-use-cases-jpbedrock-claude-chat)はGitHub上にOSSとして公開していますので、ご自由にご利用いただけます。
デプロイも非常に簡単にでき、最近話題のClaude3も使えるので、ぜひデプロイをしてみてください!
皆さんもより良い生成AIライフをお過ごしください!

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?