LoginSignup
3
4

動画ストリーミングサイトを作ってみた。[Go/Nextjs13/gRPC/kubernetes/EKS]

Posted at

はじめに

マイクロサービスアーキテクチャを使った動画ストリーミングサイトを作りました。

システム構成は以下の通りです。

フロントエンド Next.js, TypeScript
バックエンド Go
通信規格 gRPC
データベース MySQL
インフラ EKS
IaC Terraform, kubernetes, eksctl

Next.jsは13系を使っています。

レポジトリーは以下の通りです。

クリーンアーキテクチャー

apserver側では、依存関係を一方向にして、変更や置き換えを容易にする設計パターンであるクリーンアーキテクチャーを採用しています。これによりパッケージ等に不具合が生じた場合すぐに取り替え可能です。

Screenshot 2023-09-25 at 14.33.52.png

そこで、クリーンアーキテクチャーに従いディレクトリを設計しました。
ディレクトリ構成は以下の通りです。

.
├── apserver
│   ├── Dockerfile
│   ├── app
│   │   ├── cmd
│   │   │   ├── domain
│   │   │   │   ├── form
│   │   │   │   ├── model
│   │   │   │   ├── pb
│   │   │   │   └── proto
│   │   │   ├── driver
│   │   │   ├── infrastructure
│   │   │   │   └── dao
│   │   │   ├── interface
│   │   │   │   ├── presentation
│   │   │   │   └── repository
│   │   │   ├── main.go
│   │   │   └── usecase
│   │   ├── go.mod
│   │   └── go.sum

eksctl

eksctlは、Go言語で書かれたEKS上でKubernetesクラスターを作成および管理するためのCLIツールです。
今回は、eksctlを使います。

infra/eksctl/cluster.yaml
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: eks-study-cluster
  region: ap-northeast-1
  version: "1.23"
vpc:
  id: "vpc-0c6e20eeb53226421"
  subnets:
    private:
      ap-northeast-1a:
        id: "subnet-059deda9d581a9f59"
      ap-northeast-1c:
        id: "subnet-01184558a4b431099"
    public:
      ap-northeast-1a:
        id: "subnet-0f10dc9f9eba19745"
      ap-northeast-1c:
        id: "subnet-0077cde123263eb88"
nodeGroups:
  - name: eks-study-ng
    instanceType: t3.small
    desiredCapacity: 1
    iam:
      withAddonPolicies:
        imageBuilder: true
        awsLoadBalancerController: true
        cloudWatch: true
        ebs: true
      attachPolicy:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - "s3:GetObject"
            Resource: "arn:aws:s3:::example-bucket/*"

kubernetes

Kubernetesはコンテナの分散システムを弾力的に実行するフレームワークです。以下の利点があります。

  • サービスディスカバリーと負荷分散
  • ストレージ オーケストレーション
  • 自動化されたロールアウトとロールバック
  • 自動ビンパッキング
  • 自己修復
  • 機密情報と構成管理

gRPC

動画のストリーミング再生する上で、gRPCを採用した理由は以下の通りです。

  • バイナリ形式にシリアライズで高速。
  • 型安全である。
  • http/2を使い、高速。

などです。

apserverのヘルスチェック

gRPCに対するkubernetes独自のヘルスチェックは存在しないしないため、ヘルスチェックを作る必要があります。

以下を参考にしました。

Dockerfileに以下の行を追加します。

apserver/Dockerfile
RUN GRPC_HEALTH_PROBE_VERSION=v0.3.2 && \
    wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
    chmod +x /bin/grpc_health_probe
apserver/app/cmd/domain/proto/healthcheck.proto
syntax = "proto3";

package healthcheck;

option go_package = "./pb/healthcheck";

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;  // Used only by the Watch method.
  }
  ServingStatus status = 1;
}

service Health {
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

// The greeting service definition.
service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}
apserver/app/cmd/interface/presentation/HealthCheckController.go
package presentation

import (
	pb "app/cmd/domain/pb/healthcheck"
	"context"

	"google.golang.org/grpc/codes"

	health "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/status"
)

type HelloHandler struct {
	pb.UnimplementedGreeterServer
}

func (h *HelloHandler) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

type HealthHandler struct {
	pb.UnimplementedHealthServer
}

func (h *HealthHandler) Check(context.Context, *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
	return &health.HealthCheckResponse{
		Status: health.HealthCheckResponse_SERVING,
	}, nil
}

func (h *HealthHandler) Watch(*health.HealthCheckRequest, health.Health_WatchServer) error {
	return status.Error(codes.Unimplemented, "watch is not implemented.")
}
apserver/app/cmd/driver/router.go
package driver

import (
	pb_healthcheck "app/cmd/domain/pb/healthcheck"
	"app/cmd/interface/presentation"
	"log"
	"net"
	health "google.golang.org/grpc/health/grpc_health_v1"

	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/jinzhu/gorm"
	"google.golang.org/grpc"
)

