1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SQLAlchemyとmultiprocessingを使う

Last updated at Posted at 2014-01-17

Pythonの便利なマルチプロセスライブラリであるmultiprocessingとSQLAlchemyを組み合わせて使用するサンプル
プロセス間のデータのやり取りはQueueで行い、DBにはSQLALchemyのsessionを使ってアクセスする

from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)


engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker())
Session = sessionmaker()
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()

####################
# テーブル定義は省略(Itemテーブル)
####################

class Hoge(object):
    def __init__(self):
        '''マルチプロセスに食わせるキューと吐き出す先のキュー'''
        self.in_queue = Queue()
        self.out_queue = Queue()
        self.session = Session()

    def __del__(self):
        self.session.commit()
        self.session.close()

    def get_items(self, worker_num=4):
        '''マルチプロセスでDBからデータを取得する
        キューに取得したいidを格納してマルチプロセスでDBにアクセスし、取り出す
        '''
        item_ids = self.session.query(Item.id).all()

        for item_id in item_ids:
            self.in_queue.put(item_id[0])

        jobs = []
        for i in xrange(worker_num):           
            p = Process(target=self.worker, args=[])
            p.daemon = True
            jobs.append(p)
            p.start()

        for job in jobs:
            job.join()
            # プロセス終了メッセージ
            print '{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode)

        return True

    def worker(self):
        '''マルチプロセス用ワーカー
        self.in_queueからidを取り出し、DBからそのデータを取得してself.out_queueに格納する
        '''
        while not self.in_queue.empty():
            id = self.in_queue.get()
            print id,
            try:
                item = self.session.query(Item).filter(Item.id == id).first()
            except Exception as e:
                print 'error =>', e
                continue
            self.out_queue.put(item.name)
            print 'in_queue: {0} (out_queue: {1})'\
                    .format(self.in_queue.qsize(), self.out_queue.qsize())
        return True  # これがないと終わらない

注意点

ここでscoped_sessionでSessionを作成すると

Could not locate column in row for column …

といったエラーが吐かれてしまうことがあるので注意

また、

Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
send(obj)
IOError: [Errno 32] Broken pipe

というエラーも出たりしましたが、再現されなかったのでよく分かりませんでした…。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?