ある仕事で5万レコード近い核酸配列を取得して解析する必要が生じた。手元にあるのはaccessionが一行にひとつづつ書かれたファイルのみ。どうやるか?ということになった。解析にはアノテーション情報も使うかもしれないということで、genbankファイルを取得することにした。
環境はpython=3.6.8、Biopython=1.72を用いた。
Entrez efetchについて
efetchは、NCBIが提供しているE-utilitiesのひとつで、配列レコードの取得のみならず文献検索やらなんやらいろいろとできて面白そうですが、寄り道せずに進みます。
詳しい使い方の例や注意事項については以下のページを見てください。
- A General Introduction to the E-utilities
- E-utilities Quick Start
- The E-utilities In-Depth: Parameters, Syntax and More
今のところ必須ではないようですが、下のページを読むとNCBIの利用者アカウントを作ってapi keyを取得したほうがいい気がしてきます。
基本的にはhttps://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi
というbase URLに必要なパラメターをつないでリクエスト送信すると、NCBIから返事が返ってくるというものです。自分でパラメターを含むURL構築を行ってrequests
モジュールで送信してもいいのですが、idが200以上の場合にはpostを使ってね、とか、本家のURL(https://www.ncbi.nlm.nih.gov/
)は使わないでね、とか、そもそもURL構築が面倒だなとか、emailやapi keyはどうやって組み込むんだとか、余計に頭を使うことになってしまいますので、そのあたりうまく丸めてくれるBiopythonのEntrez
モジュールのお世話になることにします。(responseはどんなものが返ってきているのかとかHTTPのやり取り自体をもっと知りたい場合にはrequests
で頑張ると勉強にはなります)
あと、Biopythonの世界に持ち込むと、例えばgenbankファイルを取りつつdefinition lineに連番を入れたfastaも同時に作ってしまうとか(genbankとfasta両方リクエストする必要はないですし)、配列長で振り分けするとか、そういうこともできるので便利です。取得が終わってからやってもいいんですけど、もちろん。
必要な情報とパラメターについて
レコード取得にあたり、以下ものものを準備します。
- 有効なemailアドレス:お願いする以上、正直に名乗ることになります。
-
api_key
:必須ではありませんが、名乗りの延長上にあります。
用意する(考えておく)パラメターは以下の通り。
-
db
:データベース名。今回は核酸配列を取得しますので、'nuccore'を使います。'nucleotide'でもよいようです。 -
id
:accession id(たち)。手動でURLを作る場合には','
で区切ります。その場合、スペースを入れないように。Biopythonに任せる場合にはリストを渡します。 -
rettype
:今回の場合、'gb'あるいは'fasta'で間に合うのですが、genbankファイルで配列がcontigの場合にはうまくとれてこなくてハマりました。そういう時には'gbwithparts'とするとうまく取れてきます。(webページからファイルに保存する場合には、'GenBank (full)'がある、絶対にできるはずだ、と思ったのですが、すぐにわからなくて苦労しました。ヒントを得るために、webページからのファイル取得で'GenBank'を選んだ時と、'GenBank (full)'を選んだ時に送信URLがどう違うのかを、ChromeブラウザのDeveloper toolsから見てみると、'GenBank (full)'を選んだ時には'&report=gbwithpars'、'&withparts=on'が見えるようになるではないですか。これだ、これでいけると思いましたが、webでの'report'は'rettype'だと予想できるのですが、'withparts'はefetchのargにはない感じ。結局このページに助けられました)あと、自分で指定値を創作してしまうと警告も何もなしに、'xml'で返信されてきてなんじゃこりゃ?になりますので気をつけましょう。('gb'と書くところを'genbank'でもいけるやろ、と思って試してそうなりました。'fasta'じゃなくて'fa'でもそうなります。やってみてください) -
retmode
:今回は'text'でいきます。
データベースごとの可能なrettype、retmodeの組み合わせはこのページにテーブルとしてまとめられています。
方針
さて、今回は手元にaccession id(数万)の情報がファイルにあり、該当するレコードをすべて取得したいという状況です。どうやるか?以下のように考えます。
- 一度のリクエスト(クエリ)に含まれるaccession idの数。NCBIのページで検索する場合には、デフォルトでは1ページに20の結果が表示されます。表示は最大で200までにすることができ、ページの右上にある'Send to:'のところからファイルに保存できるようになっている。このへんをヒントとすると、ひとまず20ぐらいを一つのリクエストとすれば、そんなに不自然ではないはずだ。
- ファイル出力。これはこの後の解析とファイル管理の都合で変えればいいと思いますが、今回の例では1ファイル1レコードとしておきます。
- エラー処理。小手調べコードを走らせてみてわかったことですが、私の環境では二種類のよろしくない状況が発生しました。一つは通信自体に関連するエラー。もう一つは取れてきたレコードの一部が壊れているという問題です。今回のように大量のものを扱う場合にはSTDOUTに流れるメッセージを画面に張り付いて終始見ているわけにもいかないので(そもそもそれをしたくないからこういうコードを書いている)、最低限、問題の起こったaccession idについてはログファイルに書き出しておいてあとでやり直しやすいようにしておきたい。通信エラーが起こった場合には、そのクエリに含まれていたaccession idをすべてやり直すことになりますので、それらを書きだす。ちょっと悩ましいのが取ったレコードの一部が壊れている場合です。BiopythonのSeqIOでレコードを読み込ませるとそれが壊れている場合にはエラーが出るのですが、たとえば20づつ取っている場合に5番目が壊れていたとしますと、SeqIO.parse()は5番目に来たところでエラーを出して止まります。6番以降については問題があるのかないのかわからない。。。これをクリアしようとすると、手が込んでしまいますので(いつまでもこの記事を投稿できなくなってしまうかもしれない)、どれは大丈夫でどれはだめ、とは判定せずに、クエリとして取ってきたものの中に壊れているレコードがある場合には通信エラーのときのようにクエリに含まれるaccession id全てを報告することにします。
エラーが起こったaccession idたちについての取り直しを考えると、クエリに含まれるaccession idの数はあまり大きくしないほうが得策と思われますので、コード中では20としました(count_in_query = 20
)。オレ様の環境ではそんなエラーは起きん、という方はそれなりに設定してくださいまし。(うらやましい)
大量取得版コード
Argumentとして渡すのはaccession idたちが一行に一つづつ書かれたファイルへのpathのみ。このファイルの名前(extensionを除いたbasename)をログファイルの名前の一部に使いますので、スペースやら全角やら含まないほうが無難です。このファイルにはコメント行や、空行も見やすさの都合で含みたくなるかもしれません。'#'で始まる行と空行はスキップするようにしておきました。'#'はaccessionの除外にも使えますし。ただし、accession idの後ろには(同じ行には)コメントを入れないでください。(必要ならば改造してください)
データベース(db=)は'nuccore'、ファイル形式(retrype=, retmode=)は'genbank'、'text'、1クエリのid数(count_in_query=)は20で固定、hard codingしてあります。(もっと柔軟に選択したい場合にはこれらもargumentとして取るように改造してください)出力ファイルはログも含めてすべてカレントディレクトリにできてきます。
import sys
import re
import os
from Bio import Entrez
from Bio import SeqIO
import datetime
import time
def timestamp():
# returns current time stamp
dt = datetime.datetime.now()
ts = dt.strftime('%d%b%Y-%H:%M:%S')
return ts
def elapsed_time(dt_start, dt_end):
# calculate delta between start and end
ds = (dt_end - dt_start).total_seconds()
h = int(ds // 3600)
m = int((ds % 3600) // 60)
s = int(ds % 60)
# construct str line
str_line = str(h).zfill(2) + 'h ' + str(m).zfill(2) + 'm ' + str(s).zfill(2) + 's'
return str_line
def write_file(out_file, mode, info_str):
# write file
with open(out_file, mode=mode) as OUT:
OUT.write(info_str + '\n')
def get_acc_chunks(id_list_file, max_in_chunk):
# group specified number of accession ids in a chunk
# return a list of chunks and total id count
# regex to skip some lines
comment_ptn = re.compile('^#')
blank_ptn = re.compile(r'^\s*$')
chunks = []
chunk = []
total_id_count = 0
with open(id_list_file, mode='r') as ACC:
id_count_in_a_chunk = 0
for l in ACC:
l = l.rstrip() # remove annoying EOL
if (comment_ptn.match(l)) or (blank_ptn.match(l)):
continue # skip those lines
else:
id_count_in_a_chunk += 1
if id_count_in_a_chunk <= max_in_chunk:
chunk.append(l)
else:
# dump a chunk
chunks.append(chunk)
total_id_count += len(chunk)
# start a new chunk
chunk = [l]
id_count_in_a_chunk = 1
# append the last chunk
chunks.append(chunk)
total_id_count += len(chunk)
return chunks, total_id_count
def when_exeption_happened(flag, e, acc_chunk, display_chunk_num):
ts = timestamp()
if flag == 1: # error in efetch
where = 'efetch'
elif flag == 2: # error in SeqIO
where = 'SeqIO'
# report on STDOUT
print(" --- Error in " + where + ". See log file. @" + ts)
print(str(type(e)))
print(e.args)
if flag == 1:
print("Will be continued after 1 min.")
print()
# report to log
log_lines = [
'[Error in ' + where + '] chunk:' + display_chunk_num + '@' + ts,
str(type(e)),
str(e.args), # if any
'# accession ids in the chunk'
]
# append accession ids in the chunk
for acc in acc_chunk:
log_lines.append(acc)
log_lines.append('\n')
write_file(log_file, 'a', '\n'.join(log_lines))
def main():
print('program started: ', timestamp())
# mark start time
dt_start = datetime.datetime.now()
# write log header
log_lines = [
'# processed by: ' + sys.argv[0],
'# process started: ' + timestamp(),
'# accession id file: ' + acc_id_file,
'# db=' + db,
'# retrype=' + rettype,
'# retmode=' + retmode,
'# id count per query: ' + str(count_in_query) + '\n'
]
write_file(log_file, 'w', '\n'.join(log_lines)) # create a new file
# get accession id chunks from accession id file
acc_chunks, total_id_count = get_acc_chunks(acc_id_file, count_in_query)
# declare who you are
Entrez.email = email
Entrez.api_key = api_key
#Entrez.tool is 'biopython' by default
# some buckets to monitor the processes
chunk_num = 0
done_id_count = 0
error_id_count = 0
error_chunk_count = 0
# loop
for acc_chunk in acc_chunks: # acc_chunk is a list of ids
chunk_num += 1
display_chunk_num = str(chunk_num).zfill(len(str(len(acc_chunks))))
print(acc_id_file_stem + ': chunk:' + display_chunk_num)
# error capture 1: efetch
try:
net_handle = Entrez.efetch(db=db, id=acc_chunk, rettype=rettype, retmode=retmode)
except Exception as e:
when_exeption_happened(1, e, acc_chunk, display_chunk_num)
# counts
error_chunk_count += 1
error_id_count += len(acc_chunk)
# wait 1 min
time.sleep(60)
continue
else:
pass # I do not want nested try-except. Pass the case to the next try-exept.
# error capture 2: broken records?
# Note: net_handle is still open because I need it here.
try:
# if record is broken, it will be trapped in the next line.
for record in SeqIO.parse(net_handle, fmt):
# write records into a file
out_file = './' + record.id + ext
SeqIO.write(record, out_file, fmt)
except Exception as e:
when_exeption_happened(2, e, acc_chunk, display_chunk_num)
# counts
error_chunk_count += 1
error_id_count += len(acc_chunk)
else:
# report on STDOUT
dt_now = datetime.datetime.now()
et = elapsed_time(dt_start, dt_now)
print(' --- Done. ET:' + et)
print()
# counts
done_id_count += len(acc_chunk)
finally:
net_handle.close()
time.sleep(1) # try longer sleep if you suspect you are kicked out by the server.
# post loop works
ts = timestamp()
dt_now = datetime.datetime.now()
et = elapsed_time(dt_start, dt_now)
# log process summary
log_lines = [
'# === SUMMARY ===',
'# total accession id count: ' + str(total_id_count),
'# chunk created: ' + str(len(acc_chunks)),
'# error chunks: ' + str(error_chunk_count),
'# done id count: ' + str(done_id_count),
'# error id count: ' + str(error_id_count),
'# process finished: ' + ts,
'# total elapsed time: ' + et
]
write_file(log_file, 'a', '\n'.join(log_lines))
# STDOUT
print('Process finished.')
print('\n'.join(log_lines))
if __name__ == '__main__':
# get accession id file path
acc_id_file = sys.argv[1]
# vars for efetch
db = 'nuccore' # 'nucleotide' will work as well
rettype = 'gb' # use 'fasta', 'gbwithparts' for fasta, GanBank(full)
retmode = 'text' # fixed in this code
count_in_query = 20 # count in one query
email = 'your@email.com' # use yours
api_key = 'your_api_key_string' # use yours
# vars for SeqIO
# format name adjustment and file extension
if rettype in ['gb', 'gbwithparts']:
fmt = 'gb'
ext = '.gb'
elif rettype == 'fasta':
fmt = 'fasta'
ext = '.fa'
else:
print("rettype should be 'gb', 'gbwithparts', or 'fasta'")
sys.exit()
# log file path construction
acc_id_file_stem = os.path.splitext(os.path.basename(acc_id_file))[0]
log_file = './' + acc_id_file_stem + '_download_log.txt'
# run!
main()
ログを出したりerror captureしたりしているせいで、長ったらしいコードになってしまいました。
count_in_query =
で指定した数のaccession idが一回のクエリに入ります。あとは、acc_id_file
の中にいくつidがあろうともその数ずつコツコツとクエリを送ってはレコードをファイルに書き出してくれます(1ファイル1レコード)。通信でエラー、あるいはレコードが壊れている時にはエラー情報とともに該当するaccession idがログに書き込まれます。やり直しの際に使えます。
実際数万のレコードを取るときには、accession idファイルをsplit
コマンドで1000行ずつに分割(acc_00、acc_01、、、のようなファイルに小分け)して、bash scriptで順に走らせました。こんな感じです。
for acc_file in 'path_to/acc_*
do
python <code> ${acc_file}
done
これでacc_*
の部分をacc_0*
などとすれば`00から09までだけを走らせたりできるわけです。USのbusy timeを避けてほしい、というようなことがどこかにかかれていました。私は(GMT+08)のところに住んでいますので、自分の時間帯の朝(向こうが仕事から帰る)から夕方(向こうが起き始める)までに様子をみながら走らせるようにしました。週末にも走らせましたが、この目安を守っている人が多いからなのか、通信に時間がかかりました。たまたまかもしれませんが。
普段使いちょい取り版
配列レコードを大量に取得する機会はそんなにないかもしれません。数個を取ってくるならログなんていらないでしょうし、エラーがあるならターミナル上へ流れればそれでいい。確かにそうです。ということで、普段使い簡単版を下にのせます。経過時間なんていらへんし、という場合にはさらにすっきりさせられます。(が、どれだけかかったのかな、ってのは案外知りたくなるもんです)
ひとつめのargumentにrettypeを与えてください。'gb'、'gbwithparts'、'fasta'のいづれかが使えます。そのあと、accession idをスペースで区切りながら書いてください(カンマを勢いで入れないでください。単純にsys.argv[2:]
で拾っているだけなので)。gbとfasta両方ほしい、という方は、ひとまずgbをとってあとでSeqIOでfasta版を書きだしましょう。もう一度取り直すよりよっぽど早くできます。
import sys
from Bio import Entrez
from Bio import SeqIO
import datetime
def timestamp():
# returns current time stamp
dt = datetime.datetime.now()
ts = dt.strftime('%d%b%Y-%H:%M:%S')
return ts
def elapsed_time(dt_start, dt_end):
# calculate delta between start and end
ds = (dt_end - dt_start).total_seconds()
h = int(ds // 3600)
m = int((ds % 3600) // 60)
s = int(ds % 60)
# construct str line
str_line = str(h).zfill(2) + 'h ' + str(m).zfill(2) + 'm ' + str(s).zfill(2) + 's'
return str_line
def main():
print('process started: ', timestamp())
# mark start time
dt_start = datetime.datetime.now()
# declare who you are
Entrez.email = email
Entrez.api_key = api_key
# efetch!
net_handle = Entrez.efetch(db=db, id=acc_ids, rettype=rettype, retmode=retmode)
# write out files
for record in SeqIO.parse(net_handle, fmt):
out_file = record.id + ext
SeqIO.write(record, out_file, fmt)
print(out_file, 'created.')
# do not forget to close the handle
net_handle.close()
# STDOUT
print()
print('process finished: ', timestamp())
et = elapsed_time(dt_start, datetime.datetime.now())
print('elapsed time: ', et)
if __name__ == '__main__':
# get rettype
rettype = sys.argv[1] # choices: 'gb', 'gbwithparts', 'fasta'
# get acc ids
acc_ids = sys.argv[2:]
# vars for efetch
db = 'nuccore' # 'nucleotide' will work as well
retmode = 'text' # fixed in this code
email = 'your@email.com' # use yours
api_key = 'your_api_key_string' # use yours
# vars for SeqIO
# format name adjustment and file extension
if rettype in ['gb', 'gbwithparts']:
fmt = 'gb'
ext = '.gb'
elif rettype == 'fasta':
fmt = 'fasta'
ext = '.fa'
else:
print("rettype should be 'gb', 'gbwithparts', or 'fasta'")
sys.exit()
# run!
main()
最後に
今回は核酸配列しか扱いませんでしたが、E-utilityには文献検索はじめ、たくさんの便利が詰まっています。たいていのものはwebから利用したほうがわかりやすいので、わざわざプログラムからアクセスする必要はないかもしれませんが、少しだけ違ってほぼ同じことの繰り返し、が作業に入り始めたら考える時到来です。