6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Entrez efetchで配列レコードを大量取得する

Posted at

ある仕事で5万レコード近い核酸配列を取得して解析する必要が生じた。手元にあるのはaccessionが一行にひとつづつ書かれたファイルのみ。どうやるか?ということになった。解析にはアノテーション情報も使うかもしれないということで、genbankファイルを取得することにした。

環境はpython=3.6.8、Biopython=1.72を用いた。

Entrez efetchについて

efetchは、NCBIが提供しているE-utilitiesのひとつで、配列レコードの取得のみならず文献検索やらなんやらいろいろとできて面白そうですが、寄り道せずに進みます。

詳しい使い方の例や注意事項については以下のページを見てください。

今のところ必須ではないようですが、下のページを読むとNCBIの利用者アカウントを作ってapi keyを取得したほうがいい気がしてきます。

基本的にはhttps://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgiというbase URLに必要なパラメターをつないでリクエスト送信すると、NCBIから返事が返ってくるというものです。自分でパラメターを含むURL構築を行ってrequestsモジュールで送信してもいいのですが、idが200以上の場合にはpostを使ってね、とか、本家のURL(https://www.ncbi.nlm.nih.gov/)は使わないでね、とか、そもそもURL構築が面倒だなとか、emailやapi keyはどうやって組み込むんだとか、余計に頭を使うことになってしまいますので、そのあたりうまく丸めてくれるBiopythonのEntrezモジュールのお世話になることにします。(responseはどんなものが返ってきているのかとかHTTPのやり取り自体をもっと知りたい場合にはrequestsで頑張ると勉強にはなります)

あと、Biopythonの世界に持ち込むと、例えばgenbankファイルを取りつつdefinition lineに連番を入れたfastaも同時に作ってしまうとか(genbankとfasta両方リクエストする必要はないですし)、配列長で振り分けするとか、そういうこともできるので便利です。取得が終わってからやってもいいんですけど、もちろん。

必要な情報とパラメターについて

レコード取得にあたり、以下ものものを準備します。

  • 有効なemailアドレス:お願いする以上、正直に名乗ることになります。
  • api_key:必須ではありませんが、名乗りの延長上にあります。

用意する(考えておく)パラメターは以下の通り。

  • db:データベース名。今回は核酸配列を取得しますので、'nuccore'を使います。'nucleotide'でもよいようです。
  • id:accession id(たち)。手動でURLを作る場合には','で区切ります。その場合、スペースを入れないように。Biopythonに任せる場合にはリストを渡します。
  • rettype:今回の場合、'gb'あるいは'fasta'で間に合うのですが、genbankファイルで配列がcontigの場合にはうまくとれてこなくてハマりました。そういう時には'gbwithparts'とするとうまく取れてきます。(webページからファイルに保存する場合には、'GenBank (full)'がある、絶対にできるはずだ、と思ったのですが、すぐにわからなくて苦労しました。ヒントを得るために、webページからのファイル取得で'GenBank'を選んだ時と、'GenBank (full)'を選んだ時に送信URLがどう違うのかを、ChromeブラウザのDeveloper toolsから見てみると、'GenBank (full)'を選んだ時には'&report=gbwithpars'、'&withparts=on'が見えるようになるではないですか。これだ、これでいけると思いましたが、webでの'report'は'rettype'だと予想できるのですが、'withparts'はefetchのargにはない感じ。結局このページに助けられました)あと、自分で指定値を創作してしまうと警告も何もなしに、'xml'で返信されてきてなんじゃこりゃ?になりますので気をつけましょう。('gb'と書くところを'genbank'でもいけるやろ、と思って試してそうなりました。'fasta'じゃなくて'fa'でもそうなります。やってみてください)
  • retmode:今回は'text'でいきます。

データベースごとの可能なrettype、retmodeの組み合わせはこのページにテーブルとしてまとめられています。

方針

