はじめに
pythonでAWS Lambda を業務利用で量産するにあたって、自分があると便利だと思うクラス構成の雛形を作成してみました。
どんなプロジェクトに入っても、これに近いクラス階層や構成を用意したり、なければ自作することになるであろう階層たちなので、雛形として作成&公開してしまおう。という考えです。
機能群
- 環境変数でのログレベル指定
- オブジェクト指向の多重継承クラス
- 非同期処理の直列待ち合わせ
- 入出力JSONのログ出力(Lambdaで便利)
ここら辺が量産用の雛形として、プロジェクトに用意してあると嬉しいですね。
クラス設計
AWS Lambda基盤から直接実行される、lambda_function.pyと、業務処理クラスは切り離したいのと、各クラス階層ごとに、どのレベルの共通処理をするか定めておくと似たような別処理を量産するときに楽に量産できるようになります。
lambda_function.py (サンプル)
import logging
import os
import sys
import json
import asyncio
import nest_asyncio
nest_asyncio.apply()
import main_process_common
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logLevelEnv = os.getenv('LogLevel', 'INFO')
if logLevelEnv == 'DEBUG':
handler.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
if logLevelEnv == 'INFO':
handler.setLevel(logging.INFO)
logger.setLevel(logging.INFO)
if logLevelEnv == 'WARNING':
handler.setLevel(logging.WARNING)
logger.setLevel(logging.WARNING)
if logLevelEnv == 'ERROR':
handler.setLevel(logging.ERROR)
logger.setLevel(logging.ERROR)
if logLevelEnv == 'CRITICAL':
handler.setLevel(logging.CRITICAL)
logger.setLevel(logging.CRITICAL)
for tmp_handler in logger.handlers:
logger.removeHandler(tmp_handler)
logger.addHandler(handler)
logger.propagate = False
def lambda_handler(event, context):
logger.info(json.dumps(event))
logger.debug('this is debug')
logger.info('this is info')
logger.warning('this is warning')
logger.error('this is error')
logger.critical('this is critical')
mainObj = main_process_common.MainProcessCommon("lambda_handler", logger, asyncio)
loop = asyncio.new_event_loop()
loop.run_until_complete(mainObj.execute_process(event, context))
loop.close()
results = mainObj.get_results()
logger.info(json.dumps(results))
response = {'result': results}
return response
AWS SDKで各種AWSサービスにアクセスする業務処理を書くことを見越して、非同期処理として業務処理を実行するのと待ち合わせの例です。
main_process_common.py (サンプル)
import abstract_process_common
class MainProcessCommon(abstract_process_common.AbstractProcessCommon):
def __init__(self, name, logger, asyncio):
super(MainProcessCommon, self).__init__(name, logger, asyncio)
self.name = "MainProcessCommon " + name
self.logger = logger
self.asyncio = asyncio
def display_name(self):
print (self.name)
継承の書き方(import と Class構文)
コンストラクタ
のサンプルにもなりますね。
abstract_process_common.py (サンプル)
import abstract_business_common
class AbstractProcessCommon(abstract_business_common.AbstractBusinessCommon):
def __init__(self, name, logger, asyncio):
super(AbstractProcessCommon, self).__init__(name, logger, asyncio)
self.name = "AbstractProcessCommon " + name
def display_name(self):
print (self.name)
async def main_biz_process(self, event, context):
try:
self.get_logger().debug('AbstractProcessCommon#main_biz_process start#');
return "main_biz_process Finish";
except Exception as e:
self.get_logger().error('AbstractProcessCommon#main_biz_process error')
self.get_logger().error(e)
raise e
finally:
self.get_logger().debug('AbstractProcessCommon#main_biz_process end#');
継承の書き方(import と Class構文)
コンストラクタ
は、ほとんど一緒です。
main_biz_processがオーバーライドメソッドのサンプルです。
abstract_business_common.py (サンプル)
import abstract_base_common
class AbstractBusinessCommon(abstract_base_common.AbstractBaseCommon):
def __init__(self, name, logger, asyncio):
super(AbstractBusinessCommon, self).__init__(name, logger, asyncio)
self.name = "AbstractBusinessCommon " + name
def display_name(self):
print (self.name)
async def before_biz_process(self, event, context):
try:
self.get_logger().debug('AbstractBusinessCommon#before_biz_process start#');
return "before_biz_process Finish";
except Exception as e:
self.get_logger().error('AbstractBusinessCommon#before_biz_process error')
self.get_logger().error(e)
raise e
finally:
self.get_logger().debug('AbstractBusinessCommon#before_biz_process end#');
async def main_biz_process(self, event, context):
try:
self.get_logger().debug('AbstractBusinessCommon#main_biz_process start#');
return "main_biz_process Finish";
except Exception as e:
self.get_logger().error('AbstractBusinessCommon#main_biz_process error')
self.get_logger().error(e)
raise e
finally:
self.get_logger().debug('AbstractBusinessCommon#main_biz_process end#');
async def after_biz_process(self, event, context):
try:
self.get_logger().debug('AbstractBusinessCommon#after_biz_process start#');
return "after_biz_process Finish";
except Exception as e:
self.get_logger().error('AbstractBusinessCommon#after_biz_process error')
self.get_logger().error(e)
raise e
finally:
self.get_logger().debug('AbstractBusinessCommon#after_biz_process end#');
def get_tasks(self, event, context):
try:
self.get_logger().debug('AbstractBusinessCommon#get_tasks start#');
tasks = [
self.before_biz_process,
self.main_biz_process,
self.after_biz_process
]
return tasks
except Exception as e:
self.get_logger().error('AbstractBusinessCommon#get_tasks error')
self.get_logger().error(e)
raise e
finally:
self.get_logger().debug('AbstractBusinessCommon#get_tasks end#');
非同期関数を、「前処理」「主処理」「後処理」と個別に作成するサンプルと、pythonは関数型を扱えるので、関数の配列を返却するgetterのサンプルです。
abstract_base_common.py (サンプル)
class AbstractBaseCommon():
def __init__(self, name, logger, asyncio):
self.name = "AbstractBaseCommon " + name
self.logger = logger
self.asyncio = asyncio
self.results = []
def display_name(self):
print (self.name)
def get_logger(self):
return self.logger
def get_asyncio(self):
return self.asyncio
def get_results(self):
return self.results
def get_first_index_object(self, args):
rtn_object = null
if (args is not None) and (len(args) > 0):
rtn_object = args[0]
return rtn_object
def get_last_index_object(self, args):
rtn_object = null
if (args is not None) and (len(args) > 0):
rtn_object = args[-1]
return rtn_object
def get_tasks(self, event, context):
try:
self.get_logger().debug('AbstractBaseCommon#get_tasks start#');
return []
except Exception as e:
self.get_logger().error('AbstractBaseCommon#get_tasks error')
self.get_logger().error(e)
raise e
finally:
self.get_logger().debug('AbstractBaseCommon#get_tasks end#');
async def execute_process(self, event, context):
try:
self.get_logger().debug('AbstractBaseCommon#execute_process start#');
tasks = self.get_tasks(event, context)
results = self.get_results()
for task in tasks:
loop = self.get_asyncio().new_event_loop()
rtn_value = loop.run_until_complete(task(event, context))
loop.close()
results.append(rtn_value)
except Exception as e:
self.get_logger().error('AbstractBaseCommon#execute_process error')
self.get_logger().error(e)
raise e
finally:
self.get_logger().debug('AbstractBaseCommon#execute_process end#');
共通基底処理として、非同期関数を直列に待ち合わせしながら処理するサンプルです。
実行結果
このサンプルを実行すると、上記のようなログが出力され、オーバーライドされたメソッド含め、どのクラスのメソッドが動いたかログで確認できます。(デバックレベル出力なので本運用時はInfoレベルに変更したら少なくすることも可能)
まとめ
本職は、java屋で、あまりクラス階層や継承で役割分担したサンプルを見かけないので、Lambdaで共通基底クラスを作るとしたら、こんな階層にするのが良いのでは?という階層をサンプル化しました。
AWSの相談・お困りごとありましたら、、、
AWSの活用方法や、お困りごとの相談、随時、お仕事の受付しております。