Read/write SAM/BAM/CRAM alignments, VCF/BCF variants, FASTA/FASTQ sequences. Region queries, coverage/pileup analysis, variant filtering, read group extraction. Python wrapper for htslib with samtools/bcftools CLI access. For alignment pipelines use STAR/BWA; for variant calling use GATK/DeepVariant.
Pysam provides a Pythonic interface to htslib for reading, manipulating, and writing genomic data files. It handles SAM/BAM/CRAM alignments, VCF/BCF variants, and FASTA/FASTQ sequences with efficient region-based random access. Also exposes samtools and bcftools as callable Python functions.
pip install pysam
Note: Requires htslib C library (bundled with pip install on most platforms). On some Linux systems, may need libhts-dev or equivalent. Index files (.bai, .tbi, .fai) required for random access — create with pysam.index(), pysam.tabix_index(), or pysam.faidx().
import pysam

# Quickstart: open an indexed BAM and pull the reads overlapping a window.
with pysam.AlignmentFile("sample.bam", "rb") as bam:
    for read in bam.fetch("chr1", 1000, 2000):
        print(f"{read.query_name}: pos={read.reference_start}, mapq={read.mapping_quality}")
    # count() uses the index directly — no per-read Python iteration.
    print(f"Total reads in region: {bam.count('chr1', 1000, 2000)}")
Read, query, and write aligned sequencing reads.
import pysam

# --- Reading: open a BAM and inspect reads in a window ------------------
# Mode strings: rb=read BAM, r=read SAM, rc=read CRAM.
bam = pysam.AlignmentFile("sample.bam", "rb")

# fetch() requires a .bai index and yields reads overlapping the region.
for read in bam.fetch("chr1", 10000, 20000):
    print(f"Name: {read.query_name}")
    print(f" Position: {read.reference_start}-{read.reference_end}")
    print(f" MAPQ: {read.mapping_quality}")
    print(f" CIGAR: {read.cigarstring}")
    print(f" Sequence: {read.query_sequence[:30]}...")
    break

# Index-driven count — much cheaper than iterating the reads.
n_reads = bam.count("chr1", 10000, 20000)
print(f"Reads in region: {n_reads}")

# Typical QC gate: mapped, non-duplicate, MAPQ >= 30.
for read in bam.fetch("chr1", 10000, 20000):
    if read.mapping_quality >= 30 and not read.is_unmapped and not read.is_duplicate:
        pass  # process high-quality, mapped, non-duplicate reads here
bam.close()

# --- Writing: copy the header, keep only high-MAPQ reads ----------------
with pysam.AlignmentFile("input.bam", "rb") as inbam, \
        pysam.AlignmentFile("filtered.bam", "wb", header=inbam.header) as outbam:
    for read in inbam.fetch("chr1", 10000, 20000):
        if read.mapping_quality >= 30:
            outbam.write(read)

# A fresh .bai index makes the new file usable for fetch()/count().
pysam.index("filtered.bam")
print("Created filtered.bam + filtered.bam.bai")
Calculate per-base coverage statistics.
import pysam
import numpy as np

bam = pysam.AlignmentFile("sample.bam", "rb")

# pileup(): column-by-column view with access to every overlapping read.
for pileup_col in bam.pileup("chr1", 10000, 10100, min_mapping_quality=30):
    bases = []
    for p in pileup_col.pileups:
        # Deletions and reference skips carry no query base to report.
        if not p.is_del and p.query_position is not None:
            bases.append(p.alignment.query_sequence[p.query_position])
    print(f"Pos {pileup_col.reference_pos}: depth={pileup_col.nsegments}, bases={''.join(bases[:5])}")

# count_coverage(): four per-position arrays (A, C, G, T) — faster than pileup.
coverage = bam.count_coverage("chr1", 10000, 10100, quality_threshold=20)
total_cov = np.array(coverage).sum(axis=0)
print(f"Mean coverage: {total_cov.mean():.1f}x")
bam.close()
Read, query, and filter genetic variants.
import pysam

# --- Reading variants ---------------------------------------------------
vcf = pysam.VariantFile("variants.vcf.gz")

# fetch() requires a .tbi/.csi index on the bgzipped VCF.
for record in vcf.fetch("chr1", 10000, 50000):
    print(f"{record.chrom}:{record.pos} {record.ref}>{','.join(record.alts or [])}")
    print(f" QUAL={record.qual}, FILTER={list(record.filter)}")
    print(f" INFO: {dict(record.info)}")
    # Per-sample genotype calls live under record.samples.
    for sample in record.samples:
        gt = record.samples[sample]["GT"]
        print(f" {sample}: GT={gt}")
    break
vcf.close()

# --- Filtering: QUAL >= 30 and PASS only, written bgzip-compressed ------
with pysam.VariantFile("variants.vcf.gz") as vcf_in, \
        pysam.VariantFile("filtered.vcf.gz", "wz", header=vcf_in.header) as vcf_out:
    for record in vcf_in:
        if record.qual and record.qual >= 30 and "PASS" in record.filter:
            vcf_out.write(record)

pysam.tabix_index("filtered.vcf.gz", preset="vcf")
print("Created filtered.vcf.gz + filtered.vcf.gz.tbi")
Random access to reference sequences and sequential reading of raw reads.
import pysam

