はじめに
マイクロサービスアーキテクチャを使った動画ストリーミングサイトを作りました。
システム構成は以下の通りです。
フロントエンド | Next.js, TypeScript |
---|---|
バックエンド | Go |
通信規格 | gRPC |
データベース | MySQL |
インフラ | EKS |
IaC | Terraform, kubernetes, eksctl |
Next.jsは13系を使っています。
レポジトリーは以下の通りです。
クリーンアーキテクチャー
apserver側では、依存関係を一方向にして、変更や置き換えを容易にする設計パターンであるクリーンアーキテクチャーを採用しています。これによりパッケージ等に不具合が生じた場合すぐに取り替え可能です。
そこで、クリーンアーキテクチャーに従いディレクトリを設計しました。
ディレクトリ構成は以下の通りです。
.
├── apserver
│ ├── Dockerfile
│ ├── app
│ │ ├── cmd
│ │ │ ├── domain
│ │ │ │ ├── form
│ │ │ │ ├── model
│ │ │ │ ├── pb
│ │ │ │ └── proto
│ │ │ ├── driver
│ │ │ ├── infrastructure
│ │ │ │ └── dao
│ │ │ ├── interface
│ │ │ │ ├── presentation
│ │ │ │ └── repository
│ │ │ ├── main.go
│ │ │ └── usecase
│ │ ├── go.mod
│ │ └── go.sum
eksctl
eksctlは、Go言語で書かれたEKS上でKubernetesクラスターを作成および管理するためのCLIツールです。
今回は、eksctlを使います。
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
name: eks-study-cluster
region: ap-northeast-1
version: "1.23"
vpc:
id: "vpc-0c6e20eeb53226421"
subnets:
private:
ap-northeast-1a:
id: "subnet-059deda9d581a9f59"
ap-northeast-1c:
id: "subnet-01184558a4b431099"
public:
ap-northeast-1a:
id: "subnet-0f10dc9f9eba19745"
ap-northeast-1c:
id: "subnet-0077cde123263eb88"
nodeGroups:
- name: eks-study-ng
instanceType: t3.small
desiredCapacity: 1
iam:
withAddonPolicies:
imageBuilder: true
awsLoadBalancerController: true
cloudWatch: true
ebs: true
attachPolicy:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- "s3:GetObject"
Resource: "arn:aws:s3:::example-bucket/*"
kubernetes
Kubernetesはコンテナの分散システムを弾力的に実行するフレームワークです。以下の利点があります。
- サービスディスカバリーと負荷分散
- ストレージ オーケストレーション
- 自動化されたロールアウトとロールバック
- 自動ビンパッキング
- 自己修復
- 機密情報と構成管理
gRPC
動画のストリーミング再生する上で、gRPCを採用した理由は以下の通りです。
- バイナリ形式にシリアライズで高速。
- 型安全である。
- http/2を使い、高速。
などです。
apserverのヘルスチェック
gRPCに対するkubernetes独自のヘルスチェックは存在しないしないため、ヘルスチェックを作る必要があります。
以下を参考にしました。
Dockerfileに以下の行を追加します。
RUN GRPC_HEALTH_PROBE_VERSION=v0.3.2 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
syntax = "proto3";
package healthcheck;
option go_package = "./pb/healthcheck";
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
// The greeting service definition.
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
package presentation
import (
pb "app/cmd/domain/pb/healthcheck"
"context"
"google.golang.org/grpc/codes"
health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)
type HelloHandler struct {
pb.UnimplementedGreeterServer
}
func (h *HelloHandler) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
type HealthHandler struct {
pb.UnimplementedHealthServer
}
func (h *HealthHandler) Check(context.Context, *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
return &health.HealthCheckResponse{
Status: health.HealthCheckResponse_SERVING,
}, nil
}
func (h *HealthHandler) Watch(*health.HealthCheckRequest, health.Health_WatchServer) error {
return status.Error(codes.Unimplemented, "watch is not implemented.")
}
package driver
import (
pb_healthcheck "app/cmd/domain/pb/healthcheck"
"app/cmd/interface/presentation"
"log"
"net"
health "google.golang.org/grpc/health/grpc_health_v1"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/jinzhu/gorm"
"google.golang.org/grpc"
)
func Init(lis net.Listener, conn *gorm.DB, s3Client *s3.S3) {
s := grpc.NewServer()
pb_healthcheck.RegisterGreeterServer(s, &presentation.HelloHandler{})
health.RegisterHealthServer(s, &presentation.HealthHandler{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
gRPCを使った動画のストリーミング再生
メインロジック
Next.jsとgRPCを使った動画のストリーミング再生のロジックです。基本的に、Next.jsのGETから
渡されたheader情報から欲しいデータをGo側に送ります。
syntax = "proto3";
package s3video;
option go_package = "./pb/s3video";
service Videotransporter {
rpc VideoUpload (stream VideoUpoadRequest) returns (VideoUploadReplay) {}
}
message VideoUpoadRequest {
bytes data = 1;
int64 size = 2;
int64 id = 3;
}
message VideoUploadReplay {
string newname = 1;
}
func (s *S3VideoServer) VideoUpload(stream pb.Videotransporter_VideoUploadServer) error {
fmt.Println("VideoUpload")
userid := 0
filesize := 0
var imagedata []byte
for {
req, err := stream.Recv()
if req.GetId() != 0 {
userid = int(req.GetId())
}
if req.GetSize() != 0 {
filesize = int(req.GetSize())
}
if req.GetData() != nil {
imagedata = append(imagedata, req.GetData()...)
}
if err == io.EOF {
break
}
if err != nil {
fmt.Println(err)
return err
}
}
video, err := s.Interactor.CreateByIDAndSize(userid, filesize)
if err != nil {
fmt.Println(err)
return err
}
uuid_name := video.UUID.String()
filename := uuid_name + ".mp4"
fmt.Println(filename)
f, err := os.Create("/tmp/" + filename)
if err != nil {
fmt.Println(err)
return err
}
defer f.Close()
err = os.WriteFile("/tmp/"+filename, imagedata, 0644)
if err != nil {
fmt.Println(err)
return err
}
err = s.S3Interactor.VideoUpload(filename, f)
if err != nil {
fmt.Println(err)
return err
}
os.Remove("/tmp/" + filename)
return stream.SendAndClose(&pb.VideoUploadReplay{Newname: uuid_name})
}
//@ts-ignore
import * as grpc from '@grpc/grpc-js';
import { VideotransporterClient } from '@/types/pb/s3video/s3video_grpc_pb';
import { VideoDownloadRequest } from '@/types/pb/s3video/s3video_pb';
//@ts-ignore
const target: string = process.env.APSERVER_ADDRESS;
const CHUNK_SIZE = 10 ** 6;
async function getVideoStream(filename: string, startbytes: number, endbytes: number) {
const client = new VideotransporterClient(target, grpc.credentials.createInsecure());
const req = new VideoDownloadRequest();
if (!!filename) req.setName(filename);
if (!!startbytes) req.setStartbytes(startbytes);
if (!!endbytes) req.setEndbytes(endbytes);
const res = await new Promise((resolve) => {
client.videoDownload(req, function (err: unknown, response: any) {
if (err != null) {
console.log(err);
}
const movie_data = response.array[0];
resolve(movie_data);
});
});
console.log("received data");
const data = await res;
return data;
}
function getVideoRequest(req: Request, uuid: string, size: number) {
const range = req.headers.get("Range");
console.log(range);
if (!range) {
return null;
};
const filename: string = uuid;
const videoSize: number = size;
const start = Number(range.replace(/\D/g, "")); // 32324
const end = Math.min(start + CHUNK_SIZE, videoSize - 1);
// Create headers
const contentLength = end - start + 1;
const headers = {
"Content-Range": `bytes ${start}-${end}/${videoSize}`,
"Accept-Ranges": "bytes",
"Content-Length": contentLength.toString(),
"Content-Type": "video/mp4",
} as { [key: string]: string };
const videoStream = new ReadableStream({
type: "bytes",
async start(controller) {
const value = await getVideoStream(filename, start, end) as Uint8Array;
console.log("videoStream")
controller.enqueue(value);
},
})
return new Response(videoStream as any, {
status: 206,
headers,
});
}
export async function GET(req: Request, { params }: { params: { uuid: string, size: string } }) {
const uuid = params.uuid;
const size = Number(params.size);
return getVideoRequest(req, uuid, size);
}