""" BAM 文件批量统计报告生成器 📊 (并行修复版 + Excel输出) """ import argparse import subprocess import re import sys import pandas as pd from pathlib import Path from tqdm import tqdm from typing import Dict, List from multiprocessing import Pool, cpu_count
def check_dependencies() -> bool:
    """Verify that required Python libraries and the samtools binary are available.

    Probes for pandas, openpyxl and tqdm with ``importlib.util.find_spec``
    (checking presence without actually importing the heavy packages), then
    runs ``samtools --version`` to confirm a working samtools on PATH.

    Returns:
        bool: True when everything is available; False otherwise, after
        printing installation hints to stderr.
    """
    import importlib.util

    # Presence check only — find_spec avoids the cost of importing pandas et al.
    missing_libs = [lib for lib in ('pandas', 'openpyxl', 'tqdm')
                    if importlib.util.find_spec(lib) is None]
    if missing_libs:
        print(f"❌ 错误: 缺少必要的Python库。请先安装: {' '.join(missing_libs)}", file=sys.stderr)
        print(f" 运行: pip install {' '.join(missing_libs)}", file=sys.stderr)
        return False

    try:
        # Raises FileNotFoundError when samtools is absent, CalledProcessError
        # when it exists but exits non-zero.
        subprocess.run(['samtools', '--version'], check=True, capture_output=True)
        print("✅ samtools 已找到,准备开始分析...")
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("❌ 错误: 未找到 'samtools' 命令。", file=sys.stderr)
        print(" 请确保 samtools 已安装并且在您的系统 PATH 环境变量中。", file=sys.stderr)
        print(" 您可以通过 'conda install -c bioconda samtools' 进行安装。", file=sys.stderr)
        return False
def parse_flagstat(flagstat_output: str) -> Dict:
    """Extract key alignment metrics from ``samtools flagstat`` text output.

    Pulls the QC-passed counts for total, mapped and properly-paired reads
    and derives the two percentage metrics (0.0 when there are no reads).

    Args:
        flagstat_output: Raw stdout text produced by ``samtools flagstat``.

    Returns:
        Dict with Total_Reads, Mapped_Reads, Properly_Paired_Reads,
        Overall_Mapping_Rate(%) and Properly_Paired_Rate(%) (rounded to 2 dp).
    """
    def passed_count(pattern: str) -> int:
        # flagstat lines look like "<passed> + <failed> <label>"; take <passed>.
        hit = re.search(pattern, flagstat_output)
        return int(hit.group(1)) if hit else 0

    total = passed_count(r'(\d+) \+ \d+ in total')
    mapped = passed_count(r'(\d+) \+ \d+ mapped')
    paired = passed_count(r'(\d+) \+ \d+ properly paired')

    stats: Dict = {
        'Total_Reads': total,
        'Mapped_Reads': mapped,
        'Properly_Paired_Reads': paired,
    }
    if total > 0:
        stats['Overall_Mapping_Rate(%)'] = round(mapped / total * 100, 2)
        stats['Properly_Paired_Rate(%)'] = round(paired / total * 100, 2)
    else:
        stats['Overall_Mapping_Rate(%)'] = 0.0
        stats['Properly_Paired_Rate(%)'] = 0.0
    return stats
def analyze_bam(bam_file: Path) -> Dict:
    """Run samtools flagstat / coverage / idxstats on one BAM and collect metrics.

    This is the worker function invoked from the multiprocessing pool; any
    failure is captured and returned via an 'Error' key instead of raising,
    so one bad file cannot take down the whole pool.

    Args:
        bam_file: Path to a (coordinate-sorted) BAM file.

    Returns:
        Dict of per-sample statistics, or ``{'Sample': ..., 'Error': ...}``
        when any samtools step fails.
    """
    try:
        sample_name = bam_file.name.replace('.sorted.bam', '').replace('.bam', '')
        results: Dict = {'Sample': sample_name}

        # samtools coverage/idxstats need an index; build one if it is missing.
        bai_file = Path(str(bam_file) + '.bai')
        if not bai_file.exists():
            subprocess.run(['samtools', 'index', str(bam_file)],
                           check=True, capture_output=True)

        def run_samtools(subcmd: str) -> str:
            # One samtools sub-command against this BAM; raises on failure.
            proc = subprocess.run(['samtools', subcmd, str(bam_file)],
                                  capture_output=True, text=True, check=True)
            return proc.stdout

        results.update(parse_flagstat(run_samtools('flagstat')))

        # `samtools coverage` prints ONE row per reference sequence and no
        # genome-wide totals row (columns: #rname startpos endpos numreads
        # covbases coverage meandepth meanbaseq meanmapq).  The previous
        # version treated the last row as an "overall" summary, which silently
        # reported the last contig's stats as genome-wide and dropped that
        # contig from the per-chromosome output.  Instead, aggregate a
        # length-weighted overall summary across all contigs.
        total_len = 0
        total_covbases = 0
        depth_weighted = 0.0
        per_chrom_cov = []
        for line in run_samtools('coverage').strip().splitlines():
            if line.startswith('#'):
                continue
            parts = line.strip().split('\t')
            if len(parts) < 7:
                continue
            chrom = parts[0]
            # startpos/endpos are 1-based inclusive, so length = end - start + 1.
            length = int(parts[2]) - int(parts[1]) + 1
            total_len += length
            total_covbases += int(parts[4])
            depth_weighted += float(parts[6]) * length
            per_chrom_cov.append((chrom, float(parts[5]), float(parts[6])))
        if total_len > 0:
            results['Overall_Covered_Bases_Rate(%)'] = round(total_covbases / total_len * 100, 2)
            results['Overall_Coverage_Depth'] = round(depth_weighted / total_len, 2)
        for chrom, covered_rate, mean_depth in per_chrom_cov:
            results[f'{chrom}_Coverage_Depth'] = mean_depth
            results[f'{chrom}_Covered_Bases_Rate(%)'] = covered_rate

        # Per-chromosome mapped-read counts and percentages from idxstats
        # (columns: ref_name, ref_length, mapped, unmapped).
        total_mapped = results.get('Mapped_Reads', 0)
        for line in run_samtools('idxstats').strip().splitlines():
            parts = line.split('\t')
            if len(parts) < 3:
                continue
            chrom, mapped = parts[0], int(parts[2])
            if chrom == '*':  # '*' is the unplaced/unmapped bucket
                continue
            results[f'{chrom}_Mapped_Reads'] = mapped
            if total_mapped > 0:
                results[f'{chrom}_Mapped_Reads_Percentage(%)'] = round(mapped / total_mapped * 100, 2)

        return results
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.strip() if e.stderr else str(e)
        return {'Sample': bam_file.name, 'Error': error_msg}
    except Exception as e:
        return {'Sample': bam_file.name, 'Error': str(e)}
