はじめに
私は今、いわゆるBIツールに前処理や機械学習の機能を足したようなプロダクト(いわゆるセルフサービスデータサイエンスプラットフォーム)を作っています。
UIはWebアプリケーションになりますが、バックエンドではRやPythonのプログラムを非同期に動かす必要があり、またユーザの負荷に応じて計算リソースを増減したくなります。そこで、RやPythonの処理をDocker上で行い、それらをWebサービス側から制御する機能を簡易的に実装しました。
構成図
- WebアプリケーションのサーバサイドはNodejs(Express)を使用
- WebサーバホストとワーカホストはVMを別にする
- データ処理を行う計算エンジン(Pythonで実装)はDockerコンテナ上で行う
- Webサーバと計算エンジンとのやり取りは、ZeroMQ上でJSONで行う
- データ本体の格納・アクセスにはサーバホストとワーカホスト両方から読み書きできる共有ファイルストレージ(NFS等)を利用
ワーカプロセス(Pythonでデータ処理を行う部分)
構成図中で「計算エンジン」と書いてある部分がデータ処理を行うワーカプロセスです。ここは実装の選択肢として
- データ処理のリクエストのたびにプロセスを生成・終了させる
- プロセスはデーモン的に稼働させたままリクエストのあるたびにデータ処理を行う
の2つがありますが、後者を選びました。
理由は、Dockerコンテナとはいえデータ処理のステップごとにコンテナ生成・消滅を行うのはDockerといえどもオーバーヘッドが大きそうですし、ZeroMQを使うことでプロセスを上げたまま非同期通信を行うことが容易にできるからです。
Webサーバ-計算エンジン間の非同期通信モデル
ZeroMQのソケットにはPUB/SUBやPUSH/PULL,REQ/REPなど、いくつか種類があります(それぞれの違いについてはQiita上に良い解説エントリがあったりするのでそれらをよく知らないという場合はQiitaのタグから辿ってみてください)。
要件
まず設計における要件として、以下があります。
- コンテナ数は負荷に応じて変動するためDockerネットワーク上のIPアドレスがコンテナごとに変わる(ポートは固定しておける)
- ZeroMQのソケットを実行中に生成・消滅させるような設計は複雑すぎるのでダメ(予め決められた数・種類のソケットをプロセスの最初に作ってそれを使うだけで十分であるべき)
- Pythonのプロセスがコケたり手動でDockerコンテナを起動・停止したりしてもWebサーバ側がそれなりに柔軟に対応できる
これらの要件を満たす通信モデルとして、以下のようにしました。
ソケット構成及びシーケンス
ポート番号については、PUB/SUBが5556, PUSH/PULLが5557と、固定で割り当てています。いずれもWebサーバ側がbind側で、コンテナ側がconnect側です。
シーケンス図は概ね自然に見えるかと思いますが、一点だけ処理のコマンドを送る部分が「PUB」ソケットを使って行われているところに違和感が感じられるかもしれませんが、ここがこの設計のミソです。
ZeroMQでは相手を指定してコネクションを張るためには当然ながら相手のIPアドレスとポートを指定する必要があります。しかし、上述の要件から、コンテナが自由に増減する環境のもとではIPアドレスも自由に増減するため、処理コマンドをユニキャストで行うためにはWebサーバプロセス側のZeroMQソケットをコンテナごとに持つ必要が出てきてしまいます。
そこで、まず空いてるプロセスを一定時間(現実装では500ミリ秒)の間募り(ping)、ワーカープロセス側は自分のID(=ホスト名 or IPアドレスです)を付けて応答します(pong)。応答のあったプロセスのうち一つを選択し、そのIDをデータ処理コマンドのJSONの中で指定してpingと同じくPUBソケットでブロードキャストしてしまいます。
各プロセスは(処理実行中のものも含めて)そのコマンドを受け取り、自分が割り当てられていた場合に処理を行います。ZeroMQのPUB/SUBでは、SUB側は購読したいメッセージをフィルタすることが簡単にできます。なのでメッセージの文字列をスペースで区切り、冒頭のトークン(トピック名)をワーカプロセスIDにしておき、「自分 または 全員」という2つのトピックを購読すればOKというわけです。また、メッセージ受信時に処理実行中だったノードは処理が終わった後に、そのメッセージからコマンドを読みだすことになりますがトピックが自分ではないので無事にそれをスルーします。
最初にpingで500ms待って募集することで、ワーカープロセス(コンテナ)がコケたり手動で再起動したりするたびにWebサーバプロセス側がいちいちそのことを把握するような処理が不要になるのもポイントです。この設計は古いEthernet LANのCSMA(Carrier Sense Multiple Access)の仕組みを参考にしました。
さて、では実際のコードの説明に移ります。
Dockerコンテナの作成
FROM python:2
WORKDIR /usr/src/app
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
build-essential \
libzmq-dev
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
RUN mkdir ./work
COPY . ./work
RUN useradd -u 1000 isobe
USER isobe
CMD [ "python", "./engine.py" ]
上のようなDockerfileを書き(requirements.txtにはscikit-learnとかpandasとか必要なライブラリを記述しておきます。)、dockerコマンドでイメージ作成します。
% docker run -d -v /var/data:/var/data my-engine
/var/dataが、(コンテナのホスト)VM上にマウントされた共有ストレージのパスになります。
ちなみに開発中のハマりポイントだったのは、ユーザ名が同じでもコンテナ上のUserIDと、開発マシンやVM側のUserIDが異なる場合があり、その場合はイメージを作るときにuseraddコマンドの-uオプションでコンテナ側のUserIDを揃えておかないとパーミッションが無くてマウントしたディレクトリを読み書きできなくなるという点でした。
ワーカープロセス(Python)
import sys
import os
import json
import zmq
import subprocess
def get_container_id():
return os.uname()[1].strip()
def get_webserver_ip():
cmd = "ip route | awk 'NR==1 {print $3}'"
return subprocess.check_output(cmd,shell=True).strip()
if __name__=='__main__':
cid = get_container_id()
ip = get_webserver_ip()
context = zmq.Context()
sub = context.socket(zmq.SUB)
sub.connect('tcp://'+ip+':5556')
sub.setsockopt(zmq.SUBSCRIBE, '*')
sub.setsockopt(zmq.SUBSCRIBE, cid)
push = context.socket(zmq.PUSH)
push.connect('tcp://'+ip+':5557')
pong = json.dumps({
'cid': cid,
'type': 'pong'
})
push.send(pong)
while True:
print "wait"
s = sub.recv()
index = s.find(' ')
topic = s[:index]
msg = s[index+1:]
if msg=='ping':
push.send(pong)
continue
command = json.loads(msg)
以降、処理本体
上のほうで図示したシーケンスをそのまま実装したものです。Webサーバ用のVMとワーカー用のVMの間でDockerネットワークを共有してあれば、WebサーバのIPは上記Pythonコードのget_webserver_ip()のようにip routeコマンドで簡単に取得できます。
Webサーバ側はソケットを作る際に*にbindしてしまってます。(クラウドのインフラ内部なのでセキュリティよりも実装の簡単さを重視)
Webサーバ側
少し長くなりますが、下の方で実装を掲載します。Dockerコンテナの制御にはdockerodeというライブラリがnodejsでは一番メジャーのようです。
工夫点としては、このJSモジュールの外部公開がただひとつのasyncメソッドだけになっていて、このモジュールを使うWebサービス側(Node-Express)から見るとあたかもワーカプロセスが同期式のシンプルなAPIであるかのように記述できるようにしたことです。(こうした込み入った非同期処理の隠蔽をしたいときに、PromiseとES2017のasync/awaitはとても便利です)
まずこのモジュールを使う(Node-Express)側のコードです。ワーカープロセス(計算エンジン)の呼び出しはengineモジュールのメソッドを呼び出してawaitするだけです。すごくシンプルで綺麗です。
var express = require('express')
const router = express.Router()
var engine = require('./engine')
(中略)
router.post('/some_rest_api', async (req,res) => {
// Do some web parameter process,
// to make JSON as commands for engines.
var cmd = ...
try {
var result = await engine(cmd) // awesome!! what a beautiful implementation!!
var api_response_data = ... // some post process
res.send(api_response_data)
} catch (e) {
res.status(500)
res.send(e)
}
})
以下はengineの制御モジュールです。
(とりあえずコードをコピペしただけになっていますが、後日、各メソッドの説明を書こうと思います。)
const zmq = require('zmq');
const pub = zmq.socket('pub');
const pull = zmq.socket('pull');
const moment = require('moment')
const Docker = require('dockerode')
const docker = new Docker({socketPath: '/var/run/docker.sock'});
const production = process.env.NODE_ENV=='production'
const config = {
INITIAL_WAIT: 1000,
MONITOR_INTERVAL: production ? 30000 : 10000, // milli seconds
PING_TIMEOUT: 500, // milli seconds
IDLE_LIMIT: 60, // seconds
MINIMUM_IDLE_WORKERS: production ? 3 : 1,
AUTO_SCALE: production
}
const workers = {}
const state_char = {
ready:'.',reserved:'o',running:'O',stopping:'x',
}
function show_workers_string() {
return Object.values(workers)
.sort((a,b)=>(a.discovered>b.discovered?+1:-1))
.reduce((ss,w)=>(ss+state_char[w.state]),'')
}
function show_workers() {
const ss = show_workers_string()
console.log(`workers: [${ss}]`)
}
pub.bind('tcp://*:5556',()=>{
console.log('engine(pub): bound')
setTimeout(init,config.INITIAL_WAIT)
monitor()
})
pull.bind('tcp://*:5557',()=>{
console.log('engine(pull): bound')
pull.on('message',handle_response)
})
async function start_worker() {
const container = await docker.createContainer({
Image: 'datarecipe-eng',
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
OpenStdin: false,
StdinOnce: false,
HostConfig: {
Binds: ['/var/data:/var/data']
}
})
await container.start()
}
async function stop_worker(cid) {
workers[cid].state = 'stopping'
const container = docker.getContainer(cid)
if (container) {
await container.stop()
await container.remove()
console.log(`conatiner ${cid} stopped/removed.`)
}
delete workers[cid]
}
function init() {
console.log('ping')
pub.send(['* ping'],0,()=>{
setTimeout(()=>{
const ready = Object.keys(workers).length
const shortage = config.MINIMUM_IDLE_WORKERS-ready
if (shortage>0 && config.AUTO_SCALE) {
console.log(`shortage ${shortage} workers.`)
for (var i=0; i<shortage; i++) {
start_worker()
}
}
},config.PING_TIMEOUT)
})
}
function monitor() {
setInterval(()=>{
const limit = moment().subtract(config.IDLE_LIMIT,'s')
const idle_workers = Object.entries(workers).filter(([cid,worker])=>{
return worker.state=='ready' && worker.moment<=limit
})
//console.log(`${idle_workers.length} idle workers.`)
const excess = idle_workers.length - config.MINIMUM_IDLE_WORKERS
if (excess>0 && config.AUTO_SCALE) {
console.log(`extra ${excess} workers.`)
for (var i=0; i<excess; i++) {
const [cid,worker] = idle_workers[i]
stop_worker(cid)
}
}
show_workers()
},config.MONITOR_INTERVAL)
}
function handle_response(msg) {
const info = JSON.parse(msg);
console.log(`engine(${info.cid}): ${info.type}`);
if (!(info.cid in workers)) {
workers[info.cid] = {cid:info.cid,discovered:moment()}
}
worker = workers[info.cid]
if (info.type=='pong') {
worker.state = 'ready'
if (worker.cb) {
const err = 'worker restarted'
worker.cb(err,null)
}
} else if (info.type=='start') {
worker.state = 'running'
} else if (info.type=='finish') {
if ('cb' in worker) {
worker.cb(info.error,info.result)
delete worker.cb
}
worker.state = 'ready'
}
worker.moment = moment()
show_workers()
}
function select_worker() {
return new Promise(cb=>{
const now = moment()
console.log('ping')
pub.send(['* ping'],0,()=>{
setTimeout(()=>{
const found = Object.values(workers)
.find(w=>(w.state=='ready' && w.moment>=now))
if (found) found.state = 'reserved'
cb(found)
},config.PING_TIMEOUT)
})
})
}
async function request(cmd) {
var worker = await select_worker()
if (!worker) {
if (config.AUTO_SCALE) {
console.log('not found ready worker, so create one.')
await start_worker()
worker = await select_worker()
}
if (!worker) {
const err = 'cannot assign worker.'
console.log(err)
throw err
}
}
console.log(`worker ${worker.cid}: selected`)
return await new Promise((ok,ng) => {
const msg = JSON.stringify(cmd)
pub.send([worker.cid+' '+msg],0,() => {
console.log('engine: sent ');
worker.cb = (err,ret) => {
if (err) ng(err); else ok(ret)
}
show_workers()
})
})
}
module.exports = request