BSA测序的原理和分析

BSA(Bulk Segregant Analysis,混合分离分析)是一种分子标记辅助的基因定位技术。我将从原理、方法、分析和代码实现等方面详细阐述。

1 BSA原理

BSA基于孟德尔遗传定律,通过构建具有极端表型的DNA混合池来快速定位控制目标性状的基因或QTL。核心原理是:

  1. 分离群体构建:杂交后代中,控制目标性状的基因区域在极端表型个体中会表现出不同的等位基因频率
  2. DNA混合:将具有相同极端表型的个体DNA等量混合,形成两个对比鲜明的基因池
  3. 标记检测:与目标基因紧密连锁的分子标记在两个基因池间会显示多态性差异
  4. 区间定位:通过全基因组扫描找到与目标性状连锁的染色体区间

2 BSA方法流程

2.1 实验设计

  • 选择表型差异明显的亲本进行杂交
  • 构建F2或回交群体
  • 从分离后代中选择极端表型个体(通常各20-50个)

2.2 DNA制备与混合

  • 提取单个个体基因组DNA
  • 等量混合同一表型组的DNA样品
  • 形成两个对比基因池(bulk)

2.3 分子标记检测

传统方法:

  • RAPD(随机扩增多态DNA)
  • AFLP(扩增片段长度多态性)
  • SSR(简单序列重复)

现代方法:

  • BSA-seq(结合高通量测序)
  • SNP芯片检测

2.4 数据分析

  • 计算各标记位点的等位基因频率
  • 统计显著性检验
  • 构建连锁图谱
  • 基因定位

3 BSA-seq分析流程

4 BSA分析的关键指标

4.1 统计参数

  • 等位基因频率差异:两个基因池间的频率差异,理论值0.5表示完全连锁
  • 卡方检验:检验两个基因池间等位基因频率是否存在显著差异
  • LOD值:对数优势比,评估连锁程度
  • 置信区间:基因定位的可信区间

4.2 质量控制指标

  • 测序深度:每个位点的读数深度,建议≥20×
  • SNP密度:单位长度内的SNP数量
  • 基因池大小:每个pool的个体数量,建议≥20个
  • 表型分离比:确保符合预期的遗传模式

5 BSA的优势与局限

5.1 优势

  1. 高效快速:相比传统QTL定位,大大减少了基因型检测工作量
  2. 成本效益:减少了标记开发和检测成本
  3. 全基因组扫描:可以同时检测多个基因位点
  4. 分辨率高:结合高通量测序,可达到基因水平的精度

5.2 局限性

  1. 只适用于主效基因:对微效基因检测能力有限
  2. 需要极端表型:要求目标性状表型分布明确
  3. 群体结构影响:遗传背景复杂时影响结果准确性
  4. 基因互作:多基因控制性状时分析复杂

6 应用领域

BSA技术广泛应用于:

  1. 作物育种:抗病基因、品质基因、产量基因定位
  2. 医学遗传学:疾病易感基因识别
  3. 进化生物学:适应性基因发现
  4. 功能基因组学:基因功能验证

BSA技术作为一种强大的基因定位工具,在现代分子育种和基因组学研究中发挥着重要作用。随着测序技术的发展和成本降低,BSA-seq已成为基因定位的首选方法之一。

7 Claude给的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python3
"""
BSA-seq (Bulk Segregant Analysis with sequencing) 数据分析流程
包含数据预处理、SNP检测、统计分析和可视化
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.signal import savgol_filter
import warnings
warnings.filterwarnings('ignore')

class BSAAnalysis:
def __init__(self, vcf_file=None, window_size=1000000, step_size=100000):
"""
初始化BSA分析对象

