超算上使用Parabricks

功能概述

自动识别样本信息,批量运行Parabricks。

Python脚本

可以命名为05.run_parabricks.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Parabricks GPU加速全基因组重测序批量分析脚本 (Python版本) - 正确GVCF输出版
作者: Xiang LI
日期: 2024-05-21
描述: 使用NVIDIA Parabricks进行GPU加速的全基因组重测序变异检测流程
说明: 根据HPC文档,不手动指定GPU参数,让pbrun从Slurm环境变量自动继承。
"""

import os
import sys
import glob
import argparse
import logging
import subprocess
import json
import concurrent.futures
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Sample:
    """Per-sample record: sample name plus its paired FASTQ paths.

    Attributes:
        name: Sample identifier, used for output file and directory naming.
        fastq1: Path to the R1 (forward) FASTQ file.
        fastq2: Path to the R2 (reverse) FASTQ file.
        output_dir: Per-sample output directory; defaults to a directory
            named after the sample when not supplied.
    """
    name: str
    fastq1: Path
    fastq2: Path
    # BUG FIX: the original default was field(default_factory=Path), i.e.
    # Path('.'), which is always truthy, so the "default to the sample
    # name" branch below could never trigger. Use None as the sentinel.
    output_dir: Optional[Path] = None

    def __post_init__(self):
        # Coerce to Path in case callers pass plain strings (e.g. from TSV).
        self.fastq1 = Path(self.fastq1)
        self.fastq2 = Path(self.fastq2)
        if self.output_dir is None:
            self.output_dir = Path(self.name)
        else:
            self.output_dir = Path(self.output_dir)


@dataclass
class Config:
    """Pipeline configuration; every field has a usable default.

    Values are layered (lowest to highest precedence): these defaults,
    a JSON config file, then command-line flags.
    """
    container: Path = Path("/share/apps/containers/parabricks.sif")  # Parabricks SIF image path
    input_dir: Path = Path("./input")    # directory scanned for FASTQ files
    output_dir: Path = Path("./output")  # root of per-sample result directories
    ref_genome: Path = Path("/path/to/your/reference/genome.fa")  # placeholder; must be overridden
    known_sites: Optional[Path] = None   # optional known-variants file (bound into the container when set)
    workflow: str = "fq2bam_and_call"    # one of: fq2bam_only / call_only / fq2bam_and_call
    num_threads: int = 32                # CPU threads per task
    max_parallel: int = 1                # number of samples processed concurrently
    fastq_pattern: str = ""              # explicit FASTQ pattern, e.g. '*_R1.fq.gz'; empty = auto-detect
    sample_list: Optional[Path] = None   # optional TSV of (name, fastq1, fastq2)
    gvcf: bool = True                    # emit GVCF (True) or plain VCF (False)
    verbose: bool = False                # enable DEBUG-level logging
    dry_run: bool = False                # log commands without executing them
    overwrite: bool = False              # re-run samples whose final outputs already exist


class ParabricksAnalyzer:
    """Drives sample discovery and batch Parabricks runs inside Apptainer/Singularity."""

    def __init__(self, config: Config):
        # Store the configuration and initialise logging immediately so
        # every subsequent step can emit messages.
        self.config = config
        self.setup_logging()

    def setup_logging(self):
        """Configure the root logger; verbose mode switches to DEBUG level."""
        level = logging.DEBUG if self.config.verbose else logging.INFO
        logging.basicConfig(level=level, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
        self.logger = logging.getLogger(__name__)

    def check_requirements(self):
        """Validate the runtime environment before any processing starts.

        Logs visible GPUs (best effort), verifies that the container image
        and reference genome exist, locates a container engine (apptainer
        preferred over singularity) and creates the output directory.

        Raises:
            FileNotFoundError: container or reference genome missing.
            RuntimeError: neither apptainer nor singularity is on PATH.
        """
        self.logger.info("检查运行环境...")
        try:
            # Best-effort GPU listing; failure is only a warning because
            # pbrun inherits GPU visibility from the Slurm environment.
            result = subprocess.run(['nvidia-smi', '-L'], capture_output=True, text=True, check=True)
            self.logger.info(f"信息:在当前作业环境中检测到以下GPU:\n{result.stdout.strip()}")
        except Exception as e:
            self.logger.warning(f"无法运行 nvidia-smi 进行信息检查: {e}")

        required_files = [self.config.container, self.config.ref_genome]
        for file_path in required_files:
            if not file_path.is_file():
                # Clearer hint when the placeholder reference path was never overridden.
                if file_path == self.config.ref_genome and str(file_path) == "/path/to/your/reference/genome.fa":
                    raise FileNotFoundError(f"请通过 --ref-genome 参数或配置文件指定一个有效的参考基因组路径。")
                raise FileNotFoundError(f"必要文件不存在: {file_path}")

        # Prefer apptainer; fall back to singularity.
        self.container_cmd = ""
        for cmd in ['apptainer', 'singularity']:
            if subprocess.run(['which', cmd], capture_output=True).returncode == 0:
                self.container_cmd = cmd
                break
        if not self.container_cmd:
            raise RuntimeError("未找到Apptainer或Singularity")
        self.logger.info(f"使用容器引擎: {self.container_cmd}")

        self.config.output_dir.mkdir(parents=True, exist_ok=True)

# ... find_samples_* 和 load_samples_from_file 方法保持不变 ...
def find_samples_with_pattern(self, pattern: str) -> List[Sample]:
if "*" not in pattern:
raise ValueError("文件模式必须包含 * 作为样本名占位符")
prefix, suffix = pattern.split("*", 1)
read_indicators = [("R1", "R2"), ("_1", "_2"), (".1", ".2")]
r1_id, r2_id = None, None
for r1, r2 in read_indicators:
if r1 in suffix:
r1_id, r2_id = r1, r2
break
if not r1_id:
raise ValueError(f"无法从模式 '{pattern}' 的后缀 '{suffix}' 中识别read标识符 (R1/R2, _1/_2, .1/.2)")

samples = []
for fq1_path in glob.glob(str(self.config.input_dir / f"{prefix}*{suffix}")):
fq1_path = Path(fq1_path)
fq2_path = Path(str(fq1_path).replace(r1_id, r2_id))
if fq2_path.exists():
basename = fq1_path.name
sample_name = basename[len(prefix) : -len(suffix)]
if not sample_name:
sample_name = fq1_path.stem.replace(r1_id, "")
samples.append(Sample(name=sample_name, fastq1=fq1_path, fastq2=fq2_path))
else:
self.logger.warning(f"找不到对应的R2文件: {fq2_path}")
return samples

def find_samples_auto(self) -> List[Sample]:
patterns = [("_R1.fq.gz", "_R2.fq.gz"), ("_R1.fastq.gz", "_R2.fastq.gz"), ("_1.fq.gz", "_2.fq.gz"), ("_1.fastq.gz", "_2.fastq.gz")]
for r1_suffix, r2_suffix in patterns:
samples = []
for fq1_path in glob.glob(str(self.config.input_dir / f"*{r1_suffix}")):
fq1_path = Path(fq1_path)
fq2_path = fq1_path.with_name(fq1_path.name[:-len(r1_suffix)] + r2_suffix)
if fq2_path.exists():
sample_name = fq1_path.name[:-len(r1_suffix)]
samples.append(Sample(name=sample_name, fastq1=fq1_path, fastq2=fq2_path))
if samples:
self.logger.info(f"自动检测到模式: *{r1_suffix}")
return samples
return []

def load_samples_from_file(self, sample_file: Path) -> List[Sample]:
samples = []
with sample_file.open('r') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line or line.startswith('#'): continue
parts = line.split('\t')
if len(parts) >= 3:
samples.append(Sample(name=parts[0], fastq1=Path(parts[1]), fastq2=Path(parts[2])))
else:
self.logger.warning(f"第{line_num}行格式错误,跳过: {line}")
return samples

def discover_samples(self) -> List[Sample]:
self.logger.info("开始样本发现...")
samples = []
if self.config.sample_list and self.config.sample_list.is_file():
self.logger.info(f"从样本列表文件加载: {self.config.sample_list}")
samples = self.load_samples_from_file(self.config.sample_list)
elif self.config.fastq_pattern:
self.logger.info(f"使用指定的文件模式: {self.config.fastq_pattern}")
samples = self.find_samples_with_pattern(self.config.fastq_pattern)
else:
self.logger.info("自动检测FASTQ文件...")
samples = self.find_samples_auto()

self.logger.info(f"发现 {len(samples)} 个样本")
for sample in samples:
self.logger.info(f"样本: {sample.name}")
self.logger.debug(f" R1: {sample.fastq1}\n R2: {sample.fastq2}")
return samples

    def run_apptainer(self, cmd: List[str]) -> subprocess.CompletedProcess:
        """Execute *cmd* inside the Parabricks container.

        Bind-mounts the input, output and reference-genome directories
        (plus the known-sites directory when configured) and passes --nv
        so host GPUs are visible inside the container. In dry-run mode
        the command is only logged and a zero-return CompletedProcess is
        returned. On failure the container's stdout/stderr are logged and
        subprocess.CalledProcessError is re-raised.
        """
        # Collect the unique host paths the container must be able to see.
        paths_to_bind: Set[Path] = {self.config.input_dir.resolve(), self.config.output_dir.resolve(), self.config.ref_genome.parent.resolve()}
        if self.config.known_sites and self.config.known_sites.exists():
            paths_to_bind.add(self.config.known_sites.parent.resolve())
        bind_args = [f"--bind={path}:{path}" for path in sorted(list(paths_to_bind))]
        # --nv exposes the host NVIDIA driver and GPUs to the container.
        apptainer_cmd = [self.container_cmd, "exec", "--nv", *bind_args, str(self.config.container)] + [str(c) for c in cmd]

        self.logger.debug(f"执行命令: {' '.join(apptainer_cmd)}")
        if self.config.dry_run:
            self.logger.info(f"DRY RUN: {' '.join(apptainer_cmd)}")
            return subprocess.CompletedProcess(apptainer_cmd, 0, "", "")

        try:
            return subprocess.run(apptainer_cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            # Surface the failing command's output so batch logs are self-contained.
            self.logger.error(f"命令执行失败: {' '.join(e.cmd)}")
            self.logger.error(f"返回码: {e.returncode}")
            self.logger.error(f"标准输出:\n{e.stdout}")
            self.logger.error(f"标准错误:\n{e.stderr}")
            raise

def workflow_fq2bam_only(self, sample: Sample, sample_output_dir: Path):
self.logger.info(f"[{sample.name}] 执行工作流程: fq2bam (FASTQ to deduped BAM)")
output_bam = sample_output_dir / f"{sample.name}.bam"

cmd = [
"pbrun", "fq2bam",
"--ref", self.config.ref_genome,
"--in-fq", sample.fastq1, sample.fastq2,
"--out-bam", output_bam,
]
self.run_apptainer(cmd)

def workflow_call_only(self, sample: Sample, sample_output_dir: Path):
input_bam = sample_output_dir / f"{sample.name}.bam"
if not input_bam.exists() and not self.config.dry_run:
raise FileNotFoundError(f"输入BAM文件不存在: {input_bam}")

# 根据 self.config.gvcf 决定输出模式
if self.config.gvcf:
self.logger.info(f"[{sample.name}] 执行工作流程: haplotypecaller (BAM to GVCF)")
output_file = sample_output_dir / f"{sample.name}.g.vcf.gz"
cmd = [
"pbrun", "haplotypecaller",
"--ref", self.config.ref_genome,
"--in-bam", input_bam,
"--out-variants", output_file,
"--gvcf"
]
else:
self.logger.info(f"[{sample.name}] 执行工作流程: haplotypecaller (BAM to VCF)")
output_file = sample_output_dir / f"{sample.name}.vcf.gz"
cmd = [
"pbrun", "haplotypecaller",
"--ref", self.config.ref_genome,
"--in-bam", input_bam,
"--out-variants", output_file
]

self.run_apptainer(cmd)

def workflow_fq2bam_and_call(self, sample: Sample, sample_output_dir: Path):
self.logger.info(f"[{sample.name}] 执行联合工作流程: fq2bam + haplotypecaller")

self.logger.info(f"[{sample.name}] 步骤 1: 运行 fq2bam")
output_bam = sample_output_dir / f"{sample.name}.bam"

if not self.config.overwrite and output_bam.exists() and output_bam.stat().st_size > 0:
self.logger.info(f"[{sample.name}] 步骤 1 的产物 {output_bam.name} 已存在,跳过 fq2bam。")
else:
self.workflow_fq2bam_only(sample, sample_output_dir)

self.logger.info(f"[{sample.name}] 步骤 2: 运行 haplotypecaller")
self.workflow_call_only(sample, sample_output_dir)

def process_single_sample(self, sample: Sample) -> bool:
sample_output_dir = self.config.output_dir / sample.name

final_output = None
if self.config.workflow == "fq2bam_only":
final_output = sample_output_dir / f"{sample.name}.bam"
elif self.config.workflow in ["fq2bam_and_call", "call_only"]:
# 根据 gvcf 开关决定检查哪个文件
if self.config.gvcf:
final_output = sample_output_dir / f"{sample.name}.g.vcf.gz"
else:
final_output = sample_output_dir / f"{sample.name}.vcf.gz"

if not self.config.overwrite and final_output and final_output.exists() and final_output.stat().st_size > 0:
self.logger.info(f"样本 {sample.name} 的最终产物 {final_output.name} 已存在,跳过。")
return True
try:
self.logger.info(f"开始处理样本: {sample.name}")
if not sample.fastq1.exists() or not sample.fastq2.exists():
raise FileNotFoundError(f"FASTQ文件不存在: {sample.fastq1}{sample.fastq2}")
sample_output_dir.mkdir(parents=True, exist_ok=True)
workflow_map = {"fq2bam_only": self.workflow_fq2bam_only, "call_only": self.workflow_call_only, "fq2bam_and_call": self.workflow_fq2bam_and_call}
if self.config.workflow not in workflow_map:
raise ValueError(f"未知或未实现的工作流程: {self.config.workflow}")
workflow_func = workflow_map[self.config.workflow]
workflow_func(sample, sample_output_dir)
self.logger.info(f"样本 {sample.name} 处理完成")
return True
except Exception as e:
self.logger.error(f"样本 {sample.name} 处理失败: {e}", exc_info=self.config.verbose)
return False

def run(self):
try:
self.check_requirements()
self.logger.info("检查Parabricks版本...")
self.run_apptainer(["pbrun", "version"])
samples = self.discover_samples()
if not samples:
self.logger.error("未发现任何样本,请检查输入目录或文件模式。")
return False
self.logger.info(f"开始批量处理 {len(samples)} 个样本")
self.logger.info(f"工作流程: {self.config.workflow}, 最大并行数: {self.config.max_parallel}, GVCF模式: {'开启' if self.config.gvcf else '关闭'}")

if self.config.max_parallel > 1:
self.logger.warning(f"最大并行数 > 1 ({self.config.max_parallel})。请确保您的作业调度系统为每个并行任务正确分配了独立的GPU资源,否则可能导致冲突。")
if self.config.max_parallel > 1:
with concurrent.futures.ThreadPoolExecutor(max_workers=self.config.max_parallel) as executor:
futures = {executor.submit(self.process_single_sample, sample): sample for sample in samples}
for future in concurrent.futures.as_completed(futures):
sample = futures[future]
try:
if not future.result():
self.logger.error(f"样本 {sample.name} 处理线程报告失败。")
except Exception as exc:
self.logger.error(f'样本 {sample.name} 在执行中产生异常: {exc}')
else:
for sample in samples:
self.process_single_sample(sample)
self.logger.info("所有样本分析完成!")
return True
except Exception as e:
self.logger.error(f"分析流程遇到致命错误: {e}", exc_info=self.config.verbose)
return False

def main():
    """Parse CLI arguments, layer them over an optional JSON config file, and run the analyzer."""
    parser = argparse.ArgumentParser(description="Parabricks GPU加速全基因组重测序批量分析脚本", formatter_class=argparse.RawTextHelpFormatter)
    defaults = Config()

    path_args = parser.add_argument_group('路径参数')
    path_args.add_argument("--container", type=Path, default=None, help=f"Parabricks容器路径 (默认: {defaults.container})")
    path_args.add_argument("--input-dir", type=Path, default=None, help=f"输入FASTQ文件目录 (默认: {defaults.input_dir})")
    path_args.add_argument("--output-dir", type=Path, default=None, help=f"输出结果目录 (默认: {defaults.output_dir})")
    path_args.add_argument("--ref-genome", type=Path, default=None, help=f"参考基因组路径 (必需, 默认: {defaults.ref_genome})")
    path_args.add_argument("--known-sites", type=Path, default=None, help=f"已知变异位点文件路径 (默认: 无)")

    proc_args = parser.add_argument_group('处理参数')
    proc_args.add_argument("--workflow", choices=["fq2bam_only", "call_only", "fq2bam_and_call"], default=None, help=f"工作流程类型 (默认: {defaults.workflow})")
    proc_args.add_argument("--num-threads", type=int, default=None, help=f"每个任务使用的CPU线程数量 (默认: {defaults.num_threads})")
    proc_args.add_argument("--max-parallel", type=int, default=None, help=f"最大并行处理样本数 (默认: {defaults.max_parallel})")
    # Inverted flag: passing --no-gvcf stores False; absent leaves None so
    # the Config default (True) survives the merge below.
    proc_args.add_argument("--no-gvcf", action="store_false", dest="gvcf", default=None, help="仅输出VCF,不输出GVCF (默认生成GVCF)")

    sample_args = parser.add_argument_group('样本发现参数')
    sample_args.add_argument("--fastq-pattern", type=str, default=None, help="FASTQ文件模式,例: '*_R1.fq.gz' (默认: 自动检测)")
    sample_args.add_argument("--sample-list", type=Path, default=None, help="样本信息文件路径 (TSV格式, 默认: 无)")

    control_args = parser.add_argument_group('控制参数')
    control_args.add_argument("--config", type=Path, help="配置文件路径 (JSON格式)")
    control_args.add_argument("--verbose", "-v", action="store_true", default=None, help=f"启用详细日志输出 (默认: {defaults.verbose})")
    control_args.add_argument("--dry-run", action="store_true", default=None, help=f"仅显示命令,不运行 (默认: {defaults.dry_run})")
    control_args.add_argument("--overwrite", action="store_true", default=None, help=f"强制重新运行已完成的样本 (默认: {defaults.overwrite})")

    args = parser.parse_args()
    config = Config()

    # Config-file values are applied first; explicit CLI values override below.
    # BUG FIX: the original used isinstance(current, (Path, Optional[Path])),
    # which raises TypeError on Python < 3.10 (subscripted generics in
    # isinstance) and misses path fields whose current value is None
    # (e.g. known_sites). Decide by field name instead.
    path_fields = {"container", "input_dir", "output_dir", "ref_genome", "known_sites", "sample_list"}
    if args.config and args.config.is_file():
        with args.config.open('r') as f:
            config_data = json.load(f)
        for key, value in config_data.items():
            if hasattr(config, key):
                if key in path_fields:
                    setattr(config, key, Path(value) if value is not None else None)
                else:
                    setattr(config, key, value)

    # Only CLI arguments the user actually supplied (non-None) take precedence.
    # (The original's 'num_gpus' cleanup was dead code: no such argument exists.)
    for key, value in vars(args).items():
        if value is not None and hasattr(config, key):
            setattr(config, key, value)

    try:
        analyzer = ParabricksAnalyzer(config)
        success = analyzer.run()
        sys.exit(0 if success else 1)
    except Exception as e:
        logging.error(f"脚本启动失败: {e}", exc_info=True)
        sys.exit(1)

if __name__ == "__main__":
    main()

执行脚本

可以命名为06.run_parabricks.sh

#!/bin/bash
# Diagnostic wrapper + launcher for the Parabricks batch pipeline.
# Sections: (1) host-side GPU diagnostics, (2) in-container GPU
# diagnostics, (3) the actual Python driver invocation.

echo "========================================================"
echo "诊断开始:检查主机环境"
echo "主机上的 CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
echo "主机上的 nvidia-smi 输出:"
nvidia-smi
echo "========================================================"

echo ""

echo "========================================================"
echo "诊断核心:在 Apptainer 容器内检查环境"
APPTAINER_IMAGE="/share/apps/containers/parabricks.sif" # replace with your container path

# --nv forwards the host NVIDIA driver/GPUs into the container; the
# escaped \$ defers variable expansion until inside the container shell.
apptainer exec --nv "$APPTAINER_IMAGE" bash -c "
echo '--- 进入容器内部 ---';
echo '容器内的 CUDA_VISIBLE_DEVICES: \$CUDA_VISIBLE_DEVICES';
echo '容器内的 nvidia-smi 输出:';
nvidia-smi;
echo '--- 退出容器内部 ---';
"
echo "========================================================"

echo ""

echo "准备运行 Python 主脚本..."

# Main driver: paths below are site-specific placeholders (xxx).
python3 \
/share/org/xxx/xxx/project/xxx/00.code/05.run_parabricks.py \
--container /share/apps/containers/parabricks.sif \
--input-dir /share/org/xxx/xxx/project/xxx/01.data/clean \
--output-dir /share/org/xxx/xxx/project/xxx/04.parabricks \
--ref-genome /share/org/xxx/xxx/project/xxx/01.data/genome/genome.fa \
--workflow fq2bam_and_call \
--num-threads 64 \
--max-parallel 1 \
--fastq-pattern "*_1.fq.gz"

提交作业

1
csub -n 32 -m 200 -J "parabricks" -q xxx 00.code/06.run_parabricks.sh

会自动根据CPU数量选择GPU数量:-n设置为16就使用一个GPU,设置为32就使用两个GPU,因为这个主机的配置是64核+4个GPU。


超算上使用Parabricks
https://lixiang117423.github.io/article/parabricks/
作者
李详【Xiang LI】
发布于
2025年7月7日
许可协议