0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Stream入門

Last updated at Posted at 2023-04-28

Lambda関数がストリームレスポンスに対応したので、今更ながらStreamに入門した。

Lambda関数側の実装サンプル

上記の公式ブログからリンクがあるが、Lambda関数としては以下の二通りの書き方がある。

writeで書き込む方法

主要部分だけ書くとこんな感じ。ChatGPTによると、writeで書いていく方法はあまりストリームの良さが享受できないとかなんとか。しかし後述するがシンプルにストリームレスポンスがどんなものか体感するのにちょうどよかった。

exports.handler = awslambda.streamifyResponse(
    async (event, responseStream, context) => {
        const httpResponseMetadata = {
            statusCode: 200,
            headers: {
                "Content-Type": "text/html",
                "X-Custom-Header": "Example-Custom-Header"
            }
        };

        responseStream = awslambda.HttpResponseStream.from(responseStream, httpResponseMetadata);
...
        responseStream.write("<h1>Streaming h1</h1>");
        await new Promise(r => setTimeout(r, 1000));
        responseStream.write("<h2>Streaming h2</h2>");
        await new Promise(r => setTimeout(r, 1000));
        responseStream.write("<h3>Streaming h3</h3>");

... 
        responseStream.write("<p>DONE!</p>");
        responseStream.end();
    }
);

pipe (pipeline)を使う方法

シンプル。これで全部。requestStream -> responseStreamというふうに繋げる意味合いがあるので、requestStreamに書き込むとレスポンスに流れる。この後writeをして追加データを送りたいと考えたがストリームはすでに終わっているとのことでそれはできなかった。しかし実用を考えるとそうしたいという場面もあまりなさそうである。

import util from 'util'; 
import stream from 'stream';
const { Readable } = stream;
const pipeline = util.promisify(stream.pipeline);

export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => {
  const requestStream = Readable.from(Buffer.from(new Array(1024*1024).join( '🚣' )));
  await pipeline(requestStream, responseStream);
});

Lambda側のCORS設定

CORSの設定もできる。とりあえず許可オリジン、許可ヘッダ、許可メソッド全部*にしてから必要な部分に絞っていくといいと思う。
上記のwriteでストリームに書き込むサンプルでもやっているが、コード上でヘッダを返すことも可能なのでそちらでも設定できる。

クライアント側のコード

curl

一応。IAM認証も対応。

curl -X GET 関数URL --aws-sigv4 "aws:amz:ap-northeast-1:lambda" --user "アクセスキー:シークレットキー" -N

重要なのは-Nオプションで、これがないとレスポンスをまとめて返された。しかしこれさえつければ順次レスポンスが返却されるストリームを簡単に体験できてよかった。

man curlの説明もメモ

-N, --no-buffer
Disables the buffering of the output stream. In normal work situations,
curl will use a standard buffered output stream that will have the effect
that it will output the data in chunks, not necessarily exactly when the
data arrives. Using this option will disable that buffering.

fetch

React18.
aws-sdkをインストールするとIAM認証用の署名も用意できる。そうはいってもメインはfetchの部分。最初の方はほとんど署名作るためのおまじない。

App.js
import crypto from '@aws-crypto/sha256-js';
import { SignatureV4 } from '@aws-sdk/signature-v4';
import { HttpRequest } from '@aws-sdk/protocol-http';

import './App.css';
import React, { useState } from 'react';

const { Sha256 } = crypto;
const AWS_REGION = 'ap-northeast-1'
const ENDPOINT = "関数URL"

function App() {
  const [resMessage,setResMessage] = useState(null)

  const fetchUrl = async() =>{
let body = ''

    const signer = new SignatureV4({
      credentials: {
        accessKeyId:"アクセスきー",
        secretAccessKey:"シークレットキー"
      },
      region: AWS_REGION,
      service: 'lambda',
      sha256: Sha256
    });
    const endpoint = new URL(ENDPOINT);

    const requestToBeSigned = new HttpRequest({
      method: 'GET',
      headers: {
        host: endpoint.host
      },
      hostname: endpoint.host,
      path: endpoint.pathname
    });

    const signed = await signer.sign(requestToBeSigned);
    const request = new Request(endpoint, signed);

    try {
      const response = await fetch(request,{mode:"cors"})
      const reader = response.body.getReader();
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          console.log('Stream finished');
          break;
        }
        const strValue = new TextDecoder('utf-8').decode(value);
        console.log('Received chunk:', strValue);
      }
    } catch (error) {
      body = error.message
      console.log(body)

    }

  }

  return (
    <>
    <button onClick={fetchUrl}>button</button>
    <p>{resMessage}</p>
    </>
  );
}

export default App;

おまけ〜ストリームレスポンスを返すサーバーを自分で作る

これはChatGPTに頼んだら2,3回目で出てきたコードがそのまま使えた。すごい。

const http = require('http');

http.createServer(async (req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain',
   'Access-Control-Allow-Origin': "*",
   "Access-Control-Allow-Headers": "*"});

  for (let i = 0; i < 10; i++) {
    res.write(`hello ${i}\n`);
    await sleep(1000);
  }

  res.end('world\n');
}).listen(8443);

async function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

まとめ

ストリームがどんなものかを体感するなら上記の「ストリームレスポンスを返すサーバーを自分で作る」のサーバーに対してcurl -Nでリクエストするのが一番簡単だと思う。事前準備がほぼいらない。

参考

ほとんど読んでいないけどdocument. https://nodejs.org/api/stream.html
たくさんあってまだ違いが全然わからない。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?