Custom RuntimeはAWS公式ランタイムとどう違うか
共通点
- コンテナの中で動く
- /var/runtimeと/var/lang以外のディレクトリは同じらしい
違う点
-
AWS公式ランタイム(python)
-
コンテナ起動時に生成されるプロセス(PID:1)が
/var/lang/bin/python3.6 /var/runtime/awslambda/bootstrap.py
-
PID:1がイベントループを回してLambdaハンドラを呼ぶ。
-
Custom Runtime
-
コンテナ起動時に生成されるプロセス(PID:1)が
/var/runtime/init
。これをユーザが別のものに差し替えることはできない。つまりPID:1は、ユーザが作成するbootstrap
ではない! これに気づかず激ハマりした。 -
/var/runtime/init
(PID:1)がユーザ作成のbootstrap
(PIDは7とか)を呼び、さらにbootstrap
がイベントループを回してLambdaハンドラを呼ぶ。
この違いによりCustom Runtimeは、公式ランタイムの/var/runtime/awslambda/bootstrap.py
に使われているruntime
モジュールを使うことができない。
できるだけ公式ランタイムに近いCustom Runtimeを作りたい
ログ
公式ランタイムの/var/runtime/awslambda/bootstrap.py
を見ると、loggingモジュールの出力を設定している。書式を設定している部分はそのまま持ってくる。
出力先はruntime
モジュールのsend_console_message()
。Custom Runtimeには同等のものが提供されていないらしい。かわりにprint(msg, flush=True)
。これでCloudWatch Logsにログが行く。おそらくオーバーヘッドは大きいのだろうが、仕方ない。
この設定をすると、どういうわけか、リクエスト終了後にCloudWatch Logsのログに空行が入る。原因は不明。
プロセスの再利用
もしチュートリアルにあるように、Custom Runtimeのbootstrap
をシェルスクリプトで書くと、リクエストのたびにプロセスが生成されるCGI状態になる。
bootstrap
をPythonで書き、単なる関数の呼び出しとしてLambdaハンドラを呼べば、リクエスト終了後しばらくはプロセスが残りつづけ、その間に新たなリクエストが来た場合はプロセスが再利用される。
リクエスト・レスポンスのインターフェイス
公式ランタイムはruntime
モジュールのreceive_invoke()
を使っている。Custom Runtimeでは、requestでも使って同等のものを書くしかない。幸いPyPy用Custom Runtimeのものがあったので使った。iopipe/lambda-runtime-pypy3.5
結論
# !/opt/bin/python3.6
import decimal
import json
import os
import site
import sys
import urllib.request as request
import time
import logging
HANDLER = os.getenv("_HANDLER")
RUNTIME_API = os.getenv("AWS_LAMBDA_RUNTIME_API")
_GLOBAL_AWS_REQUEST_ID = None
def get_opt_site_packages_directory():
return '/opt/python/lib/python{}.{}/site-packages'.format(sys.version_info.major, sys.version_info.minor)
for path in ["/opt/runtime", '/opt/python', get_opt_site_packages_directory(), os.environ["LAMBDA_TASK_ROOT"]]:
sys.path.insert(0, path)
for path in [os.environ["LAMBDA_TASK_ROOT"], get_opt_site_packages_directory(), '/opt/python']:
site.addsitedir(path)
class LambdaContext(object):
def __init__(self, request_id, invoked_function_arn, deadline_ms, trace_id):
self.aws_request_id = request_id
self.deadline_ms = deadline_ms
self.function_name = os.getenv("AWS_LAMBDA_FUNCTION_NAME")
self.function_version = os.getenv("AWS_LAMBDA_FUNCTION_VERSION")
self.invoked_function_arn = invoked_function_arn
self.log_group_name = os.getenv("AWS_LAMBDA_LOG_GROUP_NAME")
self.log_stream_name = os.getenv("AWS_LAMBDA_LOG_STREAM_NAME")
self.memory_limit_in_mb = os.getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
self.trace_id = trace_id
if self.trace_id is not None:
os.environ["_X_AMZN_TRACE_ID"] = self.trace_id
def get_remaining_time_in_millis(self):
if self.deadline_ms is not None:
return time.time() * 1000 - int(self.deadline_ms)
class LambdaLoggerHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
def emit(self, record):
msg = self.format(record)
print(msg, flush=True)
class LambdaLoggerFilter(logging.Filter):
def filter(self, record):
record.aws_request_id = _GLOBAL_AWS_REQUEST_ID or ""
return True
def decimal_serializer(obj):
if isinstance(obj, decimal.Decimal):
return float(obj)
raise TypeError(repr(obj) + " is not JSON serializable")
def init_error(message, type):
details = {"errorMessage": message, "errorType": type}
details = json.dumps(details).encode("utf-8")
req = request.Request(
"http://%s/2018-06-01/runtime/init/error" % RUNTIME_API,
details,
{"Content-Type": "application/json"},
)
with request.urlopen(req) as res:
res.read()
def next_invocation():
with request.urlopen(
"http://%s/2018-06-01/runtime/invocation/next" % RUNTIME_API
) as res:
request_id = res.getheader("lambda-runtime-aws-request-id")
invoked_function_arn = res.getheader("lambda-runtime-invoked-function-arn")
deadline_ms = res.getheader("lambda-runtime-deadline-ms")
trace_id = res.getheader("lambda-runtime-trace-id")
event_payload = res.read()
event = json.loads(event_payload.decode("utf-8"))
context = LambdaContext(request_id, invoked_function_arn, deadline_ms, trace_id)
return request_id, event, context
def invocation_response(request_id, handler_response):
if not isinstance(handler_response, (bytes, str)):
handler_response = json.dumps(handler_response, default=decimal_serializer)
if not isinstance(handler_response, bytes):
handler_response = handler_response.encode("utf-8")
req = request.Request(
"http://%s/2018-06-01/runtime/invocation/%s/response"
% (RUNTIME_API, request_id),
handler_response,
{"Content-Type": "application/json"},
)
with request.urlopen(req) as res:
res.read()
def invocation_error(request_id, error):
details = {"errorMessage": str(error), "errorType": type(error).__name__}
details = json.dumps(details).encode("utf-8")
req = request.Request(
"http://%s/2018-06-01/runtime/invocation/%s/error" % (RUNTIME_API, request_id),
details,
{"Content-Type": "application/json"},
)
with request.urlopen(req) as res:
res.read()
def main():
global _GLOBAL_AWS_REQUEST_ID
for runtime_var in ["AWS_LAMBDA_RUNTIME_API", "_HANDLER"]:
if runtime_var not in os.environ:
init_error("%s environment variable not set" % runtime_var, "RuntimeError")
sys.exit(1)
try:
module_path, handler_name = HANDLER.rsplit(".", 1)
except ValueError:
init_error("Improperly formated handler value: %s" % HANDLER, "ValueError")
sys.exit(1)
module_path = module_path.replace("/", ".")
try:
module = __import__(module_path)
except ImportError:
init_error("Failed to import module: %s" % module_path, "ImportError")
sys.exit(1)
try:
handler = getattr(module, handler_name)
except AttributeError:
init_error(
"No handler %s in module %s" % (handler_name, module_path), "AttributeError"
)
sys.exit(1)
logging.Formatter.converter = time.gmtime
logger = logging.getLogger()
logger_handler = LambdaLoggerHandler()
logger_handler.setFormatter(logging.Formatter(
'[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t%(message)s\n',
'%Y-%m-%dT%H:%M:%S'
))
logger_handler.addFilter(LambdaLoggerFilter())
logger.addHandler(logger_handler)
while True:
request_id, event, context = next_invocation()
_GLOBAL_AWS_REQUEST_ID = context.aws_request_id
try:
handler_response = handler(event, context)
except Exception as e:
invocation_error(request_id, e)
else:
invocation_response(request_id, handler_response)
if __name__ == '__main__':
main()
FROM lambci/lambda:build-python3.6
ENV AWS_DEFAULT_REGION ap-northeast-1
ENV HOME /home/hoge
RUN mkdir $HOME
WORKDIR $HOME
# https://github.com/lambci/docker-lambda/blob/master/python3.6/run/Dockerfile
RUN curl -O https://lambci.s3.amazonaws.com/fs/python3.6.tgz
# COPY python3.6.tgz .
RUN tar zxf python3.6.tgz
COPY bootstrap.py .
RUN mkdir -p ~/fuga
RUN cp -rd ~/var/lang/* ~/fuga/
RUN rm -rf ~/fuga/lib/python3.6/site-packages
RUN cp -rd ~/var/runtime ~/fuga/
COPY bootstrap.py $HOME/fuga
RUN mv ~/fuga/bootstrap.py ~/fuga/bootstrap
RUN chmod 755 ~/fuga/bootstrap
WORKDIR $HOME/fuga
CMD zip -ry9 ~/python36_layer.zip * && cp ~/python36_layer.zip /share
version: '2'
services:
app:
build:
context: .
volumes:
- .:/share
3つのファイルを同じディレクトリに入れてdocker-compose build; docker-compose up
すると、python36_layer.zipができる。
参考
- iopipe/lambda-runtime-pypy3.5 PyPy用Custom Runtime。しかしログが欠けている。
- Reverse engineering AWS Lambda 公式ランタイム(Python)についての一番詳しい記事。
追記
Stackless Python 3.6を展開後47MBに押し込んだ。共有ライブラリとテスト系パッケージをごっそり削っているので、動かないものに出くわしたら付け加えるべし。デバッグシンボルも削っている。
FROM lambci/lambda:build-python3.6
ENV AWS_DEFAULT_REGION ap-northeast-1
ENV HOME /home/hoge
RUN mkdir $HOME
WORKDIR $HOME
RUN curl -O https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
RUN bash Miniconda3-latest-Linux-x86_64.sh -b -p ~/miniconda
ENV PATH $HOME/miniconda/bin:$PATH
RUN source ~/miniconda/bin/activate
RUN conda config --add channels stackless
RUN conda install -y stackless
RUN conda create -n stackless36 python=3.6 stackless
RUN mkdir -p ~/fuga/bin
RUN mkdir -p ~/fuga/lib
COPY bootstrap.py $HOME/fuga
RUN mv ~/fuga/bootstrap.py ~/fuga/bootstrap
RUN chmod 755 ~/fuga/bootstrap
ENV MC $HOME/miniconda/envs/stackless36
RUN cp $MC/bin/python3.6 ~/fuga/bin/
RUN cp -d $MC/lib/libpython* $MC/lib/libssl* $MC/lib/libcrypto* ~/fuga/lib
RUN rm ~/fuga/lib/libpython3.6m.a
RUN rm -rf ~/fuga/lib/python3.6
RUN cp -rd $MC/lib/python3.6 ~/fuga/lib
RUN rm -rf ~/fuga/lib/python3.6/site-packages
WORKDIR $HOME/fuga
RUN rm -rf lib/python3.6/test lib/python3.6/distutils/tests lib/python3.6/ctypes/test \
lib/python3.6/idlelib/idle_test lib/python3.6/unittest/test
RUN find . -name *.so|xargs -n 1 strip -s; exit 0
RUN strip -s bin/python3.6
CMD zip -ry9 ~/stackless36_layer.zip * && cp ~/stackless36_layer.zip /share