Posted at

RailsでS3上のログファイルをPostgreSQLにインポートする

More than 1 year has passed since last update.

S3上の任意のディレクトのファイルをダウンロードして処理する実装例が見当たらなかったので後世のために記しておきます。前提としているfluentdを利用する運用ならば多分もっとよい構成があると思いますが試験的に手慣れたRDBMSにログをインポートしてログの傾向を調べたいような場合で活用できるかと思います。


前提条件

fluentdが出力したログファイルを処理していますが実装的にはS3上の任意のgzipファイルのダウンロード処理に流用可能です。


  • Railsのアクセスログをfluentd経由でgzip圧縮してS3にアップロードしている

  • バケット直下の access ディレクトリにログの出力日に応じて /YYYY/MM/DD の階層構造をとる


  • DD ディレクトリ下に該当日のgzip圧縮されたログファイルが配置される

  • 毎日1時に前日分のログをPostgreSQLにインポートする


S3のディレクトリ構成

[bucket]

|
|-access
|
|-YYYY
|
|-MM
|
|-DD


実行環境


  • Ruby 2.5.1

  • Rails 5.2.0

  • PostgreSQL 9.6.9


環境変数

以下の環境変数を設定して実行してください。

キー
説明

AWS_ACCESS_KEY_ID
AWSの認証キー

AWS_SECRET_ACCESS_KEY
AWSの認証シークレットキー

AWS_REGION
ログの置かれているS3のリージョン

AWS_ACCESS_LOG_BUCKET
ログの置かれているS3のバケット名


実装解説

実装はGitHub上の s3-log-importer に公開しているので参考にしてください。


対象ディレクトリ下のオブジェクトキーを取得する

aws-sdk のv3にあたる aws-sdk-s3 を利用してS3上のログファイルにアクセスします。S3からダウンロードするにはファイル毎のオブジェクトキーを把握する必要があるので対象日のディレクトリ下のオブジェクトキーの一覧を取得します。

bucket = AWS::ACCESS_LOG_BUCKET

directory = "access/#{date.strftime('%Y/%m/%d')}"

client = s3_client(AWS::ACCESS_KEY_ID, AWS::SECRET_ACCESS_KEY, AWS::REGION)

objects = client.list_objects(bucket: bucket, prefix: directory).contents

def s3_client(aws_key, aws_secret, region)

Aws.config.update({
region: region,
credentials: Aws::Credentials.new(aws_key, aws_secret)
})
Aws::S3::Client.new
end


S3からのダウンロード

S3からのダウンロードはIOブロッキングが発生するので Parallel を利用したマルチスレッド処理が有効です。実行環境によりますがシングルスレッドの半分以下の時間で処理が完了します。ただ、Railsのオートロードと相性が悪く Circular dependency detected が発生しやすくなるので自前での require が必要になります。

Parallel.each(objects, in_threads: CONSTANTS::THREAD_COUNT) do |object|

contents = client.get_object(bucket: bucket, key: object.key).body.read
(中略)
end


gzip解凍とJSONオブジェクトへの変換

gzipファイルの解凍は以下のブログを参考にさせていただきました。感謝。

GZip形式のS3上のファイルをRubyのIO.pipeでストリーム読み出し

ログのレコード毎にJSONオブジェクトに変換します。

ログをメモリ上に一気に読み込んでいるのでログファイルのサイズ、実行環境によってはメモリ消費が厳しいかもしれません。メモリ消費が気になるようならば1行ずつ読み込むように変更してください。

records = read(contents)

def read(contents)

IO.pipe do |reader, writer|
writer.binmode
begin
writer.write(contents)
rescue
writer.close
end

log = Zlib::GzipReader.new(reader).read
lines = log.split("\n")
lines.map { |line| JSON.parse(line) }
end
end


PostgreSQLへのバルクINSERT

activerecord-import を利用して1000件ずつバルクINSERTを実行しています。

ログのアクセス時刻はUNIXTIMEなので Time オブジェクトに変換しています。

def insert(records)

access_logs = records.map do |r|
params = COLUMNS.map { |c| [c, r[c]] }.to_h
params[:requested_at] = Time.zone.at(r['time'])

AccessLog.new(params)
end

transaction = InsertTransaction.new(AccessLog)
access_logs.each { |access_log| transaction.push(access_log) }
transaction.flush
end

class InsertTransaction

def initialize(model)
@model = model
@insert_count = 0
@insert_buffer = []
end

def push(record)
@insert_buffer << record

return if @insert_buffer.size < 1000

@model.import(@insert_buffer, validate: false)
@insert_count += @insert_buffer.size
@insert_buffer = []
end
end


access_logsテーブル

アクセスログを解析する上で汎用的に使える値をピックアップしています。

summary_key/user/1 /user2 のようなURLを同一視して集計するために導入しています。

DAUMAU の他にユーザーの利用端末、利用ブラウザ情報が把握できるのでクロスプラットフォーム対応やクロスブラウザ対応の指針として活用できます。

カラム名

説明・値の例

id
bigint
自動採番キー

requested_at
timestamp with time zone
リクエストの受信時刻

uid
character varying(64)
ユーザーID

http_method
character varying(10)
GET、POST、DELETE、etc.

path
character varying(100)
URLからサーバードメインを除いたもの(/user/1)

summary_key
character varying(100)
Railsのcontroller+@+action(users@show)

device_type
character varying(20)
pc、smartphone、mobilephone、etc.

os
character varying(20)
Windows

os_version
character varying(20)
NT 10.0、etc.

browser
character varying(20)
Chrome、HTTP

browser_version
character varying(20)
67.0.3396.62、curl、etc.

host_name
character varying(250)
server01