func Init(lis net.Listener, conn *gorm.DB, s3Client *s3.S3) {
	s := grpc.NewServer()
	pb_healthcheck.RegisterGreeterServer(s, &presentation.HelloHandler{})
	health.RegisterHealthServer(s, &presentation.HealthHandler{})

	log.Printf("server listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

gRPCを使った動画のストリーミング再生

メインロジック

Next.jsとgRPCを使った動画のストリーミング再生のロジックです。基本的に、Next.jsのGETから
渡されたheader情報から欲しいデータをGo側に送ります。

apserver/app/cmd/domain/proto/s3video.proto
syntax = "proto3";

package s3video;

option go_package = "./pb/s3video";

service Videotransporter {
    rpc VideoUpload (stream VideoUpoadRequest) returns (VideoUploadReplay) {}
}

message VideoUpoadRequest {
    bytes data = 1;
    int64 size = 2;
    int64 id = 3;
}

message VideoUploadReplay {
    string newname = 1;
}
apserver/app/cmd/interface/presentation/S3VideoController.go
func (s *S3VideoServer) VideoUpload(stream pb.Videotransporter_VideoUploadServer) error {
	fmt.Println("VideoUpload")
	userid := 0
	filesize := 0
	var imagedata []byte

	for {
		req, err := stream.Recv()
		if req.GetId() != 0 {
			userid = int(req.GetId())
		}
		if req.GetSize() != 0 {
			filesize = int(req.GetSize())
		}
		if req.GetData() != nil {
			imagedata = append(imagedata, req.GetData()...)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println(err)
			return err
		}
	}

	video, err := s.Interactor.CreateByIDAndSize(userid, filesize)
	if err != nil {
		fmt.Println(err)
		return err
	}

	uuid_name := video.UUID.String()
	filename := uuid_name + ".mp4"
	fmt.Println(filename)

	f, err := os.Create("/tmp/" + filename)
	if err != nil {
		fmt.Println(err)
		return err
	}
	defer f.Close()

	err = os.WriteFile("/tmp/"+filename, imagedata, 0644)
	if err != nil {
		fmt.Println(err)
		return err
	}

	err = s.S3Interactor.VideoUpload(filename, f)
	if err != nil {
		fmt.Println(err)
		return err
	}
	os.Remove("/tmp/" + filename)
	return stream.SendAndClose(&pb.VideoUploadReplay{Newname: uuid_name})
}
webserver/myapp/app/api/video/play/[uuid]/[size]/route.ts
//@ts-ignore
import * as grpc from '@grpc/grpc-js';
import { VideotransporterClient } from '@/types/pb/s3video/s3video_grpc_pb';
import { VideoDownloadRequest } from '@/types/pb/s3video/s3video_pb';
//@ts-ignore
const target: string = process.env.APSERVER_ADDRESS;
const CHUNK_SIZE = 10 ** 6;

async function getVideoStream(filename: string, startbytes: number, endbytes: number) {
    const client = new VideotransporterClient(target, grpc.credentials.createInsecure());
    const req = new VideoDownloadRequest();
    if (!!filename) req.setName(filename);
    if (!!startbytes) req.setStartbytes(startbytes);
    if (!!endbytes) req.setEndbytes(endbytes);
    const res = await new Promise((resolve) => {
        client.videoDownload(req, function (err: unknown, response: any) {
            if (err != null) {
                console.log(err);
            }
            const movie_data = response.array[0];
            resolve(movie_data);
        });
    });
    console.log("received data");
    const data = await res;
    return data;
}

function getVideoRequest(req: Request, uuid: string, size: number) {
    const range = req.headers.get("Range");
    console.log(range);
    if (!range) {
        return null;
    };
    const filename: string = uuid;
    const videoSize: number = size;
    const start = Number(range.replace(/\D/g, "")); // 32324
    const end = Math.min(start + CHUNK_SIZE, videoSize - 1);

    // Create headers
    const contentLength = end - start + 1;
    const headers = {
        "Content-Range": `bytes ${start}-${end}/${videoSize}`,
        "Accept-Ranges": "bytes",
        "Content-Length": contentLength.toString(),
        "Content-Type": "video/mp4",
    } as { [key: string]: string };

    const videoStream = new ReadableStream({
        type: "bytes",
        async start(controller) {
            const value = await getVideoStream(filename, start, end) as Uint8Array;
            console.log("videoStream")
            controller.enqueue(value);
        },
    })

    return new Response(videoStream as any, {
        status: 206,
        headers,
    });
}

export async function GET(req: Request, { params }: { params: { uuid: string, size: string } }) {
    const uuid = params.uuid;
    const size = Number(params.size);
    return getVideoRequest(req, uuid, size);
}
3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4