後々困らなくする程度の機能共通化
不定期に使うスクリプトなどは、手早く作るため一筆書で作成する事がよくあります
そのため、毎回同じよな処理や機能を書いたり、途中から良い方法をみつけた場合に昔書いたコードを修正するのが手間です。
また、スクリプトを複数にわけていて、久しぶりに実行しようと実行する手順を残していないため忘れたりする事もあります。
簡易的に作る場合でも、最低限の共通化と手順を残せるように、下記のようなジョブコマンドの簡易ルータを作ってみます
① 複数ジョブに共通な前処理、後処理の共通化する
② 実行手順や引数をJsonファイルに定義する
メイン処理の説明
メイン部分の処理は下記のようなコードになります
下記サンプルでは、個別処理は「@job.route」デコレータの関数です。データベースのテーブル作成や更新を行います。
実際には個別処理は別のファイルに書いて処理を記述すると思います。
共通の実行処理はmainにのみ実装されています。例では、データベースのコネクション作成とクローズ処理をしています。
import logging as log
from job_router.entity import BaseCtx
from job_router.entity import JobGroup
from job_router.job import Job
import sqlite3
log.basicConfig(level=log.DEBUG)
job = Job()
dbpath = 'sample.sqlite'
job_req = 'sample_job_req.json'
@job.route(group='group_a', job='job_1')
def sample_method_1(ctx):
cursor = ctx.conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS sample (id INTEGER PRIMARY KEY, name TEXT, job_id INTEGER)")
@job.route(group='group_a', job='job_2')
def sample_method_2(ctx):
job_id = ctx.args['job_id']
cursor = ctx.conn.cursor()
cursor.execute("INSERT INTO sample(name, job_id) VALUES (:name, :job_id)",
{'name': '田中', 'job_id': job_id})
cursor.execute('SELECT * FROM sample ORDER BY id')
print(cursor.fetchall())
def on_execute(job_name=None, group_name=None, ctx=None):
job({'JOB': job_name, 'GROUP': group_name, 'CTX': ctx})
if __name__ == '__main__':
class Ctx(BaseCtx):
def __init__(self, args, conn):
self.args = args
self.conn = conn
ctx = Ctx
# DB connection
ctx.conn = sqlite3.connect(dbpath)
try:
g = JobGroup.from_text(job_req)
for j in g.job_list:
args_dict = {}
for a in j.args:
args_dict[a.name] = a.value
ctx.args = args_dict
on_execute(job_name=j.name, group_name=g.name, ctx=ctx)
except Exception as e:
ctx.conn.rollback()
log.exception(e)
finally:
ctx.conn.commit()
ctx.conn.close()
ジョブ実行ルールを記載したJSONファイルを読み込むクラスを作成する
構造を定義して、間違った形式のJSONを読み込んだ場合にエラーにするようにします
jsonをクラスにするために、下記Mixinクラスを作成します
import json
class DictMixin(object):
@classmethod
def from_dict(cls, d):
return cls(**d)
@classmethod
def from_dicts(cls, ds):
return [cls(**d) for d in ds]
def to_dict(self):
return self._traverse_dict(self.__dict__)
def _traverse_dict(self, instance_dict):
output = {}
for key, value in instance_dict.items():
output[key] = self._traverse(key, value)
return output
def _traverse(self, key, value):
if isinstance(value, DictMixin):
return value.to_dict()
elif isinstance(value, dict):
return self._traverse_dict(value)
elif isinstance(value, list):
return [self._traverse(key, i) for i in value]
elif hasattr(value, '__dict__'):
return self._traverse_dict(value.__dict__)
else:
return value
DM = DictMixin
class JsonMixin(DictMixin):
@classmethod
def from_json(cls, data):
return cls.from_dict(json.loads(data))
@classmethod
def from_text(cls, file_path):
with open(file_path, 'r') as fh:
return cls.from_dict(json.load(fh))
def to_json(self, indent=0):
return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
def to_pretty_json(self):
return self.to_json(4)
JM = JsonMixin
実行手順用のjsonファイルの構造を下記のようにします。
{
"group" : "group_a",
"jobs" : [
{
"name": "job_1",
"args": [
]
},
{
"name": "job_2",
"args": [
{
"name":"job_id",
"value":1
}
]
}
]
}
上記jsonに対応したクラスです
import json_conv as jc
class JobGroup(jc.JM):
def __init__(self, group, jobs):
self.name = group
self.job_list = list(map(lambda j: Job.from_dict(j), jobs))
class Job(jc.JM):
def __init__(self, name, args):
self.name = name
self.args = list(map(lambda a: Argument.from_dict(a), args))
class Argument(jc.JM):
def __init__(self, name, value):
self.name = name
self.value = value
class BaseCtx(object):
pass
ルールで指定した処理を呼び出すルータクラスを作成します
class JobRouterError(Exception):
pass
class Router:
def __init__(self):
self.routes = []
def add(self, job, group, callback):
self.routes.append({
'job': job,
'group': group,
'callback': callback
})
def match(self, job, group):
for r in filter(lambda x: x['job'] == job, self.routes):
if r['group'] == group:
return r['callback']
raise JobRouterError
class Job:
def __init__(self):
self.router = Router()
def route(self, group=None, job=None, callback=None):
def decorator(callback_func):
self.router.add(job, group, callback_func)
return callback_func
return decorator(callback) if callback else decorator
def __call__(self, env):
job = env['JOB']
group = env['GROUP']
ctx = env['CTX']
callback = self.router.match(job, group)
return callback(ctx)
以上です。