Using SQLAlchemy with multiprocessing

More than 5 years have passed since last update.

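A sample that combines multiprocessing, Python's convenient library for running work in multiple processes, with SQLAlchemy.
Data is passed between processes through a Queue, and the DB is accessed through an SQLAlchemy session.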

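from multiprocessing import Process, Queue

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session


engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker(bind=engine))
Session = sessionmaker(bind=engine)  # bind the engine so sessions know which DB to query
Base = declarative_base()
# Base.query = _Session.query_property()

####################
# Table definitions omitted (Item table)
####################

class Hoge(object):
    def __init__(self):
        '''Queue that feeds the worker processes, and queue they write results to'''
        self.in_queue = Queue()
        self.out_queue = Queue()

    def get_items(self, worker_num=4):
        '''Fetch rows from the DB with multiple processes.
        Put the ids to fetch into a queue, let the worker processes access
        the DB and load those rows, and return a list of (id, name).
        '''
        session = Session()
        try:
            item_ids = [row[0] for row in session.query(Item.id).all()]
        finally:
            session.close()  # give the connection back before forking
        # Close pooled connections so each child opens its own connection
        engine.dispose()

        for item_id in item_ids:
            self.in_queue.put(item_id)
        for _ in range(worker_num):
            self.in_queue.put(None)  # one stop sentinel per worker

        jobs = []
        for i in range(worker_num):
            p = Process(target=self.worker)
            p.daemon = True
            jobs.append(p)
            p.start()

        # Read the results before join(): a process that has put data on a
        # Queue does not terminate until that data has been flushed to the pipe
        results = []
        finished = 0
        while finished < worker_num:
            msg = self.out_queue.get()
            if msg is None:
                finished += 1
            else:
                results.append(msg)

        for job in jobs:
            job.join()
            # Process exit message
            print('{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode))

        return results

    def worker(self):
        '''Worker for multiprocessing.
        Takes ids from self.in_queue, fetches each row from the DB, and puts
        (id, name) into self.out_queue until the None sentinel arrives.
        '''
        session = Session()  # a session (and connection) owned by this process
        try:
            for item_id in iter(self.in_queue.get, None):
                print(item_id, end=' ')
                try:
                    item = session.query(Item).filter(Item.id == item_id).first()
                except Exception as e:
                    print('error =>', e)
                    session.rollback()
                    continue
                if item is not None:
                    self.out_queue.put((item_id, item.name))
        finally:
            session.close()
            self.out_queue.put(None)  # tell the parent this worker is done


if __name__ == '__main__':
    print(Hoge().get_items(worker_num=4))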
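Whether a worker function returns True or nothing at all does not change how its process ends: multiprocessing discards the target's return value, and the parent only sees exitcode (0 after a normal return, 1 after an uncaught exception). A hang at join() is more likely tied to the queues; the "Joining processes that use queues" item in the multiprocessing programming guidelines describes that case. A small standard-library check, where returns_true, returns_nothing, raises and exit_codes are hypothetical names:

```python
from multiprocessing import Process


def returns_true():
    return True


def returns_nothing():
    pass


def raises():
    raise ValueError("boom")


def exit_codes():
    # Run each hypothetical target in its own process and record its exitcode;
    # the target's return value is never sent back to the parent
    codes = {}
    for target in (returns_true, returns_nothing, raises):
        p = Process(target=target)
        p.start()
        p.join()
        codes[target.__name__] = p.exitcode
    return codes


if __name__ == "__main__":
    # "raises" also prints its traceback to stderr in the child
    print(exit_codes())  # {'returns_true': 0, 'returns_nothing': 0, 'raises': 1}
```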

Caveats

If you create the Session with scoped_session here, you may sometimes get an error like

Could not locate column in row for column …

so watch out for that.
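A likely explanation is that a session created in the parent, together with the DB connection it has checked out, is inherited by the forked workers, so several processes read replies from the same connection. With scoped_session this is easy to hit, because its thread-local registry hands the forked child the session the parent's thread already created. SQLAlchemy's documentation on using connection pools with multiprocessing advises not carrying pooled connections across a fork. The sketch below is one way to avoid it, under stated assumptions: SQLAlchemy 1.4 or later, a temporary SQLite file standing in for MySQL, a hypothetical two-column Item model, and the fork start method (Unix only, matching the Linux setup in this article). Each worker opens and closes its own Session, and the engine uses NullPool so no pooled connection survives into a child.

```python
import os
import tempfile
from multiprocessing import get_context

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.pool import NullPool

Base = declarative_base()


class Item(Base):
    # Hypothetical stand-in for the article's omitted Item table
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Assumption: a temporary SQLite file stands in for the MySQL database.
# NullPool opens a new connection per checkout and closes it on release,
# so no idle pooled connection is inherited by a forked child.
DB_PATH = os.path.join(tempfile.mkdtemp(), "items.db")
engine = create_engine("sqlite:///" + DB_PATH, poolclass=NullPool)
Session = sessionmaker(bind=engine)


def setup_db():
    Base.metadata.create_all(engine)
    with Session() as session:
        for i in range(1, 6):
            # merge() upserts by primary key, so calling setup_db twice is safe
            session.merge(Item(id=i, name="item-%d" % i))
        session.commit()


def fetch_name(item_id):
    # Runs in a worker process: its own Session, opened and closed here
    with Session() as session:
        item = session.get(Item, item_id)
        return None if item is None else item.name


def fetch_all(ids, workers=2):
    # Explicit "fork" context to mirror the Linux behaviour discussed above
    with get_context("fork").Pool(workers) as pool:
        return pool.map(fetch_name, ids)


if __name__ == "__main__":
    setup_db()
    print(fetch_all([1, 2, 3, 4, 5, 99]))
```

An alternative from the same SQLAlchemy documentation is to keep the default pool and call engine.dispose() in the parent right before the workers are started.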

I also occasionally got the following error:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe

It could not be reproduced, so I'm not sure what caused it…
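A plausible cause (not confirmed for this case) is that a Queue is closed or garbage-collected in a process while that process's background feeder thread (the _feed function in the traceback) is still writing into the pipe. Separately, the multiprocessing programming guidelines warn that joining a process before the items it put on a queue have been consumed can deadlock, and that empty() is not reliable when several processes share a queue. The standard-library sketch below avoids both patterns: each worker stops on a None sentinel instead of polling empty(), and the parent drains the result queue before calling join(). Squaring a number stands in for the DB lookup; run and worker are hypothetical names.

```python
from multiprocessing import Process, Queue


def worker(in_queue, out_queue):
    # Block on get() until the None sentinel arrives; no empty() check that
    # could race with another worker taking the last item
    for n in iter(in_queue.get, None):
        out_queue.put((n, n * n))  # stand-in for the DB lookup
    out_queue.put(None)  # tell the parent this worker is finished


def run(values, worker_num=4):
    in_queue, out_queue = Queue(), Queue()
    for v in values:
        in_queue.put(v)
    for _ in range(worker_num):
        in_queue.put(None)  # one sentinel per worker

    jobs = [Process(target=worker, args=(in_queue, out_queue))
            for _ in range(worker_num)]
    for p in jobs:
        p.start()

    # Drain out_queue BEFORE join(): a child that has put items on a queue
    # does not exit until they have been flushed to the pipe
    results, finished = [], 0
    while finished < worker_num:
        msg = out_queue.get()
        if msg is None:
            finished += 1
        else:
            results.append(msg)

    for p in jobs:
        p.join()
    return sorted(results)


if __name__ == "__main__":
    print(run(range(10)))
```

Because every queued item, including the sentinels, has been read before run() returns, the parent's queues are not torn down while a feeder thread is still sending.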