Parameters:
vcf_file: VCF格式的变异文件路径
window_size: 滑动窗口大小(bp)
step_size: 滑动窗口步长(bp)
"""
self.vcf_file = vcf_file
self.window_size = window_size
self.step_size = step_size
self.snp_data = None
self.bsa_results = None

def load_vcf_data(self, vcf_file=None):
"""
加载VCF格式的SNP数据
"""
if vcf_file:
self.vcf_file = vcf_file

# 模拟VCF数据加载(实际应使用pysam或其他VCF解析工具)
print("加载VCF数据...")
# 这里应该实现真实的VCF文件解析
# 示例数据结构
np.random.seed(42)
n_snps = 50000

self.snp_data = pd.DataFrame({
'CHROM': np.random.choice(['chr1', 'chr2', 'chr3', 'chr4', 'chr5'], n_snps),
'POS': np.random.randint(1, 100000000, n_snps),
'REF': np.random.choice(['A', 'T', 'C', 'G'], n_snps),
'ALT': np.random.choice(['A', 'T', 'C', 'G'], n_snps),
'BULK1_REF': np.random.randint(5, 50, n_snps),
'BULK1_ALT': np.random.randint(5, 50, n_snps),
'BULK2_REF': np.random.randint(5, 50, n_snps),
'BULK2_ALT': np.random.randint(5, 50, n_snps)
})

# 添加一些目标区域的信号
target_region = (self.snp_data['CHROM'] == 'chr2') & \
(self.snp_data['POS'] >= 20000000) & \
(self.snp_data['POS'] <= 30000000)

# 在目标区域增强信号
self.snp_data.loc[target_region, 'BULK1_ALT'] *= 2
self.snp_data.loc[target_region, 'BULK2_REF'] *= 2

print(f"加载了 {len(self.snp_data)} 个SNP位点")
return self.snp_data

def calculate_allele_frequency(self):
"""
计算等位基因频率
"""
print("计算等位基因频率...")

# 计算每个bulk的总读数和ALT等位基因频率
self.snp_data['BULK1_TOTAL'] = self.snp_data['BULK1_REF'] + self.snp_data['BULK1_ALT']
self.snp_data['BULK2_TOTAL'] = self.snp_data['BULK2_REF'] + self.snp_data['BULK2_ALT']

self.snp_data['BULK1_FREQ'] = self.snp_data['BULK1_ALT'] / self.snp_data['BULK1_TOTAL']
self.snp_data['BULK2_FREQ'] = self.snp_data['BULK2_ALT'] / self.snp_data['BULK2_TOTAL']

# 计算频率差异
self.snp_data['FREQ_DIFF'] = self.snp_data['BULK1_FREQ'] - self.snp_data['BULK2_FREQ']

# 过滤低质量SNP
min_depth = 10
self.snp_data = self.snp_data[
(self.snp_data['BULK1_TOTAL'] >= min_depth) &
(self.snp_data['BULK2_TOTAL'] >= min_depth)
].copy()

print(f"过滤后保留 {len(self.snp_data)} 个高质量SNP")

def statistical_test(self):
"""
进行统计检验
"""
print("进行统计检验...")

p_values = []
for idx, row in self.snp_data.iterrows():
# 使用卡方检验
obs = np.array([[row['BULK1_REF'], row['BULK1_ALT']],
[row['BULK2_REF'], row['BULK2_ALT']]])

chi2, p_val = stats.chi2_contingency(obs)[:2]
p_values.append(p_val)

self.snp_data['P_VALUE'] = p_values

# 计算-log10(p)值用于可视化
self.snp_data['NEG_LOG10_P'] = -np.log10(self.snp_data['P_VALUE'] + 1e-300)

def sliding_window_analysis(self):
"""
滑动窗口分析
"""
print("进行滑动窗口分析...")

results = []

for chrom in self.snp_data['CHROM'].unique():
chrom_data = self.snp_data[self.snp_data['CHROM'] == chrom].copy()
chrom_data = chrom_data.sort_values('POS')

max_pos = chrom_data['POS'].max()

for start in range(0, int(max_pos), self.step_size):
end = start + self.window_size

window_snps = chrom_data[
(chrom_data['POS'] >= start) &
(chrom_data['POS'] < end)
]

if len(window_snps) < 5: # 窗口内SNP数量太少
continue

# 计算窗口统计量
mean_freq_diff = window_snps['FREQ_DIFF'].mean()
abs_freq_diff = np.abs(window_snps['FREQ_DIFF']).mean()
mean_neg_log_p = window_snps['NEG_LOG10_P'].mean()
max_neg_log_p = window_snps['NEG_LOG10_P'].max()
snp_count = len(window_snps)

results.append({
'CHROM': chrom,
'START': start,
'END': end,
'CENTER': start + self.window_size // 2,
'SNP_COUNT': snp_count,
'MEAN_FREQ_DIFF': mean_freq_diff,
'ABS_FREQ_DIFF': abs_freq_diff,
'MEAN_NEG_LOG10_P': mean_neg_log_p,
'MAX_NEG_LOG10_P': max_neg_log_p
})

self.bsa_results = pd.DataFrame(results)
print(f"生成了 {len(self.bsa_results)} 个滑动窗口")

def smooth_signals(self, column='ABS_FREQ_DIFF', window_length=21, polyorder=3):
"""
对信号进行平滑处理
"""
for chrom in self.bsa_results['CHROM'].unique():
mask = self.bsa_results['CHROM'] == chrom
values = self.bsa_results.loc[mask, column].values

if len(values) > window_length:
smoothed = savgol_filter(values, window_length, polyorder)
self.bsa_results.loc[mask, f'{column}_SMOOTH'] = smoothed
else:
self.bsa_results.loc[mask, f'{column}_SMOOTH'] = values

def identify_peaks(self, threshold_percentile=95):
"""
识别显著峰值区域
"""
print("识别候选区域...")

threshold = np.percentile(self.bsa_results['ABS_FREQ_DIFF_SMOOTH'], threshold_percentile)

candidate_regions = self.bsa_results[
self.bsa_results['ABS_FREQ_DIFF_SMOOTH'] > threshold
].copy()

print(f"识别到 {len(candidate_regions)} 个候选窗口,阈值: {threshold:.3f}")

return candidate_regions

def plot_manhattan(self, figsize=(15, 8)):
"""
绘制曼哈顿图
"""
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize, sharex=True)

# 为每个染色体分配颜色
colors = plt.cm.Set3(np.linspace(0, 1, len(self.bsa_results['CHROM'].unique())))
chrom_colors = dict(zip(self.bsa_results['CHROM'].unique(), colors))

# 计算染色体在x轴上的位置
chrom_positions = {}
x_pos = 0
x_ticks = []
x_labels = []

for chrom in sorted(self.bsa_results['CHROM'].unique()):
chrom_data = self.bsa_results[self.bsa_results['CHROM'] == chrom]
positions = chrom_data['CENTER'] / 1e6 + x_pos # 转换为Mb

chrom_positions[chrom] = positions
x_ticks.append(positions.mean())
x_labels.append(chrom.replace('chr', ''))

# 绘制频率差异
ax1.scatter(positions, chrom_data['ABS_FREQ_DIFF'],
c=[chrom_colors[chrom]], alpha=0.6, s=20)
ax1.plot(positions, chrom_data['ABS_FREQ_DIFF_SMOOTH'],
color='red', linewidth=2, alpha=0.8)

# 绘制-log10(p)值
ax2.scatter(positions, chrom_data['MEAN_NEG_LOG10_P'],
c=[chrom_colors[chrom]], alpha=0.6, s=20)

x_pos = positions.max() + 10 # 染色体间隔

# 设置图形属性
ax1.set_ylabel('绝对频率差异', fontsize=12)
ax1.set_title('BSA-seq 分析结果 - 频率差异', fontsize=14, fontweight='bold')
ax1.grid(True, alpha=0.3)

ax2.set_ylabel('-log₁₀(P值)', fontsize=12)
ax2.set_xlabel('染色体位置 (Mb)', fontsize=12)
ax2.set_title('统计显著性', fontsize=14, fontweight='bold')
ax2.grid(True, alpha=0.3)

# 设置x轴标签
ax2.set_xticks(x_ticks)
ax2.set_xticklabels(x_labels)

# 添加显著性阈值线
threshold = np.percentile(self.bsa_results['ABS_FREQ_DIFF_SMOOTH'], 95)
ax1.axhline(y=threshold, color='red', linestyle='--', alpha=0.7,
label=f'95%阈值 ({threshold:.3f})')
ax1.legend()

plt.tight_layout()
return fig

def plot_chromosome_detail(self, chrom, figsize=(12, 8)):
"""
绘制单个染色体的详细图
"""
chrom_data = self.bsa_results[self.bsa_results['CHROM'] == chrom].copy()
chrom_snps = self.snp_data[self.snp_data['CHROM'] == chrom].copy()

fig, axes = plt.subplots(3, 1, figsize=figsize, sharex=True)

positions_mb = chrom_data['CENTER'] / 1e6
snp_pos_mb = chrom_snps['POS'] / 1e6

# 频率差异
axes[0].scatter(snp_pos_mb, chrom_snps['FREQ_DIFF'], alpha=0.5, s=10, color='gray')
axes[0].plot(positions_mb, chrom_data['MEAN_FREQ_DIFF'], 'b-', linewidth=2, label='窗口均值')
axes[0].set_ylabel('频率差异')
axes[0].set_title(f'{chrom} 详细分析')
axes[0].grid(True, alpha=0.3)
axes[0].legend()

# 绝对频率差异
axes[1].plot(positions_mb, chrom_data['ABS_FREQ_DIFF'], 'o-', color='green',
linewidth=2, markersize=4, label='原始信号')
axes[1].plot(positions_mb, chrom_data['ABS_FREQ_DIFF_SMOOTH'], 'r-',
linewidth=3, label='平滑信号')
axes[1].set_ylabel('绝对频率差异')
axes[1].grid(True, alpha=0.3)
axes[1].legend()

# 统计显著性
axes[2].scatter(snp_pos_mb, chrom_snps['NEG_LOG10_P'], alpha=0.5, s=10, color='purple')
axes[2].plot(positions_mb, chrom_data['MEAN_NEG_LOG10_P'], 'orange', linewidth=2)
axes[2].set_ylabel('-log₁₀(P值)')
axes[2].set_xlabel('位置 (Mb)')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
return fig

def generate_report(self, output_file='bsa_report.txt'):
"""
生成分析报告
"""
candidates = self.identify_peaks()

with open(output_file, 'w', encoding='utf-8') as f:
f.write("BSA-seq 分析报告\n")
f.write("=" * 50 + "\n\n")

f.write(f"总SNP数量: {len(self.snp_data)}\n")
f.write(f"滑动窗口数量: {len(self.bsa_results)}\n")
f.write(f"窗口大小: {self.window_size/1e6:.1f} Mb\n")
f.write(f"步长: {self.step_size/1e3:.0f} kb\n\n")

f.write("候选区域:\n")
f.write("-" * 30 + "\n")

for idx, row in candidates.iterrows():
f.write(f"染色体: {row['CHROM']}\n")
f.write(f"位置: {row['START']/1e6:.2f}-{row['END']/1e6:.2f} Mb\n")
f.write(f"频率差异: {row['ABS_FREQ_DIFF']:.3f}\n")
f.write(f"显著性: {row['MEAN_NEG_LOG10_P']:.2f}\n")
f.write(f"SNP数量: {row['SNP_COUNT']}\n")
f.write("-" * 30 + "\n")

print(f"报告已保存至: {output_file}")

def run_complete_analysis(self):
"""
运行完整的BSA分析流程
"""
print("开始BSA-seq完整分析流程...")

# 1. 加载数据
if self.snp_data is None:
self.load_vcf_data()

# 2. 计算等位基因频率
self.calculate_allele_frequency()

# 3. 统计检验
self.statistical_test()

# 4. 滑动窗口分析
self.sliding_window_analysis()

# 5. 信号平滑
self.smooth_signals()

# 6. 生成图表
manhattan_fig = self.plot_manhattan()
plt.show()

# 7. 生成报告
self.generate_report()

print("BSA分析完成!")

return self.bsa_results

# 使用示例
if __name__ == "__main__":
# 创建BSA分析对象
bsa = BSAAnalysis(window_size=1000000, step_size=100000)

# 运行完整分析
results = bsa.run_complete_analysis()

# 查看特定染色体的详细分析
detail_fig = bsa.plot_chromosome_detail('chr2')
plt.show()

# 打印候选区域
candidates = bsa.identify_peaks()
print("\n候选区域:")
print(candidates[['CHROM', 'START', 'END', 'ABS_FREQ_DIFF', 'MEAN_NEG_LOG10_P']].head())

BSA测序的原理和分析
https://lixiang117423.github.io/article/bsa/
作者
李详【Xiang LI】
发布于
2025年5月27日
许可协议