LoginSignup
3

More than 5 years have passed since last update.

cromwell でGATKのワークフローgatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.wdlを実行する

Last updated at Posted at 2019-01-31

GATKのワークフローgatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.wdlをcromwellを利用して実行する

cromwell を手元のOS XにインストールするでWDLの実行環境が構築できたのでこれを利用してGATKのワークフローを実行する

GATK4のワークフローgatk4-data-processingを取得

hg37の設定とhg38の設定があるが、ここではhg38を利用することにして、下記のファイルを用いる
gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json

ポイント

  • gs:// から始まるファイルを全てgoogle storageから取得
  • gatk-test-data/wgs_ubam/NA12878_24RG/NA12878_24RG_small.txt ファイルを取得するとさらにgs://から始まるファイルの一覧が得られるので、そのファイルも取得しておく
  • gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json ファイルの内容を実行環境にあわせて変更する
    • gs://部分を削除
      • sed や適切なエディターで文字列を置き換え
    • マルチスレッド部分のCPUに関する設定をシステムにあわせて変更
      • "PreProcessingForVariantDiscovery_GATK4.bwa_commandline": "bwa mem -K 100000000 -p -v 3 -t 16 -Y $bash_ref_fasta", の -t 16 を -t 2
      • "PreProcessingForVariantDiscovery_GATK4.SamToFastqAndBwaMem.num_cpu": "16", の "16" を "2"

gs:// から始まるファイルを全てgoogle storageから取得

必要なファイルを取得するためのユーティリティー

Google Storageからのファイルを大量に取得する作業が煩雑なので、そのための支援ツールを提供する

download_gs.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import argparse
import subprocess
import shlex
from collections import OrderedDict
import json

__author__ = 'makoto'
__date__ = '2019-01-23'
__date_of_last_modification__ = '2019-02-05'
__version__ = '1.1.3'


def submit_command(command):
    popen = subprocess.Popen(
        shlex.split(command),
        stdout=subprocess.PIPE,
        env={'PATH': os.getenv('PATH')})
    output = popen.communicate()[0]
    if output:
        print(output)
    return output.decode('utf-8')


def copy_gsfile(gs_location, local_location=None):
    """
    Copy gs file to local location.
    :param gs_location: The source location of the target file.
    :param local_location: Where should we put the downloaded file.
    :return:
    """
    if not gs_location.startswith('gs:'):
        raise SyntaxError
    if not local_location:
        local_location = gs_location[5:]  # remove "gs://"

    # Check if the local file already exists.
    if os.path.exists(local_location):
        cmd_ls = f'gsutil ls -al {gs_location}'
        # Parse file size
        output = submit_command(cmd_ls)
        file_size = output.split()[0]
        if os.path.getsize(local_location) == int(file_size):
            return
    cmd = f'gsutil cp {gs_location} {local_location}'
    print(cmd)
    output = submit_command(cmd)
    return output.decode('utf-8')


def check_gs(item):
    if isinstance(item, list):
        for item2 in item:
            check_gs(item2)
        return
    if isinstance(item, str) and item.startswith('gs:'):
        gs_location = item
        local_location = copy_gsfile(gs_location)
        return local_location


def parse_json(json_file_name):
    with open(json_file_name) as file_handle:
        data = json.load(file_handle, object_pairs_hook=OrderedDict)

    for item in data:
        check_gs(data[item])


def parse_file(file_name):
    with open(file_name) as file_handle:
        for item in file_handle:
            for item2 in item.split():
                if item2.startswith('gs:'):
                    copy_gsfile(item2)


def set_arguments():
    parser = argparse.ArgumentParser(description="Parse WDL's input file.")
    parser.add_argument('file_name', help='file name for json')
    parser.add_argument('--non_json', action='store_true', help='file is not a json')
    return parser.parse_args()


if __name__ == '__main__':
    args = set_arguments()
    file_name = args.file_name

    if args.non_json:
        parse_file(file_name)
    else:
        parse_json(file_name)

使い方
gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json のgs://で定義されるファイルを取得

カレントディレクトリ以下にディレクトリ構造そのままで出力する

$ python3 download_gs.py gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json

上記で取得したgatk-test-data/wgs_ubam/NA12878_24RG/NA12878_24RG_small.txtに含まれるファイルを全て取得する

$ python3 download_gs.py gatk-test-data/wgs_ubam/NA12878_24RG/NA12878_24RG_small.txt --non_json

gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs.json ファイルの内容を実行環境にあわせて変更する

  • gs://部分を削除
    • sed や適切なエディターで文字列を置き換え
  • マルチスレッド部分のCPUに関する設定をシステムにあわせて変更
    • "PreProcessingForVariantDiscovery_GATK4.bwa_commandline": "bwa mem -K 100000000 -p -v 3 -t 16 -Y $bash_ref_fasta", の -t 16 を -t 2
    • "PreProcessingForVariantDiscovery_GATK4.SamToFastqAndBwaMem.num_cpu": "16", の "16" を "2"

上記の変更を加えたファイルを gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs_local.json として保存

Jobの実行

Jobが分割実行される場合に、同時に実行可能なJobの上限を設定してcromwellを実行する
-Dbackend.providers.Local.config.concurrent-job-limit=2

コマンドラインは
java -Dbackend.providers.Local.config.concurrent-job-limit=2 -jar cromwell.jar WDLファイル -i jobの指定を行う入力ファイルのJSONとなる

今回の例では下記のような設定

$ java -Dbackend.providers.Local.config.concurrent-job-limit=2 -jar cromwell.jar run gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.wdl -i gatk4-data-processing-1.0.1/processing-for-variant-discovery-gatk4.hg38.wgs.inputs_local.json

今回はここまで:smile:

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3