def main():
    """Parse CLI arguments, analyze all BAM files in parallel, write a report.

    Workflow: dependency check -> collect *.bam files from the input folder ->
    fan out `analyze_bam` over a process pool -> assemble a metrics-by-sample
    table -> save as .xlsx or .csv depending on the output file extension.
    """
    parser = argparse.ArgumentParser(
        description="BAM 文件批量统计报告生成器 📊 (并行修复版 + Excel输出)",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('-i', '--input_dir', type=str, required=True,
                        help="包含 BAM 文件的输入文件夹路径。")
    parser.add_argument('-o', '--output_file', type=str, required=True,
                        help="输出的报告文件名 (推荐使用 .xlsx 后缀)。")
    parser.add_argument('-p', '--processes', type=int, default=cpu_count(),
                        help=f"使用的并行进程数 (默认为全部可用核心: {cpu_count()})。")
    args = parser.parse_args()

    if not check_dependencies():
        sys.exit(1)

    input_path = Path(args.input_dir)
    if not input_path.is_dir():
        print(f"❌ 错误: 输入路径 '{input_path}' 不是一个有效的文件夹。", file=sys.stderr)
        sys.exit(1)

    # sorted() consumes the glob generator directly; no intermediate list needed.
    bam_files = sorted(input_path.glob('*.bam'))
    if not bam_files:
        print(f"⚠️ 警告: 在文件夹 '{input_path}' 中没有找到任何 .bam 文件。", file=sys.stderr)
        sys.exit(0)

    # Never spawn more workers than there are files to process.
    n_procs = max(1, min(args.processes, len(bam_files)))
    print(f"📁 在 '{input_path}' 中找到 {len(bam_files)} 个 BAM 文件。")
    print(f"🚀 使用 {n_procs} 个进程并行处理...")

    with Pool(processes=n_procs) as pool:
        results_iterator = pool.imap_unordered(analyze_bam, bam_files)
        all_results = list(tqdm(results_iterator, total=len(bam_files), desc="⚙️ 处理BAM文件"))

    successful_results = [res for res in all_results if 'Error' not in res]
    failed_samples = [res for res in all_results if 'Error' in res]

    if failed_samples:
        print("\n❌ 以下样本处理失败:")
        for res in failed_samples:
            print(f" - {res['Sample']}: {res['Error']}")
    if not successful_results:
        print("\n❌ 未能成功处理任何文件,程序退出。", file=sys.stderr)
        sys.exit(1)

    print("\n🔄 正在汇总所有成功的结果并生成报告...")
    # Transpose so samples become columns and metric names become rows.
    df = pd.DataFrame(successful_results).set_index('Sample').T
    core_indices = [
        'Total_Reads', 'Mapped_Reads', 'Overall_Mapping_Rate(%)',
        'Properly_Paired_Reads', 'Properly_Paired_Rate(%)',
        'Overall_Coverage_Depth', 'Overall_Covered_Bases_Rate(%)'
    ]
    # Core metrics first, then per-chromosome metrics alphabetically; reindex
    # fills NaN for any core metric absent from every sample.
    chrom_indices = sorted(idx for idx in df.index if idx not in core_indices)
    df = df.reindex(index=core_indices + chrom_indices)

    # Excel when the extension asks for it (requires openpyxl), CSV otherwise.
    if Path(args.output_file).suffix.lower() == '.xlsx':
        df.to_excel(args.output_file, index=True)
    else:
        df.to_csv(args.output_file, index=True)

    print(f"\n🎉 报告生成完毕!结果已保存至: {args.output_file}")
# Entry-point guard: run the pipeline only when executed as a script.  This
# also keeps multiprocessing worker processes (spawned by Pool) from
# re-running main() when they import this module.
if __name__ == "__main__": main()