さて、今回は手元にaccession id(数万)の情報がファイルにあり、該当するレコードをすべて取得したいという状況です。どうやるか?以下のように考えます。

  • 一度のリクエスト(クエリ)に含まれるaccession idの数。NCBIのページで検索する場合には、デフォルトでは1ページに20の結果が表示されます。表示は最大で200までにすることができ、ページの右上にある'Send to:'のところからファイルに保存できるようになっている。このへんをヒントとすると、ひとまず20ぐらいを一つのリクエストとすれば、そんなに不自然ではないはずだ。
  • ファイル出力。これはこの後の解析とファイル管理の都合で変えればいいと思いますが、今回の例では1ファイル1レコードとしておきます。
  • エラー処理。小手調べコードを走らせてみてわかったことですが、私の環境では二種類のよろしくない状況が発生しました。一つは通信自体に関連するエラー。もう一つは取れてきたレコードの一部が壊れているという問題です。今回のように大量のものを扱う場合にはSTDOUTに流れるメッセージを画面に張り付いて終始見ているわけにもいかないので(そもそもそれをしたくないからこういうコードを書いている)、最低限、問題の起こったaccession idについてはログファイルに書き出しておいてあとでやり直しやすいようにしておきたい。通信エラーが起こった場合には、そのクエリに含まれていたaccession idをすべてやり直すことになりますので、それらを書きだす。ちょっと悩ましいのが取ったレコードの一部が壊れている場合です。BiopythonのSeqIOでレコードを読み込ませるとそれが壊れている場合にはエラーが出るのですが、たとえば20づつ取っている場合に5番目が壊れていたとしますと、SeqIO.parse()は5番目に来たところでエラーを出して止まります。6番以降については問題があるのかないのかわからない。。。これをクリアしようとすると、手が込んでしまいますので(いつまでもこの記事を投稿できなくなってしまうかもしれない)、どれは大丈夫でどれはだめ、とは判定せずに、クエリとして取ってきたものの中に壊れているレコードがある場合には通信エラーのときのようにクエリに含まれるaccession id全てを報告することにします。

エラーが起こったaccession idたちについての取り直しを考えると、クエリに含まれるaccession idの数はあまり大きくしないほうが得策と思われますので、コード中では20としました(count_in_query = 20)。オレ様の環境ではそんなエラーは起きん、という方はそれなりに設定してくださいまし。(うらやましい)

大量取得版コード

Argumentとして渡すのはaccession idたちが一行に一つづつ書かれたファイルへのpathのみ。このファイルの名前(extensionを除いたbasename)をログファイルの名前の一部に使いますので、スペースやら全角やら含まないほうが無難です。このファイルにはコメント行や、空行も見やすさの都合で含みたくなるかもしれません。'#'で始まる行と空行はスキップするようにしておきました。'#'はaccessionの除外にも使えますし。ただし、accession idの後ろには(同じ行には)コメントを入れないでください。(必要ならば改造してください)
データベース(db=)は'nuccore'、ファイル形式(retrype=, retmode=)は'genbank'、'text'、1クエリのid数(count_in_query=)は20で固定、hard codingしてあります。(もっと柔軟に選択したい場合にはこれらもargumentとして取るように改造してください)出力ファイルはログも含めてすべてカレントディレクトリにできてきます。

import sys
import re
import os
from Bio import Entrez
from Bio import SeqIO
import datetime
import time


def timestamp():
    # returns current time stamp
    dt = datetime.datetime.now()
    ts = dt.strftime('%d%b%Y-%H:%M:%S')
    return ts


