パーソンリンクアドベントカレンダー18日目の投稿です。
こんにちは桑原です。
一週間ぶりの投稿です。
前回のkinesisのアンチパターンの後続処理でGoとDynamoDBを使ってみようと思いサンプルコードを書いてみました。
両方とも触り始めて数時間の状態なので自分への備忘録です。
#やりたいこと
kinesisからのデータが1時間で最大数憶レコードに上るシステムなため、分析用に中間データを作成しようとしています。
中間データフォーマットとしては
1時間ごとにuser_idとsegment_id単位でsession_time(滞在時間)、**count(接触回数)**を集計する
になります。
#DynamoDB
##テーブル作成
AWSコンソールからテーブル作成します。
##テーブル設計
下記で実装していきます。
primary_key : user_id_segment_id_yyyy-MM-dd HHss
user_id : ユーザーID
segment_id : セグメントID
start_date : 初回アクセス日時
last_date : 最終アクセス日時
session_time : 滞在時間
count : 接触回数
#ソースコード
指定したuser_id、segmet_idに対してループ毎にlast_date(最終アクセス日時)に+1秒したデータで集計を行っています。
package main
import (
"fmt"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/guregu/dynamo"
)
type User struct {
WAW string `dynamo:"w_a_w"`
UserId string `dynamo:"user_id"`
StartDate string `dynamo:"start_date"`
LastDate string `dynamo:"last_date"`
SegmentID string `dynamo:"segment_id"`
SessionTime int `dynamo:"session_time"`
Count int `dynamo:"count"`
}
/**
* dbへselectし最新データの最終アクセス日時を比較、
* 滞在時間と接触回数を加算
*/
func main(){
start := time.Now();
loop := 1000 //1000回処理する
cred := credentials.NewStaticCredentials("*******", "*******", "")
db := dynamo.New(session.New(), &aws.Config{
Credentials: cred,
Region: aws.String("ap-northeast-1"),
})
table := db.Table("test")
user_id := "73nSpnPJ"
segment_id := "11111111111111"
s_date := time.Date(2019, time.December, 18, 12, 00, 00, 00, time.UTC)
start_date := s_date.Format("2006-01-02 15:04:05")
primary_key := user_id+"_"+segment_id+"_"+start_date
for i := 0; i < loop; i++ {
var users []User
err := table.Get("primary_key" ,primary_key).All(&users)
l_date := s_date.Add(time.Duration(i) * time.Second)
last_date := l_date.Format("2006-01-02 15:04:05")
if err != nil {
fmt.Println("err")
panic(err.Error())
}
//tableにデータがなければinsertする
if len(users) == 0 {
u := User{WAW: primary_key, UserId: user_id, StartDate: start_date, LastDate: last_date, SegmentID:segment_id, SessionTime:0, Count:0}
//fmt.Println(u)
if err := table.Put(u).Run(); err != nil {
fmt.Println("err")
panic(err.Error())
}
} else {
layout := "2006-01-02 15:04:05"
t, _ := time.Parse(layout, users[0].LastDate)
time_diff_second := l_date.Sub(t) //前回のログと比較し差分を接触時間とする
diff_second := int(time_diff_second/time.Second)
session_time := users[0].SessionTime + diff_second
count := users[0].Count + 1 //前回の接触回数+1
u := User{WAW: primary_key, UserId: user_id, StartDate: start_date, LastDate: last_date, SegmentID:segment_id, SessionTime:int(session_time), Count:count}
if err := table.Put(u).Run(); err != nil {
fmt.Println("err")
panic(err.Error())
}
}
}
end := time.Now();
fmt.Printf("%f秒\n",(end.Sub(start)).Seconds())
}
##実行環境
実行環境はAWS workspecesのAmazon Linux2で
物理 CPU の数
grep physical.id /proc/cpuinfo | sort -u | wc -l
1
CPU ごとのコアの数
grep cpu.cores /proc/cpuinfo | sort -u
cpu cores : 4
論理プロセッサーの数
grep processor /proc/cpuinfo | wc -l
4
となっています。
##コードの実行
go run sample.go とターミナルで入力します。
1000回DBへの書き込みと値のチェックを行うだけですので爆速を期待していました。
##結果
47.547127秒
遅い。。。
ターミナルに出力したログも明らかに途中で停止しています。
調べてみるとコンソールにアラートが。
消費された書き込みキャパシティー 5 分間 >= 4
デフォルトキャパシティの5だったためDynamoDBへの書き込みがsample.goの処理速度に追いついていないようです。
キャパシティは秒間の処理数とざっくり理解したのでいくつか設定し処理時間のログを比較してみました。
キャパシティ 10 10 | キャパシティ 30 30 | キャパシティ 50 30 | キャパシティ 100 100 | キャパシティ 150 200 |
---|---|---|---|---|
43.037734秒 | 22.560107秒50 | 22.201688秒 | 14.260555秒 12.466228秒 19.781125秒 |
14.321412秒 |
オンデマンド |
---|
14.515797秒 14.591887秒 |
という結果でした同じキャパシティでも誤差がかなりあることが気になりますが、おおよそ14秒くらいが現在の実行環境の限界かなというところです。
EC2の高スペックなサーバやLambdaでどれくらい速度が向上するのか、ほかの言語ではどうなかを引き続き調査を行っていきます。