Pythonの便利なマルチプロセスライブラリであるmultiprocessingとSQLAlchemyを組み合わせて使用するサンプル
プロセス間のデータのやり取りはQueueで行い、DBにはSQLALchemyのsessionを使ってアクセスする
from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)
engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker())
Session = sessionmaker()
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()
####################
# テーブル定義は省略(Itemテーブル)
####################
class Hoge(object):
def __init__(self):
'''マルチプロセスに食わせるキューと吐き出す先のキュー'''
self.in_queue = Queue()
self.out_queue = Queue()
self.session = Session()
def __del__(self):
self.session.commit()
self.session.close()
def get_items(self, worker_num=4):
'''マルチプロセスでDBからデータを取得する
キューに取得したいidを格納してマルチプロセスでDBにアクセスし、取り出す
'''
item_ids = self.session.query(Item.id).all()
for item_id in item_ids:
self.in_queue.put(item_id[0])
jobs = []
for i in xrange(worker_num):
p = Process(target=self.worker, args=[])
p.daemon = True
jobs.append(p)
p.start()
for job in jobs:
job.join()
# プロセス終了メッセージ
print '{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode)
return True
def worker(self):
'''マルチプロセス用ワーカー
self.in_queueからidを取り出し、DBからそのデータを取得してself.out_queueに格納する
'''
while not self.in_queue.empty():
id = self.in_queue.get()
print id,
try:
item = self.session.query(Item).filter(Item.id == id).first()
except Exception as e:
print 'error =>', e
continue
self.out_queue.put(item.name)
print 'in_queue: {0} (out_queue: {1})'\
.format(self.in_queue.qsize(), self.out_queue.qsize())
return True # これがないと終わらない
注意点
ここでscoped_sessionでSessionを作成すると
Could not locate column in row for column …
といったエラーが吐かれてしまうことがあるので注意
また、
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
send(obj)
IOError: [Errno 32] Broken pipe
というエラーも出たりしましたが、再現されなかったのでよく分かりませんでした…。