1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
|
""" Parabricks GPU加速全基因组重测序批量分析脚本 (Python版本) - 正确GVCF输出版 作者: Xiang LI 日期: 2024-05-21 描述: 使用NVIDIA Parabricks进行GPU加速的全基因组重测序变异检测流程 说明: 根据HPC文档,不手动指定GPU参数,让pbrun从Slurm环境变量自动继承。 """
import argparse
import concurrent.futures
import glob
import json
import logging
import os
import shutil
import subprocess
import sys
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
@dataclass
class Sample:
    """Sample metadata: a sample name plus its paired-end FASTQ files.

    Attributes:
        name: Sample identifier, used to name output files/directories.
        fastq1: Path to the R1 (forward) FASTQ file.
        fastq2: Path to the R2 (reverse) FASTQ file.
        output_dir: Per-sample output directory; when left at its default it
            is replaced by a directory named after the sample.
    """
    name: str
    fastq1: Path
    fastq2: Path
    output_dir: Path = field(default_factory=Path)

    def __post_init__(self):
        # Callers sometimes pass plain strings; normalise to Path.
        self.fastq1 = Path(self.fastq1)
        self.fastq2 = Path(self.fastq2)
        # BUG FIX: a Path instance is always truthy, so the original
        # `if not self.output_dir:` never fired and the default Path(".")
        # was silently kept. Compare against the default empty Path instead.
        if self.output_dir == Path():
            self.output_dir = Path(self.name)
@dataclass
class Config:
    """Runtime configuration for the pipeline; every field has a usable default."""
    # --- paths ---
    container: Path = Path("/share/apps/containers/parabricks.sif")  # Parabricks .sif image
    input_dir: Path = Path("./input")    # directory scanned for FASTQ files
    output_dir: Path = Path("./output")  # root directory for all results
    ref_genome: Path = Path("/path/to/your/reference/genome.fa")  # placeholder; must be overridden
    known_sites: Optional[Path] = None   # optional known-sites VCF
    # --- processing ---
    workflow: str = "fq2bam_and_call"    # fq2bam_only | call_only | fq2bam_and_call
    num_threads: int = 32                # CPU threads per task
    max_parallel: int = 1                # samples processed concurrently
    # --- sample discovery ---
    fastq_pattern: str = ""              # explicit FASTQ glob pattern; empty means auto-detect
    sample_list: Optional[Path] = None   # optional TSV sample sheet
    # --- behaviour switches ---
    gvcf: bool = True                    # emit GVCF instead of plain VCF
    verbose: bool = False                # debug-level logging
    dry_run: bool = False                # print commands without executing them
    overwrite: bool = False              # re-run samples whose outputs already exist