def elapsed_time(dt_start, dt_end):
    # calculate delta between start and end
    ds = (dt_end - dt_start).total_seconds()
    h = int(ds // 3600)
    m = int((ds % 3600) // 60)
    s = int(ds % 60)
    # construct str line
    str_line = str(h).zfill(2) + 'h ' + str(m).zfill(2) + 'm ' + str(s).zfill(2) + 's'
    return str_line


def write_file(out_file, mode, info_str):
    # write file
    with open(out_file, mode=mode) as OUT:
        OUT.write(info_str + '\n')


def get_acc_chunks(id_list_file, max_in_chunk):
    # group specified number of accession ids in a chunk
    # return a list of chunks and total id count
    # regex to skip some lines
    comment_ptn = re.compile('^#')
    blank_ptn = re.compile(r'^\s*$')

    chunks = []
    chunk = []
    total_id_count = 0
    with open(id_list_file, mode='r') as ACC:
        id_count_in_a_chunk = 0
        for l in ACC:
            l = l.rstrip() # remove annoying EOL
            if (comment_ptn.match(l)) or (blank_ptn.match(l)):
                continue # skip those lines
            else:
                id_count_in_a_chunk += 1
                if id_count_in_a_chunk <= max_in_chunk:
                    chunk.append(l)
                else:
                    # dump a chunk
                    chunks.append(chunk)
                    total_id_count += len(chunk)
                    # start a new chunk
                    chunk = [l]
                    id_count_in_a_chunk = 1
        # append the last chunk
        chunks.append(chunk)
        total_id_count += len(chunk)

    return chunks, total_id_count


def when_exeption_happened(flag, e, acc_chunk, display_chunk_num):
    ts = timestamp()
    if flag == 1: # error in efetch
        where = 'efetch'
    elif flag == 2: # error in SeqIO
        where = 'SeqIO'
    # report on STDOUT
    print(" --- Error in " + where + ". See log file. @" + ts)
    print(str(type(e)))
    print(e.args)
    if flag == 1:
        print("Will be continued after 1 min.")
        print()
    # report to log
    log_lines = [
        '[Error in ' + where + '] chunk:' + display_chunk_num + '@' + ts,
        str(type(e)),
        str(e.args), # if any
        '# accession ids in the chunk'
    ]

    # append accession ids in the chunk
    for acc in acc_chunk:
        log_lines.append(acc)
    log_lines.append('\n')
    write_file(log_file, 'a', '\n'.join(log_lines))


def main():
    print('program started: ', timestamp())
    # mark start time
    dt_start = datetime.datetime.now()

    # write log header
    log_lines = [
        '# processed by: ' + sys.argv[0],
        '# process started: ' + timestamp(),
        '# accession id file: ' + acc_id_file,
        '# db=' + db,
        '# retrype=' + rettype,
        '# retmode=' + retmode,
        '# id count per query: ' + str(count_in_query) + '\n'
    ]

    write_file(log_file, 'w', '\n'.join(log_lines)) # create a new file

    # get accession id chunks from accession id file
    acc_chunks, total_id_count = get_acc_chunks(acc_id_file, count_in_query)

    # declare who you are
    Entrez.email = email
    Entrez.api_key = api_key
    #Entrez.tool is 'biopython' by default

    # some buckets to monitor the processes
    chunk_num = 0
    done_id_count = 0
    error_id_count = 0
    error_chunk_count = 0
    # loop
    for acc_chunk in acc_chunks: # acc_chunk is a list of ids
        chunk_num += 1

        display_chunk_num = str(chunk_num).zfill(len(str(len(acc_chunks))))
        print(acc_id_file_stem + ': chunk:' + display_chunk_num)

        # error capture 1: efetch
        try:
            net_handle = Entrez.efetch(db=db, id=acc_chunk, rettype=rettype, retmode=retmode)
        except Exception as e:
            when_exeption_happened(1, e, acc_chunk, display_chunk_num)
            # counts
            error_chunk_count += 1
            error_id_count += len(acc_chunk)
            # wait 1 min
            time.sleep(60)
            continue
        else:
            pass # I do not want nested try-except. Pass the case to the next try-exept.

        # error capture 2: broken records?
        # Note: net_handle is still open because I need it here.
        try:
            # if record is broken, it will be trapped in the next line.
            for record in SeqIO.parse(net_handle, fmt):
                # write records into a file
                out_file = './' + record.id + ext
                SeqIO.write(record, out_file, fmt)
        except Exception as e:
            when_exeption_happened(2, e, acc_chunk, display_chunk_num)
            # counts
            error_chunk_count += 1
            error_id_count += len(acc_chunk)
        else:
            # report on STDOUT
            dt_now = datetime.datetime.now()
            et = elapsed_time(dt_start, dt_now)
            print(' --- Done. ET:' + et)
            print()
            # counts
            done_id_count += len(acc_chunk)
        finally:
            net_handle.close()
            time.sleep(1) # try longer sleep if you suspect you are kicked out by the server.


    # post loop works
    ts = timestamp()
    dt_now = datetime.datetime.now()
    et = elapsed_time(dt_start, dt_now)
    # log process summary
    log_lines = [
        '# === SUMMARY ===',
        '# total accession id count: ' + str(total_id_count),
        '# chunk created: ' + str(len(acc_chunks)),
        '# error chunks: ' + str(error_chunk_count),
        '# done id count: ' + str(done_id_count),
        '# error id count: ' + str(error_id_count),
        '# process finished: ' + ts,
        '# total elapsed time: ' + et
    ]
    write_file(log_file, 'a', '\n'.join(log_lines))

    # STDOUT
    print('Process finished.')
    print('\n'.join(log_lines))


if __name__ == '__main__':
    # get accession id file path
    acc_id_file = sys.argv[1]

    # vars for efetch
    db = 'nuccore' # 'nucleotide' will work as well
    rettype = 'gb' # use 'fasta', 'gbwithparts' for fasta, GanBank(full)
    retmode = 'text' # fixed in this code
    count_in_query = 20 # count in one query
    email = 'your@email.com' # use yours
    api_key = 'your_api_key_string' # use yours

    # vars for SeqIO
    # format name adjustment and file extension
    if rettype in ['gb', 'gbwithparts']:
        fmt = 'gb'
        ext = '.gb'
    elif rettype == 'fasta':
        fmt = 'fasta'
        ext = '.fa'
    else:
        print("rettype should be 'gb', 'gbwithparts', or 'fasta'")
        sys.exit()

    # log file path construction
    acc_id_file_stem = os.path.splitext(os.path.basename(acc_id_file))[0]
    log_file = './' + acc_id_file_stem + '_download_log.txt'

    # run!
    main()

ログを出したりerror captureしたりしているせいで、長ったらしいコードになってしまいました。

count_in_query = で指定した数のaccession idが一回のクエリに入ります。あとは、acc_id_fileの中にいくつidがあろうともその数ずつコツコツとクエリを送ってはレコードをファイルに書き出してくれます(1ファイル1レコード)。通信でエラー、あるいはレコードが壊れている時にはエラー情報とともに該当するaccession idがログに書き込まれます。やり直しの際に使えます。

実際数万のレコードを取るときには、accession idファイルをsplitコマンドで1000行ずつに分割(acc_00、acc_01、、、のようなファイルに小分け)して、bash scriptで順に走らせました。こんな感じです。

for acc_file in 'path_to/acc_*
do
    python <code> ${acc_file}
done

これでacc_*の部分をacc_0*などとすれば`00から09までだけを走らせたりできるわけです。USのbusy timeを避けてほしい、というようなことがどこかにかかれていました。私は(GMT+08)のところに住んでいますので、自分の時間帯の朝(向こうが仕事から帰る)から夕方(向こうが起き始める)までに様子をみながら走らせるようにしました。週末にも走らせましたが、この目安を守っている人が多いからなのか、通信に時間がかかりました。たまたまかもしれませんが。

普段使いちょい取り版

配列レコードを大量に取得する機会はそんなにないかもしれません。数個を取ってくるならログなんていらないでしょうし、エラーがあるならターミナル上へ流れればそれでいい。確かにそうです。ということで、普段使い簡単版を下にのせます。経過時間なんていらへんし、という場合にはさらにすっきりさせられます。(が、どれだけかかったのかな、ってのは案外知りたくなるもんです)

ひとつめのargumentにrettypeを与えてください。'gb'、'gbwithparts'、'fasta'のいづれかが使えます。そのあと、accession idをスペースで区切りながら書いてください(カンマを勢いで入れないでください。単純にsys.argv[2:]で拾っているだけなので)。gbとfasta両方ほしい、という方は、ひとまずgbをとってあとでSeqIOでfasta版を書きだしましょう。もう一度取り直すよりよっぽど早くできます。

import sys
from Bio import Entrez
from Bio import SeqIO
import datetime


def timestamp():
    # returns current time stamp
    dt = datetime.datetime.now()
    ts = dt.strftime('%d%b%Y-%H:%M:%S')
    return ts


def elapsed_time(dt_start, dt_end):
    # calculate delta between start and end
    ds = (dt_end - dt_start).total_seconds()
    h = int(ds // 3600)
    m = int((ds % 3600) // 60)
    s = int(ds % 60)
    # construct str line
    str_line = str(h).zfill(2) + 'h ' + str(m).zfill(2) + 'm ' + str(s).zfill(2) + 's'
    return str_line


def main():
    print('process started: ', timestamp())
    # mark start time
    dt_start = datetime.datetime.now()

    # declare who you are
    Entrez.email = email
    Entrez.api_key = api_key

    # efetch!
    net_handle = Entrez.efetch(db=db, id=acc_ids, rettype=rettype, retmode=retmode)

    # write out files
    for record in SeqIO.parse(net_handle, fmt):
        out_file = record.id + ext
        SeqIO.write(record, out_file, fmt)
        print(out_file, 'created.')

    # do not forget to close the handle
    net_handle.close()

    # STDOUT
    print()
    print('process finished: ', timestamp())
    et = elapsed_time(dt_start, datetime.datetime.now())
    print('elapsed time: ', et)


if __name__ == '__main__':

    # get rettype
    rettype = sys.argv[1] # choices: 'gb', 'gbwithparts', 'fasta'
    # get acc ids
    acc_ids = sys.argv[2:]

    # vars for efetch
    db = 'nuccore' # 'nucleotide' will work as well
    retmode = 'text' # fixed in this code
    email = 'your@email.com' # use yours
    api_key = 'your_api_key_string' # use yours

    # vars for SeqIO
    # format name adjustment and file extension
    if rettype in ['gb', 'gbwithparts']:
        fmt = 'gb'
        ext = '.gb'
    elif rettype == 'fasta':
        fmt = 'fasta'
        ext = '.fa'
    else:
        print("rettype should be 'gb', 'gbwithparts', or 'fasta'")
        sys.exit()

    # run!
    main()

最後に

今回は核酸配列しか扱いませんでしたが、E-utilityには文献検索はじめ、たくさんの便利が詰まっています。たいていのものはwebから利用したほうがわかりやすいので、わざわざプログラムからアクセスする必要はないかもしれませんが、少しだけ違ってほぼ同じことの繰り返し、が作業に入り始めたら考える時到来です。

6
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?