ryuki999
@ryuki999

Are you sure you want to delete the question?

Leaving a resolved question undeleted may help others!

pythonで並列処理する方法

解決したいこと

以下のプログラムを並列処理して高速化したいです。何か方法はあるでしょうか?
プログラムは、cluster_idで指定された文字列を含むseqsの要素をcluster_id別のファイルに書き出す処理をしたいです。とにかく現状よりも高速に動かしたいと言うのが目的です。

# cluster_id=[["AAA","BBB", "CCC"], ["DDD", "EEE", "FFF"], ...]()
# seqs=["123AAA323...","AAB3was23...","123aCCC23...",...](200万)

for i, ids in enumerate(cluster_id):
    with open(f"CLUSTER_{i}_ID.fas", "w+") as w:
        for seq in tqdm(seqs):
            for id in ids:
                if id in seq[:100]:
                    w.write(seq)

自分で試したこと

例えば以下のことを試しましたが、普通にシングルコアで動いているっぽく、出来ませんでした。

from concurrent.futures import ProcessPoolExecutor

def write_seq(seq, ids, w):
    for id in ids:
        if id in seq[:100]:
            w.write(seq)
    return

with ProcessPoolExecutor(max_workers=6) as executor:
    for i, ids in enumerate(cluster_id):
        with open(f"{outputdir}/CLUSTER{i}_ID.fas", "w+") as w:
            for seq in tqdm(seqs):
                executor.submit(write_seq, seq, ids, w)
0

2Answer

試したことを参考に少し直してみました
動かしていないので動作しないかもしれません

from concurrent.futures import ProcessPoolExecutor

def write_seq(i, ids, tqdm_seqs):
    with open(f"CLUSTER_{i}_ID.fas", "w+") as w:
        for seq in tqdm_seqs:
            for id in ids:
                if id in seq[:100]:
                    w.write(seq)

with ProcessPoolExecutor(max_workers=6) as executor:
    tqdm_seqs = tqdm(seqs)
    for i, ids in enumerate(cluster_id):
        executor.submit(write_seq, i, ids, tqdm_seqs)

できるだけwrite_seqに処理を移動しました

200万のtqdm(seqs)を処理するところが遅いのではないかと思います(レコード数が多いので)
tqdmの中身を分解してwrite_seqの方に移したほうがいいかもしれません


上のコードはtqdm_seqsをサブプロセス側へデータ転送するのに時間がかかりそうな気がします
これを避けるためにtqdm_seqをグローバル化し、スレッドを使うThreadPoolExecutorを使用するほうが早いかもしれません

from concurrent.futures import ThreadPoolExecutor

def write_seq(i, ids):
    global tqdm_seqs
    with open(f"CLUSTER_{i}_ID.fas", "w+") as w:
        for seq in tqdm_seqs:
            for id in ids:
                if id in seq[:100]:
                    w.write(seq)

tqdm_seqs = tqdm(seqs)
with ThreadPoolExecutor(max_workers=6) as executor:
    for i, ids in enumerate(cluster_id):
        executor.submit(write_seq, i, ids)
1Like

Comments

  1. @ryuki999

    Questioner

    ご回答ありがとうございます!
    実はこの後試行錯誤してたら、なんとかなって、実行時間が1/max_workersにさせることができました!
    今回はPoolを使ったのですが、回答いただいたglobal変数化とThreadPoolも別解として利用できそうなので参考にさせて頂こうと思います。非常に助かりました。

    一応自作したコードを別で貼っておきます。

最終的に少しコードも改変されていますが、並列処理自体は以下のように記載することで実現できました!
ご思案くださった皆様ありがとうございました:sunny:

def create_file(params):
    i, ids = params
    header_pat = re.compile('^>(.*)')
    is_target_section = False
    with open(f"{outputdir}/CLUSTER{i}_ID.fas", "w+") as w:
        with open(FASTA_FILE, "r") as r:
            for line in tqdm(r):
                if m := header_pat.match(line):
                    is_target_section = m[1] in ids
                if is_target_section:
                    w.write(line)
    return

# 並列処理
with ProcessPoolExecutor(max_workers=len(cluster_id)) as executor:
    params = enumerate(cluster_id)
    results = executor.map(create_file, params)
1Like

Your answer might help someone💌