class ParabricksAnalyzer:
    """Drives the Parabricks pipeline: environment checks, sample discovery,
    and per-sample workflow execution (optionally in parallel)."""

    def __init__(self, config: "Config"):
        self.config = config
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging; DEBUG level when --verbose is set."""
        level = logging.DEBUG if self.config.verbose else logging.INFO
        logging.basicConfig(level=level,
                            format='[%(asctime)s] %(levelname)s: %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S')
        self.logger = logging.getLogger(__name__)

    def check_requirements(self):
        """Verify GPUs, required files and a container engine; create output dir.

        Raises:
            FileNotFoundError: container image or reference genome is missing.
            RuntimeError: neither apptainer nor singularity is on PATH.
        """
        self.logger.info("检查运行环境...")
        try:
            result = subprocess.run(['nvidia-smi', '-L'], capture_output=True, text=True, check=True)
            self.logger.info(f"信息:在当前作业环境中检测到以下GPU:\n{result.stdout.strip()}")
        except Exception as e:
            # GPU visibility is informational only; the scheduler may still grant GPUs.
            self.logger.warning(f"无法运行 nvidia-smi 进行信息检查: {e}")
        required_files = [self.config.container, self.config.ref_genome]
        for file_path in required_files:
            if not file_path.is_file():
                if file_path == self.config.ref_genome and str(file_path) == "/path/to/your/reference/genome.fa":
                    raise FileNotFoundError(f"请通过 --ref-genome 参数或配置文件指定一个有效的参考基因组路径。")
                raise FileNotFoundError(f"必要文件不存在: {file_path}")
        # Prefer apptainer, fall back to singularity. shutil.which avoids
        # spawning an external `which` process (not guaranteed to exist).
        self.container_cmd = ""
        for cmd in ['apptainer', 'singularity']:
            if shutil.which(cmd):
                self.container_cmd = cmd
                break
        if not self.container_cmd:
            raise RuntimeError("未找到Apptainer或Singularity")
        self.logger.info(f"使用容器引擎: {self.container_cmd}")
        self.config.output_dir.mkdir(parents=True, exist_ok=True)

    def find_samples_with_pattern(self, pattern: str) -> List["Sample"]:
        """Discover paired FASTQ files matching a user-supplied pattern.

        The pattern must contain exactly one `*` standing for the sample name,
        and its suffix must contain an R1 read indicator (R1/_1/.1).
        """
        if "*" not in pattern:
            raise ValueError("文件模式必须包含 * 作为样本名占位符")
        prefix, suffix = pattern.split("*", 1)
        read_indicators = [("R1", "R2"), ("_1", "_2"), (".1", ".2")]
        r1_id, r2_id = None, None
        for r1, r2 in read_indicators:
            if r1 in suffix:
                r1_id, r2_id = r1, r2
                break
        if not r1_id:
            raise ValueError(f"无法从模式 '{pattern}' 的后缀 '{suffix}' 中识别read标识符 (R1/R2, _1/_2, .1/.2)")
        samples = []
        for fq1_path in glob.glob(str(self.config.input_dir / f"{prefix}*{suffix}")):
            fq1_path = Path(fq1_path)
            basename = fq1_path.name
            sample_name = basename[len(prefix): -len(suffix)]
            if not sample_name:
                sample_name = fq1_path.stem.replace(r1_id, "")
            # BUG FIX: the original did str(path).replace(r1_id, r2_id) over the
            # FULL path, corrupting directory components or sample names that
            # also contain the indicator. Only substitute inside the suffix.
            fq2_path = fq1_path.with_name(basename[:-len(suffix)] + suffix.replace(r1_id, r2_id))
            if fq2_path.exists():
                samples.append(Sample(name=sample_name, fastq1=fq1_path, fastq2=fq2_path))
            else:
                self.logger.warning(f"找不到对应的R2文件: {fq2_path}")
        return samples

    def find_samples_auto(self) -> List["Sample"]:
        """Try common paired-end suffix conventions; return samples for the
        first convention that matches anything, else an empty list."""
        patterns = [("_R1.fq.gz", "_R2.fq.gz"), ("_R1.fastq.gz", "_R2.fastq.gz"),
                    ("_1.fq.gz", "_2.fq.gz"), ("_1.fastq.gz", "_2.fastq.gz")]
        for r1_suffix, r2_suffix in patterns:
            samples = []
            for fq1_path in glob.glob(str(self.config.input_dir / f"*{r1_suffix}")):
                fq1_path = Path(fq1_path)
                fq2_path = fq1_path.with_name(fq1_path.name[:-len(r1_suffix)] + r2_suffix)
                if fq2_path.exists():
                    sample_name = fq1_path.name[:-len(r1_suffix)]
                    samples.append(Sample(name=sample_name, fastq1=fq1_path, fastq2=fq2_path))
            if samples:
                self.logger.info(f"自动检测到模式: *{r1_suffix}")
                return samples
        return []

    def load_samples_from_file(self, sample_file: Path) -> List["Sample"]:
        """Parse a TSV sample sheet: name<TAB>fastq1<TAB>fastq2 per line.
        Blank lines and lines starting with '#' are skipped."""
        samples = []
        with sample_file.open('r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                parts = line.split('\t')
                if len(parts) >= 3:
                    samples.append(Sample(name=parts[0], fastq1=Path(parts[1]), fastq2=Path(parts[2])))
                else:
                    self.logger.warning(f"第{line_num}行格式错误,跳过: {line}")
        return samples

    def discover_samples(self) -> List["Sample"]:
        """Resolve samples by priority: sample sheet > explicit pattern > auto-detect."""
        self.logger.info("开始样本发现...")
        samples = []
        if self.config.sample_list and self.config.sample_list.is_file():
            self.logger.info(f"从样本列表文件加载: {self.config.sample_list}")
            samples = self.load_samples_from_file(self.config.sample_list)
        elif self.config.fastq_pattern:
            self.logger.info(f"使用指定的文件模式: {self.config.fastq_pattern}")
            samples = self.find_samples_with_pattern(self.config.fastq_pattern)
        else:
            self.logger.info("自动检测FASTQ文件...")
            samples = self.find_samples_auto()
        self.logger.info(f"发现 {len(samples)} 个样本")
        for sample in samples:
            self.logger.info(f"样本: {sample.name}")
            self.logger.debug(f" R1: {sample.fastq1}\n R2: {sample.fastq2}")
        return samples

    def run_apptainer(self, cmd: List[str]) -> subprocess.CompletedProcess:
        """Run `cmd` inside the Parabricks container with GPU passthrough.

        Bind-mounts input/output/reference (and known-sites, when present)
        directories, honours --dry-run, and on failure logs the captured
        stdout/stderr before re-raising CalledProcessError.
        """
        paths_to_bind: Set[Path] = {self.config.input_dir.resolve(),
                                    self.config.output_dir.resolve(),
                                    self.config.ref_genome.parent.resolve()}
        if self.config.known_sites and self.config.known_sites.exists():
            paths_to_bind.add(self.config.known_sites.parent.resolve())
        bind_args = [f"--bind={path}:{path}" for path in sorted(paths_to_bind)]
        apptainer_cmd = [self.container_cmd, "exec", "--nv", *bind_args,
                         str(self.config.container)] + [str(c) for c in cmd]
        self.logger.debug(f"执行命令: {' '.join(apptainer_cmd)}")
        if self.config.dry_run:
            self.logger.info(f"DRY RUN: {' '.join(apptainer_cmd)}")
            return subprocess.CompletedProcess(apptainer_cmd, 0, "", "")
        try:
            return subprocess.run(apptainer_cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            self.logger.error(f"命令执行失败: {' '.join(e.cmd)}")
            self.logger.error(f"返回码: {e.returncode}")
            self.logger.error(f"标准输出:\n{e.stdout}")
            self.logger.error(f"标准错误:\n{e.stderr}")
            raise

    def workflow_fq2bam_only(self, sample: "Sample", sample_output_dir: Path):
        """fq2bam: align a FASTQ pair into a duplicate-marked BAM."""
        self.logger.info(f"[{sample.name}] 执行工作流程: fq2bam (FASTQ to deduped BAM)")
        output_bam = sample_output_dir / f"{sample.name}.bam"
        cmd = ["pbrun", "fq2bam",
               "--ref", self.config.ref_genome,
               "--in-fq", sample.fastq1, sample.fastq2,
               "--out-bam", output_bam]
        self.run_apptainer(cmd)

    def workflow_call_only(self, sample: "Sample", sample_output_dir: Path):
        """haplotypecaller: call variants from the sample BAM (GVCF or VCF)."""
        input_bam = sample_output_dir / f"{sample.name}.bam"
        if not input_bam.exists() and not self.config.dry_run:
            raise FileNotFoundError(f"输入BAM文件不存在: {input_bam}")
        # Single command template; only the output name and --gvcf flag differ
        # between the two modes (the original duplicated the whole cmd list).
        if self.config.gvcf:
            self.logger.info(f"[{sample.name}] 执行工作流程: haplotypecaller (BAM to GVCF)")
            output_file = sample_output_dir / f"{sample.name}.g.vcf.gz"
        else:
            self.logger.info(f"[{sample.name}] 执行工作流程: haplotypecaller (BAM to VCF)")
            output_file = sample_output_dir / f"{sample.name}.vcf.gz"
        cmd = ["pbrun", "haplotypecaller",
               "--ref", self.config.ref_genome,
               "--in-bam", input_bam,
               "--out-variants", output_file]
        if self.config.gvcf:
            cmd.append("--gvcf")
        self.run_apptainer(cmd)

    def workflow_fq2bam_and_call(self, sample: "Sample", sample_output_dir: Path):
        """Combined workflow: fq2bam then haplotypecaller, skipping step 1
        when its BAM already exists and is non-empty (unless --overwrite)."""
        self.logger.info(f"[{sample.name}] 执行联合工作流程: fq2bam + haplotypecaller")
        self.logger.info(f"[{sample.name}] 步骤 1: 运行 fq2bam")
        output_bam = sample_output_dir / f"{sample.name}.bam"
        if not self.config.overwrite and output_bam.exists() and output_bam.stat().st_size > 0:
            self.logger.info(f"[{sample.name}] 步骤 1 的产物 {output_bam.name} 已存在,跳过 fq2bam。")
        else:
            self.workflow_fq2bam_only(sample, sample_output_dir)
        self.logger.info(f"[{sample.name}] 步骤 2: 运行 haplotypecaller")
        self.workflow_call_only(sample, sample_output_dir)

    def process_single_sample(self, sample: "Sample") -> bool:
        """Run the configured workflow for one sample; return True on success.

        Skips the sample when its final product already exists and is non-empty
        (unless --overwrite). All exceptions are caught and logged so one failed
        sample does not abort the whole batch.
        """
        sample_output_dir = self.config.output_dir / sample.name
        final_output = None
        if self.config.workflow == "fq2bam_only":
            final_output = sample_output_dir / f"{sample.name}.bam"
        elif self.config.workflow in ["fq2bam_and_call", "call_only"]:
            ext = ".g.vcf.gz" if self.config.gvcf else ".vcf.gz"
            final_output = sample_output_dir / f"{sample.name}{ext}"
        if not self.config.overwrite and final_output and final_output.exists() and final_output.stat().st_size > 0:
            self.logger.info(f"样本 {sample.name} 的最终产物 {final_output.name} 已存在,跳过。")
            return True
        try:
            self.logger.info(f"开始处理样本: {sample.name}")
            if not sample.fastq1.exists() or not sample.fastq2.exists():
                raise FileNotFoundError(f"FASTQ文件不存在: {sample.fastq1} 或 {sample.fastq2}")
            sample_output_dir.mkdir(parents=True, exist_ok=True)
            workflow_map = {"fq2bam_only": self.workflow_fq2bam_only,
                            "call_only": self.workflow_call_only,
                            "fq2bam_and_call": self.workflow_fq2bam_and_call}
            if self.config.workflow not in workflow_map:
                raise ValueError(f"未知或未实现的工作流程: {self.config.workflow}")
            workflow_map[self.config.workflow](sample, sample_output_dir)
            self.logger.info(f"样本 {sample.name} 处理完成")
            return True
        except Exception as e:
            self.logger.error(f"样本 {sample.name} 处理失败: {e}", exc_info=self.config.verbose)
            return False

    def run(self):
        """Top-level entry: check environment, discover samples, process batch.

        Returns True when the batch loop completed, False on fatal error or
        when no samples were found.
        """
        try:
            self.check_requirements()
            self.logger.info("检查Parabricks版本...")
            self.run_apptainer(["pbrun", "version"])
            samples = self.discover_samples()
            if not samples:
                self.logger.error("未发现任何样本,请检查输入目录或文件模式。")
                return False
            self.logger.info(f"开始批量处理 {len(samples)} 个样本")
            self.logger.info(f"工作流程: {self.config.workflow}, 最大并行数: {self.config.max_parallel}, GVCF模式: {'开启' if self.config.gvcf else '关闭'}")
            # Merged the two identical `max_parallel > 1` checks from the original.
            if self.config.max_parallel > 1:
                self.logger.warning(f"最大并行数 > 1 ({self.config.max_parallel})。请确保您的作业调度系统为每个并行任务正确分配了独立的GPU资源,否则可能导致冲突。")
                with concurrent.futures.ThreadPoolExecutor(max_workers=self.config.max_parallel) as executor:
                    futures = {executor.submit(self.process_single_sample, sample): sample
                               for sample in samples}
                    for future in concurrent.futures.as_completed(futures):
                        sample = futures[future]
                        try:
                            if not future.result():
                                self.logger.error(f"样本 {sample.name} 处理线程报告失败。")
                        except Exception as exc:
                            self.logger.error(f'样本 {sample.name} 在执行中产生异常: {exc}')
            else:
                for sample in samples:
                    self.process_single_sample(sample)
            self.logger.info("所有样本分析完成!")
            return True
        except Exception as e:
            self.logger.error(f"分析流程遇到致命错误: {e}", exc_info=self.config.verbose)
            return False
def main():
    """CLI entry point: parse arguments, merge an optional JSON config file,
    then build the analyzer and exit 0/1 on success/failure.

    Precedence (lowest to highest): Config defaults < JSON config file <
    explicitly-given command-line flags (all CLI defaults are None so that
    only user-supplied values override).
    """
    parser = argparse.ArgumentParser(
        description="Parabricks GPU加速全基因组重测序批量分析脚本",
        formatter_class=argparse.RawTextHelpFormatter)
    defaults = Config()
    path_args = parser.add_argument_group('路径参数')
    path_args.add_argument("--container", type=Path, default=None, help=f"Parabricks容器路径 (默认: {defaults.container})")
    path_args.add_argument("--input-dir", type=Path, default=None, help=f"输入FASTQ文件目录 (默认: {defaults.input_dir})")
    path_args.add_argument("--output-dir", type=Path, default=None, help=f"输出结果目录 (默认: {defaults.output_dir})")
    path_args.add_argument("--ref-genome", type=Path, default=None, help=f"参考基因组路径 (必需, 默认: {defaults.ref_genome})")
    path_args.add_argument("--known-sites", type=Path, default=None, help=f"已知变异位点文件路径 (默认: 无)")
    proc_args = parser.add_argument_group('处理参数')
    proc_args.add_argument("--workflow", choices=["fq2bam_only", "call_only", "fq2bam_and_call"], default=None, help=f"工作流程类型 (默认: {defaults.workflow})")
    proc_args.add_argument("--num-threads", type=int, default=None, help=f"每个任务使用的CPU线程数量 (默认: {defaults.num_threads})")
    proc_args.add_argument("--max-parallel", type=int, default=None, help=f"最大并行处理样本数 (默认: {defaults.max_parallel})")
    proc_args.add_argument("--no-gvcf", action="store_false", dest="gvcf", default=None, help="仅输出VCF,不输出GVCF (默认生成GVCF)")
    sample_args = parser.add_argument_group('样本发现参数')
    sample_args.add_argument("--fastq-pattern", type=str, default=None, help="FASTQ文件模式,例: '*_R1.fq.gz' (默认: 自动检测)")
    sample_args.add_argument("--sample-list", type=Path, default=None, help="样本信息文件路径 (TSV格式, 默认: 无)")
    control_args = parser.add_argument_group('控制参数')
    control_args.add_argument("--config", type=Path, help="配置文件路径 (JSON格式)")
    control_args.add_argument("--verbose", "-v", action="store_true", default=None, help=f"启用详细日志输出 (默认: {defaults.verbose})")
    control_args.add_argument("--dry-run", action="store_true", default=None, help=f"仅显示命令,不运行 (默认: {defaults.dry_run})")
    control_args.add_argument("--overwrite", action="store_true", default=None, help=f"强制重新运行已完成的样本 (默认: {defaults.overwrite})")
    args = parser.parse_args()

    config = Config()
    # Config fields that must be coerced to Path when loaded from JSON.
    path_fields = {"container", "input_dir", "output_dir", "ref_genome",
                   "known_sites", "sample_list"}
    if args.config and args.config.is_file():
        with args.config.open('r') as f:
            config_data = json.load(f)
        for key, value in config_data.items():
            if hasattr(config, key):
                # BUG FIX: the original used isinstance(..., (Path, Optional[Path])),
                # which raises TypeError at runtime (subscripted generics cannot be
                # used in isinstance checks). Decide Path-ness by field name instead.
                if key in path_fields:
                    setattr(config, key, Path(value) if value is not None else None)
                else:
                    setattr(config, key, value)
    # Dead `num_gpus` cleanup removed: argparse never defines that attribute.
    # CLI overrides: only values the user explicitly supplied (non-None).
    for key, value in vars(args).items():
        if value is not None and hasattr(config, key):
            setattr(config, key, value)

    try:
        analyzer = ParabricksAnalyzer(config)
        success = analyzer.run()
        sys.exit(0 if success else 1)
    except Exception as e:
        logging.error(f"脚本启动失败: {e}", exc_info=True)
        sys.exit(1)
if __name__ == "__main__": main()
|