# --- FASTA: indexed random access (requires reference.fasta.fai) --------
fasta = pysam.FastaFile("reference.fasta")
seq = fasta.fetch("chr1", 10000, 10050)
print(f"Sequence ({len(seq)} bp): {seq}")
print(f"Available contigs: {fasta.references[:5]}")
print(f"Contig lengths: {dict(zip(fasta.references[:3], fasta.lengths[:3]))}")
fasta.close()
# Build the .fai index if it is missing:
# pysam.faidx("reference.fasta")

# --- FASTQ: sequential, streamed reading --------------------------------
with pysam.FastxFile("reads.fastq.gz") as fq:
    for i, entry in enumerate(fq):
        mean_qual = sum(entry.get_quality_array()) / len(entry.sequence)
        print(f"Read {entry.name}: {len(entry.sequence)} bp, mean_qual={mean_qual:.1f}")
        if i >= 2:
            break
Extract and filter reads by read group (essential for multi-sample BAM files).
import pysam

bam = pysam.AlignmentFile("multisample.bam", "rb")

# @RG header entries describe each read group (ID, sample, library, platform).
print("Read groups in file:")
for rg_dict in bam.header.get("RG", []):
    print(f" ID: {rg_dict['ID']}, Sample: {rg_dict.get('SM', 'N/A')}, Library: {rg_dict.get('LB', 'N/A')}, Platform: {rg_dict.get('PL', 'N/A')}")

# Distinct sample names come from the SM field of each @RG entry.
samples = {rg_dict["SM"] for rg_dict in bam.header.get("RG", []) if "SM" in rg_dict}
print(f"Samples in BAM: {sorted(samples)}")
bam.close()
# Filter reads by read group ID
def extract_reads_by_rg(bam_path, rg_id, output_path):
    """Write every read whose RG tag equals ``rg_id`` to a new indexed BAM.

    WARNING: fetch(until_eof=True) streams the ENTIRE file sequentially;
    multi-sample BAMs can be tens to hundreds of GB, so this may be slow.
    For large inputs prefer a region-restricted fetch:
        for read in bam.fetch("chr1", start, end): ...
    or the samtools CLI equivalent (faster for one-off extractions):
        samtools view -b -r <rg_id> input.bam -o output.bam
    """
    with pysam.AlignmentFile(bam_path, "rb") as bam_in, \
            pysam.AlignmentFile(output_path, "wb", header=bam_in.header) as bam_out:
        for read in bam_in.fetch(until_eof=True):
            if read.has_tag("RG") and read.get_tag("RG") == rg_id:
                bam_out.write(read)
    # Index the result so it supports region queries immediately.
    pysam.index(output_path)
    print(f"Extracted reads from RG:{rg_id} → {output_path}")


extract_reads_by_rg("multisample.bam", "SAMPLE_001_LaneA", "sample001_laneA.bam")
from collections import defaultdict
import pysam


def reads_per_sample(bam_path):
    """Count reads per sample from read group information.

    Returns a dict mapping sample name -> read count. Three "unknown"
    buckets are tracked separately so that no read is silently dropped:
      - "no_sm_field":  an @RG header entry exists but lacks the SM
        (sample name) field.
      - "undefined_rg": a read carries an RG tag not declared in the
        BAM header.
      - "no_rg_tag":    a read has no RG tag at all (the original version
        silently ignored these, so totals could undercount).

    NOTE: fetch(until_eof=True) scans the whole file sequentially — this
    may be slow on very large BAMs.
    """
    counts = defaultdict(int)
    with pysam.AlignmentFile(bam_path, "rb") as bam:
        # Build RG-ID → sample-name mapping from the header;
        # (a) an @RG entry lacking the SM field maps to "no_sm_field".
        rg_to_sample = {
            rg_dict["ID"]: rg_dict.get("SM", "no_sm_field")
            for rg_dict in bam.header.get("RG", [])
        }
        for read in bam.fetch(until_eof=True):
            if read.has_tag("RG"):
                # (b) RG tag not declared in the header → "undefined_rg".
                sample = rg_to_sample.get(read.get_tag("RG"), "undefined_rg")
            else:
                # (c) read carries no RG tag at all → "no_rg_tag".
                sample = "no_rg_tag"
            counts[sample] += 1
    return dict(counts)


sample_counts = reads_per_sample("multisample.bam")
for sample, count in sorted(sample_counts.items()):
    print(f" {sample}: {count:,} reads")
Call samtools and bcftools commands from Python.
import pysam

# samtools subcommands are exposed as pysam functions; arguments are passed
# exactly as they would appear on the command line.
pysam.sort("-o", "sorted.bam", "input.bam")  # samtools sort
pysam.index("sorted.bam")  # samtools index
pysam.view("-b", "-o", "region.bam", "sorted.bam", "chr1:1000-2000")  # samtools view

# bcftools works the same way via the pysam.bcftools namespace.
pysam.bcftools.view("-O", "z", "-o", "output.vcf.gz", "input.vcf")
pysam.tabix_index("output.vcf.gz", preset="vcf")
# Error handling