GATKのワークフローgatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.wdlをcromwellを利用して実行する
cromwell を手元のOS XにインストールするでWDLの実行環境が構築できたのでこれを利用してGATKのワークフローを実行する
GATK4のワークフローgatk4-data-processingを取得
hg37の設定とhg38の設定があるが、ここではhg38を利用することにして、下記のファイルを用いる
gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json
ポイント
- gs:// から始まるファイルを全てgoogle storageから取得
-
gatk-test-data/wgs_ubam/NA12878_24RG/NA12878_24RG_small.txt
ファイルを取得するとさらにgs://から始まるファイルの一覧が得られるので、そのファイルも取得しておく -
gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json
ファイルの内容を実行環境にあわせて変更する- gs://部分を削除
- sed や適切なエディターで文字列を置き換え
- マルチスレッド部分のCPUに関する設定をシステムにあわせて変更
- "PreProcessingForVariantDiscovery_GATK4.bwa_commandline": "bwa mem -K 100000000 -p -v 3 -t 16 -Y $bash_ref_fasta", の -t 16 を -t 2
- "PreProcessingForVariantDiscovery_GATK4.SamToFastqAndBwaMem.num_cpu": "16", の "16" を "2"
- gs://部分を削除
gs:// から始まるファイルを全てgoogle storageから取得
必要なファイルを取得するためのユーティリティー
Google Storageからのファイルを大量に取得する作業が煩雑なので、そのための支援ツールを提供する
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import argparse
import subprocess
import shlex
from collections import OrderedDict
import json
__author__ = 'makoto'
__date__ = '2019-01-23'
__date_of_last_modification__ = '2019-02-05'
__version__ = '1.1.3'
def submit_command(command):
popen = subprocess.Popen(
shlex.split(command),
stdout=subprocess.PIPE,
env={'PATH': os.getenv('PATH')})
output = popen.communicate()[0]
if output:
print(output)
return output.decode('utf-8')
def copy_gsfile(gs_location, local_location=None):
"""
Copy gs file to local location.
:param gs_location: The source location of the target file.
:param local_location: Where should we put the downloaded file.
:return:
"""
if not gs_location.startswith('gs:'):
raise SyntaxError
if not local_location:
local_location = gs_location[5:] # remove "gs://"
# Check if the local file already exists.
if os.path.exists(local_location):
cmd_ls = f'gsutil ls -al {gs_location}'
# Parse file size
output = submit_command(cmd_ls)
file_size = output.split()[0]
if os.path.getsize(local_location) == int(file_size):
return
cmd = f'gsutil cp {gs_location} {local_location}'
print(cmd)
output = submit_command(cmd)
return output.decode('utf-8')
def check_gs(item):
if isinstance(item, list):
for item2 in item:
check_gs(item2)
return
if isinstance(item, str) and item.startswith('gs:'):
gs_location = item
local_location = copy_gsfile(gs_location)
return local_location
def parse_json(json_file_name):
with open(json_file_name) as file_handle:
data = json.load(file_handle, object_pairs_hook=OrderedDict)
for item in data:
check_gs(data[item])
def parse_file(file_name):
with open(file_name) as file_handle:
for item in file_handle:
for item2 in item.split():
if item2.startswith('gs:'):
copy_gsfile(item2)
def set_arguments():
parser = argparse.ArgumentParser(description="Parse WDL's input file.")
parser.add_argument('file_name', help='file name for json')
parser.add_argument('--non_json', action='store_true', help='file is not a json')
return parser.parse_args()
if __name__ == '__main__':
args = set_arguments()
file_name = args.file_name
if args.non_json:
parse_file(file_name)
else:
parse_json(file_name)
使い方
gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json
のgs://で定義されるファイルを取得
カレントディレクトリ以下にディレクトリ構造そのままで出力する
$ python3 download_gs.py gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json
上記で取得したgatk-test-data/wgs_ubam/NA12878_24RG/NA12878_24RG_small.txt
に含まれるファイルを全て取得する
$ python3 download_gs.py gatk-test-data/wgs_ubam/NA12878_24RG/NA12878_24RG_small.txt --non_json
gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json ファイルの内容を実行環境にあわせて変更する
- gs://部分を削除
- sed や適切なエディターで文字列を置き換え
- マルチスレッド部分のCPUに関する設定をシステムにあわせて変更
- "PreProcessingForVariantDiscovery_GATK4.bwa_commandline": "bwa mem -K 100000000 -p -v 3 -t 16 -Y $bash_ref_fasta", の -t 16 を -t 2
- "PreProcessingForVariantDiscovery_GATK4.SamToFastqAndBwaMem.num_cpu": "16", の "16" を "2"
上記の変更を加えたファイルを gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs_local.json
として保存
Jobの実行
Jobが分割実行される場合に、同時に実行可能なJobの上限を設定してcromwellを実行する
-Dbackend.providers.Local.config.concurrent-job-limit=2
コマンドラインは
java -Dbackend.providers.Local.config.concurrent-job-limit=2 -jar cromwell.jar WDLファイル -i jobの指定を行う入力ファイルのJSON
となる
今回の例では下記のような設定
$ java -Dbackend.providers.Local.config.concurrent-job-limit=2 -jar cromwell.jar run gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.wdl -i gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs_local.json
今回はここまで