6b285a53b036b309e3c7a9b61d3741731088a172 lrnassar Fri Jun 12 02:35:01 2026 -0700 varFreqs: switch affectedAF/backgroundAF from max-across-cohorts to pooled sum(AC)/sum(AN) so the rate matches the carrier count scale. Per-arm AN is derived as round(AC/AF) when both are reported. An optional "default_an" column was added to databases.tsv so AF-only cohorts (ABraOM, ALFA) can synthesize a denominator from their cohort size; without it those cohorts had been silently dropped from the pooled rate. New affectedAN and backgroundAN columns expose the pool denominator. The mouseOver now reads "Affected AC/AN: 33238 / 213153" so the ratio is visible. Per-arm cohorts that ship only AC and no default_an (MGRB, GREGoR AC_AFFECTED/UNAFFECTED/UNKNOWN, AllOfUs per-population) are still listed in affectedCohorts/backgroundSources but contribute 0 to the pool, preserving the invariant pool_AF <= 1. The build pipeline is unchanged: re-run vcfToBigBed.py --split-affected against the existing merged.annotated.vcf.gz. refs #36642 diff --git src/hg/makeDb/scripts/varFreqs/vcfToBigBed.py src/hg/makeDb/scripts/varFreqs/vcfToBigBed.py index 1cf9dbe9c73..35c96b421bb 100755 --- src/hg/makeDb/scripts/varFreqs/vcfToBigBed.py +++ src/hg/makeDb/scripts/varFreqs/vcfToBigBed.py @@ -1,859 +1,911 @@ #!/usr/bin/env python3 """ Convert merged/annotated VCF to bigBed with variant frequencies. Reads database config from TSV files in the kent source tree. Two-phase parallel processing: Phase 1: Pre-extract frequency data from each VCF (parallel by database) Phase 2: Build BED from annotated VCF + pre-extracted data (parallel by chromosome) Usage: python3 vcfToBigBed.py # full genome python3 vcfToBigBed.py --region chrM # quick test on chrM python3 vcfToBigBed.py --region chr22 --threads 4 """ import sys import os import subprocess import argparse import shutil from collections import OrderedDict from typing import List, Tuple, Dict, Optional from multiprocessing import Pool # ============================================================================ # Constants # ============================================================================ SCRIPTS_DIR = os.path.join(os.path.expanduser("~"), "kent", "src", "hg", "makeDb", "scripts", "varFreqs") ALL_CHROMOSOMES = [f"chr{i}" for i in range(1, 23)] + ["chrX", "chrY", "chrM"] CHR_RENAME = {str(i): f"chr{i}" for i in range(1, 23)} CHR_RENAME.update({"X": "chrX", "Y": "chrY", "M": "chrM", "MT": "chrM", "23": "chrX", "24": "chrY"}) # ============================================================================ # Consequence Handling # ============================================================================ LOF_CSQ = {"stop_gained", "stop_lost", "frameshift", "splice_donor", "splice_acceptor", "start_lost", "transcript_ablation", "feature_truncation"} MISSENSE_CSQ = {"missense", "inframe_insertion", "inframe_deletion", "protein_altering", "coding_sequence"} SYN_CSQ = {"synonymous", "stop_retained", "start_retained"} def get_color(bcsq): """Map BCSQ to RGB color based on most severe consequence.""" if not bcsq or bcsq == ".": return (128, 128, 128) best_priority = -1 best_color = (128, 128, 128) for entry in bcsq.split(","): csq = entry.split("|")[0].lower() for part in csq.split("&"): if part in LOF_CSQ and best_priority < 3: best_priority, best_color = 3, (255, 0, 0) elif part in MISSENSE_CSQ and best_priority < 2: best_priority, best_color = 2, (31, 119, 180) elif part in SYN_CSQ and best_priority < 1: best_priority, best_color = 1, (0, 128, 0) return best_color def parse_bcsq(bcsq): """Parse BCSQ → (consequence, gene, transcript, aa_change, dna_change). aa_change/dna_change return "" rather than "." for non-coding so the mouseOver and detail page render a clean blank line.""" if not bcsq or bcsq == ".": return (".", ".", ".", "", "") best, best_pri = None, -1 for entry in bcsq.split(","): parts = entry.split("|") if len(parts) < 4: continue csq = parts[0].lower().split("&")[0] pri = 3 if csq in LOF_CSQ else 2 if csq in MISSENSE_CSQ \ else 1 if csq in SYN_CSQ else 0 if pri > best_pri: best_pri, best = pri, parts if best is None: best = bcsq.split(",")[0].split("|") return ( best[0] if len(best) > 0 else ".", best[1] if len(best) > 1 else ".", best[2] if len(best) > 2 else ".", best[5] if len(best) > 5 else "", best[6] if len(best) > 6 else "", ) # Filter buckets exposed in the trackdb consequence multipleListOr filter. # Anything that produces no token in this set gets ",others" appended so it # is still selectable via the "Other" bucket. NAMED_CONSEQUENCES = { "missense", "synonymous", "stop_gained", "frameshift", "splice_donor", "splice_acceptor", "intron", ".", "3_prime_utr", "5_prime_utr", "non_coding", } def normalize_consequence(raw): """Convert bcftools csq consequence (& or , joined) to a comma-joined token list. Append "others" if no token matches a named filter bucket.""" if not raw or raw == ".": return "." tokens = raw.replace("&", ",").split(",") if not any(tok in NAMED_CONSEQUENCES for tok in tokens): tokens.append("others") return ",".join(tokens) def get_vartype(ref, alt): if len(ref) == 1 and len(alt) == 1: return "SNV" elif len(ref) == 1: return "INS" elif len(alt) == 1: return "DEL" return "MNV" # ============================================================================ # Configuration Loading # ============================================================================ def load_config(scripts_dir, db_file="databases.tsv", pop_file="populations.tsv"): """Load the database and population config TSVs. db_file/pop_file may be bare names (resolved against scripts_dir) or absolute paths, so disease/array variants can use their own configs.""" databases = OrderedDict() db_path = db_file if os.path.isabs(db_file) \ else os.path.join(scripts_dir, db_file) with open(db_path) as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') if len(parts) < 5: print(f"WARNING: skipping malformed line: {line}", file=sys.stderr) continue key, name, vcf, ac_field, af_field = ( parts[0], parts[1], parts[2], parts[3], parts[4]) is_disease = int(parts[5]) if len(parts) > 5 else 0 disease_role = parts[6].strip() if len(parts) > 6 else "" + default_an = 0 + if len(parts) > 7 and parts[7].strip(): + try: + default_an = int(parts[7].strip()) + except ValueError: + print(f"WARNING: bad default_an for {key}: {parts[7]}", + file=sys.stderr) databases[key] = { "name": name, "vcf": vcf, "ac_field": ac_field, "af_field": af_field, "is_disease": is_disease, "disease_role": disease_role, + "default_an": default_an, "pops": [], } pop_path = pop_file if os.path.isabs(pop_file) \ else os.path.join(scripts_dir, pop_file) with open(pop_path) as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') if len(parts) < 5: continue db_key = parts[0] phenotype = parts[5].strip() if len(parts) > 5 else "" if db_key in databases: databases[db_key]["pops"].append({ "key": parts[1], "name": parts[2], "ac_field": parts[3], "af_field": parts[4], "phenotype": phenotype, }) else: print(f"WARNING: population references unknown db '{db_key}'", file=sys.stderr) return databases # ============================================================================ # Phase 1: Pre-extract frequency data (parallel by database) # ============================================================================ def pre_extract_one(args): """Extract frequency data from one VCF, split output by chromosome.""" db_key, db_info, region, extract_dir = args vcf = db_info["vcf"] if not os.path.exists(vcf): print(f" {db_key}: VCF not found ({vcf}), skipping", file=sys.stderr) return db_key, 0 out_dir = os.path.join(extract_dir, db_key) os.makedirs(out_dir, exist_ok=True) # Build bcftools query format: CHROM POS REF ALT [AC AF] [pop_ac pop_af ...] fmt_parts = ["%CHROM", "%POS", "%REF", "%ALT"] if db_info["ac_field"] != ".": fmt_parts.append(f"%INFO/{db_info['ac_field']}") else: fmt_parts.append(".") if db_info["af_field"] != ".": fmt_parts.append(f"%INFO/{db_info['af_field']}") else: fmt_parts.append(".") for pop in db_info["pops"]: if pop["ac_field"] != ".": fmt_parts.append(f"%INFO/{pop['ac_field']}") else: fmt_parts.append(".") if pop["af_field"] != ".": fmt_parts.append(f"%INFO/{pop['af_field']}") else: fmt_parts.append(".") fmt = "\t".join(fmt_parts) + "\n" cmd = ["bcftools", "query", "-f", fmt] if region: cmd.extend(["-r", region]) cmd.append(vcf) chrom_files = {} total = 0 try: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) for line in proc.stdout: parts = line.rstrip('\n').split('\t') if len(parts) < 4: continue chrom = parts[0] if chrom in CHR_RENAME: chrom = CHR_RENAME[chrom] if chrom not in chrom_files: chrom_files[chrom] = open( os.path.join(out_dir, f"{chrom}.tsv"), "w") # Write: POS\tREF\tALT\tAC\tAF\t[pop fields...] chrom_files[chrom].write("\t".join(parts[1:]) + "\n") total += 1 proc.wait() except Exception as e: print(f" {db_key}: ERROR: {e}", file=sys.stderr) finally: for fh in chrom_files.values(): fh.close() print(f" {db_key}: {total:,} variants extracted", file=sys.stderr) return db_key, total def pre_extract_all(databases, region, extract_dir, threads): """Phase 1: Pre-extract all VCFs in parallel.""" print(f"\n=== Phase 1: Pre-extracting frequency data " f"({threads} parallel) ===", file=sys.stderr) tasks = [(db_key, db_info, region, extract_dir) for db_key, db_info in databases.items()] with Pool(min(threads, len(tasks))) as pool: pool.map(pre_extract_one, tasks) # ============================================================================ # Phase 2: Process chromosomes (parallel by chromosome) # ============================================================================ def load_extracted(db_key, chrom, extract_dir): """Load pre-extracted TSV into dict keyed by pos:ref:alt.""" path = os.path.join(extract_dir, db_key, f"{chrom}.tsv") data = {} if not os.path.exists(path): return data with open(path) as f: for line in f: parts = line.rstrip('\n').split('\t') if len(parts) < 3: continue key = f"{parts[0]}:{parts[1]}:{parts[2]}" data[key] = parts[3:] return data +def _pool_arm(ac_val, af_val, default_an): + """Compute pooled (AC, AN) contribution for one cohort arm. + + Used by the affected and background pooled-AF calculations. Returns + (0, 0) when we can't determine AN, so the pool denominator never + includes a cohort's carriers without also including its allele number + -- the resulting pooled AF stays well-defined and bounded. + + Strategies, in order: + 1. Both AC and AF present with AF > 0: AN = round(AC / AF) (typical case). + 2. AF present but AC empty: synthesize AC = round(AF * default_an) + and use default_an as AN (e.g. ALFA, ABraOM, which ship only AF). + 3. AC present but AF empty/0: use default_an as AN (e.g. MGRB if it + had a configured default_an). + 4. None of the above: return (0, 0), arm does not contribute. + """ + if ac_val is not None and af_val is not None and af_val > 0: + return ac_val, max(1, round(ac_val / af_val)) + if af_val is not None and default_an > 0: + return max(0, round(af_val * default_an)), default_an + if ac_val is not None and default_an > 0: + return ac_val, default_an + return 0, 0 + + def process_chromosome(args): """Phase 2: Build the affected and background BEDs for one chromosome from the annotated VCF + pre-extracted per-cohort data. Two output rows are possible per variant, sharing one schema: - affected BED: variant seen in any affected/case arm of a disease cohort - background BED: variant seen in a population cohort or an unaffected/ control/unknown arm ("all other variants") A variant present in both groups is written to both (overlap is intended, so the case-vs-background comparison works). With split=False a single BED is written instead (one row per variant, score = max of the two summaries); used for tracks with no disease cohorts such as the genotyping-array combined track.""" chrom, annotated_vcf, databases, extract_dir, output_dir, split = args # Read annotated variants (try BCSQ first, fall back to plain variants) cmd = ["bcftools", "query", "-r", chrom, "-f", "%CHROM\t%POS\t%REF\t%ALT\t%INFO/BCSQ\n", annotated_vcf] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0 or not result.stdout.strip(): # Fall back: read without BCSQ (use "." as consequence) cmd = ["bcftools", "query", "-r", chrom, "-f", "%CHROM\t%POS\t%REF\t%ALT\t.\n", annotated_vcf] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0 or not result.stdout.strip(): print(f" {chrom}: no variants", file=sys.stderr) return None ann_lines = result.stdout.strip().split("\n") print(f" {chrom}: {len(ann_lines):,} variants, " f"loading frequencies...", file=sys.stderr) # Load pre-extracted frequency data freq_data = {} for db_key in databases: freq_data[db_key] = load_extracted(db_key, chrom, extract_dir) affected_path = os.path.join(output_dir, f"{chrom}.affected.bed") background_path = os.path.join(output_dir, f"{chrom}.background.bed") single_path = os.path.join(output_dir, f"{chrom}.bed") n_aff = 0 n_bg = 0 n_single = 0 # Length extremes, used to emit data-driven length filter ranges. stats = {"max_ref_len": 1, "max_alt_len": 1, "min_var_len": 0, "max_var_len": 0} out_aff = open(affected_path, "w") if split else None out_bg = open(background_path, "w") if split else None out_single = None if split else open(single_path, "w") try: for line in ann_lines: parts = line.split("\t") if len(parts) < 5: continue chrom_name, pos, ref, alt, bcsq = parts[0:5] key = f"{pos}:{ref}:{alt}" consequence, gene, transcript, aa_change, dna_change = \ parse_bcsq(bcsq) r, g, b = get_color(bcsq) pos_int = int(pos) start = pos_int - 1 end = start + len(ref) ref_d = ref[:17] + "..." if len(ref) > 20 else ref alt_d = alt[:17] + "..." if len(alt) > 20 else alt name = f"{ref_d}>{alt_d}" var_type = get_vartype(ref, alt) - # Affected/case summary (affected arms + role=affected whole cohorts) - affected_af = 0.0 + # Pooled affected/case summary: sum AC, sum AN, AF = AC/AN. + # Switched from max-across-cohorts (which was dominated by tiny + # cohorts like GA4K when they reported high local AF) to a + # population-weighted ratio so the AF matches the AC scale. affected_ac = 0 + affected_an = 0 affected_cohorts = [] # Background summary = population cohorts + unaffected/control/unknown - # arms of disease cohorts ("all other variants"). - background_af = 0.0 + # arms of disease cohorts ("all other variants"), same pooling. background_ac = 0 + background_an = 0 background_sources = [] - db_ac_af = [] # per-database AC, AF - pop_ac_af = [] # per-population AC, AF (written AFTER all db fields) + db_ac_af = [] # per-database AC, AF (raw, for output columns) + pop_ac_af = [] # per-population AC, AF (raw, for output columns) for db_key, db_info in databases.items(): values = freq_data.get(db_key, {}).get(key, []) ac = values[0] if len(values) > 0 and values[0] not in \ (".", "") else "" af = values[1] if len(values) > 1 and values[1] not in \ (".", "") else "" is_disease_db = db_info.get("is_disease", 0) disease_role = db_info.get("disease_role", "") + default_an = db_info.get("default_an", 0) af_val = None if af: try: af_val = float(af) except ValueError: af_val = None ac_val = None if ac: try: ac_val = int(ac) except ValueError: ac_val = None - # Track this cohort's contribution to each group so the - # affectedCohorts / backgroundSources lists are accurate. + ac_add, an_add = _pool_arm(ac_val, af_val, default_an) + cohort_observes = (ac_val is not None) or (af_val is not None) + + # Track this cohort's appearance in each group's source list. + # A cohort that observes the variant but lacks a usable AN + # (e.g. MGRB ships AC only, GREGoR per-arm ships AC only) is + # still listed but contributes 0 to the pool. Future work: + # add default_an entries for these cohorts/arms. hits_affected = False hits_background = False if is_disease_db: - # Unified AC/AF slot: only meaningful when the whole cohort - # has a known role (e.g. GA4K = affected). Otherwise the - # per-arm populations below carry the phenotype signal. if disease_role == "affected": - if af_val is not None: - affected_af = max(affected_af, af_val) - hits_affected = True - if ac_val is not None: - affected_ac += ac_val + affected_ac += ac_add + affected_an += an_add + if cohort_observes: hits_affected = True elif disease_role == "unaffected": - if af_val is not None: - background_af = max(background_af, af_val) - hits_background = True - if ac_val is not None: - background_ac += ac_val + background_ac += ac_add + background_an += an_add + if cohort_observes: hits_background = True else: - # Population cohort feeds the background summary. - if af_val is not None: - background_af = max(background_af, af_val) - hits_background = True - if ac_val is not None: - background_ac += ac_val + background_ac += ac_add + background_an += an_add + if cohort_observes: hits_background = True db_ac_af.extend([ac, af]) for i, pop in enumerate(db_info["pops"]): idx = 2 + i * 2 pop_ac = values[idx] if len(values) > idx and \ values[idx] not in (".", "") else "" pop_af = values[idx + 1] if len(values) > idx + 1 and \ values[idx + 1] not in (".", "") else "" pop_ac_af.extend([pop_ac, pop_af]) pop_af_val = None if pop_af: try: pop_af_val = float(pop_af) except ValueError: pop_af_val = None pop_ac_val = None if pop_ac: try: pop_ac_val = int(pop_ac) except ValueError: pop_ac_val = None + # Per-arm default_an would let GREGoR per-arm rows pool + # cleanly; for now they fall through with default 0. + pop_default_an = pop.get("default_an", 0) + pop_ac_add, pop_an_add = _pool_arm( + pop_ac_val, pop_af_val, pop_default_an) + pop_observes = (pop_ac_val is not None) or \ + (pop_af_val is not None) pheno = pop.get("phenotype", "") if is_disease_db and pheno == "affected": - if pop_af_val is not None: - affected_af = max(affected_af, pop_af_val) - hits_affected = True - if pop_ac_val is not None: - affected_ac += pop_ac_val + affected_ac += pop_ac_add + affected_an += pop_an_add + if pop_observes: hits_affected = True elif is_disease_db and pheno in ("unaffected", "unknown"): # Unaffected relatives, controls, and unknown-phenotype - # individuals are all "not clearly affected". - if pop_af_val is not None: - background_af = max(background_af, pop_af_val) - hits_background = True - if pop_ac_val is not None: - background_ac += pop_ac_val + # individuals all feed the background. + background_ac += pop_ac_add + background_an += pop_an_add + if pop_observes: hits_background = True elif not is_disease_db: - # Ancestry population of a population cohort. - if pop_af_val is not None: - background_af = max(background_af, pop_af_val) - hits_background = True - if pop_ac_val is not None: + # Ancestry breakdown of a population cohort. The + # unified row above already pooled the cohort if it + # had AC+AF, so we deliberately don't double-count + # the per-pop AC/AN here. Per-pop AC and AF still + # write to their own bigBed columns above. + if pop_observes: hits_background = True if hits_affected: affected_cohorts.append(db_key) if hits_background: background_sources.append(db_key) + # Compute pooled allele frequencies. + affected_af = (affected_ac / affected_an) if affected_an > 0 else 0.0 + background_af = (background_ac / background_an) \ + if background_an > 0 else 0.0 + in_affected = 1 if (affected_ac > 0 or affected_af > 0) else 0 # Track length extremes for data-driven length filter ranges. ref_len = len(ref) alt_len = len(alt) var_len = alt_len - ref_len if ref_len > stats["max_ref_len"]: stats["max_ref_len"] = ref_len if alt_len > stats["max_alt_len"]: stats["max_alt_len"] = alt_len if var_len < stats["min_var_len"]: stats["min_var_len"] = var_len if var_len > stats["max_var_len"]: stats["max_var_len"] = var_len # Shared columns (score at index 4 is filled in per output below). base = [ chrom_name, str(start), str(end), name, "0", "+", str(start), str(end), f"{r},{g},{b}", ref, alt, str(ref_len), str(alt_len), str(var_len), var_type, normalize_consequence(consequence), gene, transcript, aa_change, dna_change, f"{affected_af:.6f}" if affected_af > 0 else "", str(affected_ac) if affected_ac > 0 else "", + str(affected_an) if affected_an > 0 else "", ",".join(affected_cohorts), f"{background_af:.6f}" if background_af > 0 else "", str(background_ac) if background_ac > 0 else "", + str(background_an) if background_an > 0 else "", ",".join(background_sources), str(in_affected), ] # Database AC/AF first, then population AC/AF — must match autoSql order base.extend(db_ac_af) base.extend(pop_ac_af) has_affected = affected_af > 0 or affected_ac > 0 has_background = background_af > 0 or background_ac > 0 if split: if has_affected: row = list(base) row[4] = str(min(1000, int(affected_af * 1000))) out_aff.write("\t".join(row) + "\n") n_aff += 1 if has_background: row = list(base) row[4] = str(min(1000, int(background_af * 1000))) out_bg.write("\t".join(row) + "\n") n_bg += 1 else: if has_affected or has_background: row = list(base) row[4] = str(min(1000, int( max(affected_af, background_af) * 1000))) out_single.write("\t".join(row) + "\n") n_single += 1 finally: for fh in (out_aff, out_bg, out_single): if fh is not None: fh.close() if split: print(f" {chrom}: wrote {n_aff:,} affected + {n_bg:,} background " f"BED lines", file=sys.stderr) return (affected_path, background_path, stats) print(f" {chrom}: wrote {n_single:,} BED lines", file=sys.stderr) return (single_path, None, stats) # ============================================================================ # AutoSql Generation # ============================================================================ def generate_autosql(output_path, databases, table_name="varFreqsAll"): """Generate autoSql file from database config.""" with open(output_path, "w") as f: f.write(f'table {table_name}\n') f.write('"Variant frequencies across population databases"\n') f.write('(\n') # BED9 f.write(' string chrom; "Chromosome"\n') f.write(' uint chromStart; "Start position (0-based)"\n') f.write(' uint chromEnd; "End position"\n') f.write(' string name; "Variant (REF>ALT)"\n') f.write(' uint score; "Score (allele frequency * 1000)"\n') f.write(' char[1] strand; "Strand"\n') f.write(' uint thickStart; "Thick start"\n') f.write(' uint thickEnd; "Thick end"\n') f.write(' uint reserved; "Color by consequence"\n') # Variant info f.write(' lstring ref; "Reference allele"\n') f.write(' lstring alt; "Alternate allele"\n') f.write(' int refLen; "Reference length"\n') f.write(' int altLen; "Alternate length"\n') f.write(' int varLen; "Length change (alt-ref)"\n') f.write(' string varType; "Type (SNV/INS/DEL/MNV)"\n') # Consequence f.write(' string consequence; "Consequence"\n') f.write(' string gene; "Gene"\n') f.write(' string transcript; "Transcript"\n') f.write(' lstring aaChange; "AA change"\n') f.write(' lstring dnaChange; "DNA change"\n') - # Frequency summaries (shared by the affected and background tracks) - f.write(' string affectedAF; "Max allele frequency in affected/case individuals"\n') + # Frequency summaries (shared by the affected and background tracks). + # AF is pooled across contributing arms (sum AC / sum AN), not the + # max across arms, so the AF matches the AC and AN scale. + f.write(' string affectedAF; "Pooled allele frequency in affected/case individuals (sum AC / sum AN)"\n') f.write(' string affectedAC; "Summed allele count in affected/case individuals"\n') + f.write(' string affectedAN; "Summed allele number in affected/case individuals (pool denominator)"\n') f.write(' string affectedCohorts; "Disease cohorts contributing affected/case carriers"\n') - f.write(' string backgroundAF; "Max allele frequency in population cohorts + unaffected/control individuals"\n') + f.write(' string backgroundAF; "Pooled allele frequency in population cohorts + unaffected/control individuals (sum AC / sum AN)"\n') f.write(' string backgroundAC; "Summed allele count in population cohorts + unaffected/control individuals"\n') + f.write(' string backgroundAN; "Summed allele number in population cohorts + unaffected/control individuals (pool denominator)"\n') f.write(' string backgroundSources; "Cohorts contributing to the background (population + unaffected)"\n') f.write(' uint inAffected; "1 if seen in an affected/case arm, else 0"\n') # Per-database AC/AF for db_key, db_info in databases.items(): f.write(f' string {db_key}AC;' f' "{db_info["name"]} AC"\n') f.write(f' string {db_key}AF;' f' "{db_info["name"]} AF"\n') # Per-population AC/AF for db_key, db_info in databases.items(): for pop in db_info["pops"]: f.write(f' string {db_key}AC_{pop["key"]};' f' "{db_info["name"]} {pop["name"]} AC"\n') f.write(f' string {db_key}AF_{pop["key"]};' f' "{db_info["name"]} {pop["name"]} AF"\n') f.write(')\n') print(f"Wrote autoSql: {output_path}", file=sys.stderr) def generate_trackdb_fragment(output_path, databases, track_name="varFreqsAll", len_stats=None): """Generate trackDb .ra fragment for the combined track filters.""" if len_stats is None: len_stats = {"max_ref_len": 1000, "max_alt_len": 1000, "min_var_len": -1000, "max_var_len": 1000} with open(output_path, "w") as f: f.write(f"# Auto-generated trackDb fragment for {track_name} filters\n") f.write(f"# Paste this into the {track_name} track stanza in varFreqs.ra\n\n") # Cohort filters. affectedCohorts = disease cohorts that can contribute # an affected/case carrier; backgroundSources = population cohorts plus # disease cohorts with an unaffected/control/unknown arm. affected_parts = [] background_parts = [] for db_key, db_info in databases.items(): entry = f"{db_key}|{db_info['name']}" is_dis = db_info.get("is_disease", 0) phenos = {p.get("phenotype", "") for p in db_info["pops"]} if is_dis and (db_info.get("disease_role", "") == "affected" or "affected" in phenos): affected_parts.append(entry) if (not is_dis) or (phenos & {"unaffected", "unknown"}): background_parts.append(entry) if affected_parts: f.write(" filterValues.affectedCohorts " + ",".join(affected_parts) + "\n") f.write(" filterType.affectedCohorts multipleListOr\n") f.write(" filterLabel.affectedCohorts Affected/case cohort\n") if background_parts: f.write(" filterValues.backgroundSources " + ",".join(background_parts) + "\n") f.write(" filterType.backgroundSources multipleListOr\n") f.write(" filterLabel.backgroundSources Background source (population or unaffected)\n") # Variant type and consequence (static) f.write(" # Variant type and consequence filters\n") f.write(" filterValues.varType " "SNV|SNV,INS|Insertion,DEL|Deletion,MNV|MNV\n") f.write(" filterLabel.varType Variant Type\n") f.write(" filterValues.consequence " "missense|Missense,synonymous|Synonymous," "stop_gained|Stop Gained,frameshift|Frameshift," "splice_donor|Splice Donor,splice_acceptor|Splice Acceptor," "intron|Intron,3_prime_utr|3' UTR,5_prime_utr|5' UTR," "non_coding|Non-coding,.|Intergenic,others|Other\n") f.write(" filterType.consequence multipleListOr\n") f.write(" filterLabel.consequence Consequence\n") # Length filters. A numeric range filter needs filter.<fld> min:max (the # default/limits range) or the slider does not render; ranges are derived # from the observed data so nothing is hidden by default. f.write(" # Length filters\n") len_ranges = { "refLen": ("Reference Length", 1, len_stats["max_ref_len"]), "altLen": ("Alternate Length", 1, len_stats["max_alt_len"]), "varLen": ("Length Change", len_stats["min_var_len"], len_stats["max_var_len"]), } for fld, (label, lo, hi) in len_ranges.items(): f.write(f" filterByRange.{fld} on\n") f.write(f" filterLabel.{fld} {label}\n") f.write(f" filter.{fld} {lo}:{hi}\n") f.write(f" filterLimits.{fld} {lo}:{hi}\n") # Affected and background frequency summaries (both tracks carry both, # so e.g. the Affected track can be filtered to variants rare in the # background). String range filters mirror the per-database AF/AC fields. f.write(" # Affected/case frequency summary\n") f.write(" filterByRange.affectedAF on\n") - f.write(" filterLabel.affectedAF Affected/case AF\n") + f.write(" filterLabel.affectedAF Affected/case AF (pooled)\n") f.write(" filterLimits.affectedAF 0:1\n") f.write(" filterByRange.affectedAC on\n") f.write(" filterLabel.affectedAC Affected/case AC\n") + f.write(" filterByRange.affectedAN on\n") + f.write(" filterLabel.affectedAN Affected/case AN (pool denominator)\n") f.write(" # Background (population + unaffected) frequency summary\n") f.write(" filterByRange.backgroundAF on\n") - f.write(" filterLabel.backgroundAF Background AF (population + unaffected)\n") + f.write(" filterLabel.backgroundAF Background AF (pooled)\n") f.write(" filterLimits.backgroundAF 0:1\n") f.write(" filterByRange.backgroundAC on\n") f.write(" filterLabel.backgroundAC Background AC (population + unaffected)\n") + f.write(" filterByRange.backgroundAN on\n") + f.write(" filterLabel.backgroundAN Background AN (pool denominator)\n") f.write(" # Affected/case membership flag\n") f.write(" filterByRange.inAffected on\n") f.write(" filterLabel.inAffected Seen in an affected/case arm (1=yes, 0=no)\n") f.write(" filter.inAffected 0:1\n") f.write(" filterLimits.inAffected 0:1\n") # Per-database AF and AC filters f.write(" # Per-database AF filters\n") for db_key, db_info in databases.items(): f.write(f" filterByRange.{db_key}AF on\n") f.write(f" filterLabel.{db_key}AF " f"{db_info['name']} AF\n") f.write(" # Per-database AC filters\n") for db_key, db_info in databases.items(): f.write(f" filterByRange.{db_key}AC on\n") f.write(f" filterLabel.{db_key}AC " f"{db_info['name']} AC\n") # Population-specific filters f.write(" # Population-specific AF filters\n") for db_key, db_info in databases.items(): if not db_info["pops"]: continue f.write(f" # {db_info['name']} populations\n") for pop in db_info["pops"]: f.write(f" filterByRange.{db_key}AF_{pop['key']} on\n") f.write(f" filterLabel.{db_key}AF_{pop['key']} " f"{db_info['name']} {pop['name']} AF\n") for pop in db_info["pops"]: f.write(f" filterByRange.{db_key}AC_{pop['key']} on\n") f.write(f" filterLabel.{db_key}AC_{pop['key']} " f"{db_info['name']} {pop['name']} AC\n") f.write(" skipEmptyFields on\n") print(f"Wrote trackDb fragment: {output_path}", file=sys.stderr) # ============================================================================ # Main # ============================================================================ def main(): parser = argparse.ArgumentParser( description="Convert annotated VCF to bigBed (two-phase parallel)") parser.add_argument("--annotated-vcf", default="merged.annotated.vcf.gz") parser.add_argument("--output-prefix", default="varFreqs") parser.add_argument("--chrom-sizes", default="/hive/data/genomes/hg38/chrom.sizes") parser.add_argument("--threads", type=int, default=8) parser.add_argument("--work-dir", default=".") parser.add_argument("--region", default=None, help="Region to process, e.g. chrM or chr22") parser.add_argument("--scripts-dir", default=SCRIPTS_DIR) parser.add_argument("--databases-file", default="databases.tsv", help="DB config TSV (name under scripts-dir or abs path)") parser.add_argument("--populations-file", default="populations.tsv", help="Population config TSV (name under scripts-dir or abs path)") parser.add_argument("--keep-temp", action="store_true") parser.add_argument("--split-affected", action="store_true", help="Emit two tracks: <prefix>Affected and " "<prefix>Background (affected/case vs population+" "unaffected). Without it, a single <prefix> track.") args = parser.parse_args() databases = load_config(args.scripts_dir, args.databases_file, args.populations_file) print(f"Loaded {len(databases)} databases:", file=sys.stderr) for key, info in databases.items(): pops = ", ".join(p["key"] for p in info["pops"]) extra = f" [{pops}]" if pops else "" exists = "OK" if os.path.exists(info["vcf"]) else "MISSING" print(f" {key}: {info['name']}{extra} ({exists})", file=sys.stderr) if args.region: chromosomes = [args.region.split(":")[0]] else: chromosomes = ALL_CHROMOSOMES extract_dir = os.path.join(args.work_dir, "extracted") bed_dir = os.path.join(args.work_dir, "beds") os.makedirs(extract_dir, exist_ok=True) os.makedirs(bed_dir, exist_ok=True) # Output prefixes: one per track. Split mode yields an Affected and a # Background track that share a single autoSql schema. if args.split_affected: prefixes = [f"{args.output_prefix}Affected", f"{args.output_prefix}Background"] else: prefixes = [args.output_prefix] # Step 1: AutoSql per track (same columns; table name = output prefix). for prefix in prefixes: generate_autosql(os.path.join(args.work_dir, f"{prefix}.as"), databases, table_name=prefix) # Step 2: Phase 1 — pre-extract pre_extract_all(databases, args.region, extract_dir, args.threads) # Step 3: Phase 2 — process chromosomes print(f"\n=== Phase 2: Processing {len(chromosomes)} chromosome(s) ===", file=sys.stderr) task_args = [(chrom, args.annotated_vcf, databases, extract_dir, bed_dir, args.split_affected) for chrom in chromosomes] with Pool(min(args.threads, len(chromosomes))) as pool: chrom_results = pool.map(process_chromosome, task_args) chrom_results = [r for r in chrom_results if r is not None] if not chrom_results: print("ERROR: No BED output produced", file=sys.stderr) sys.exit(1) # Aggregate length extremes across chromosomes for the length filter ranges, # then write the trackDb fragment (after processing, so ranges are real). len_stats = {"max_ref_len": 1, "max_alt_len": 1, "min_var_len": 0, "max_var_len": 0} for r in chrom_results: st = r[2] len_stats["max_ref_len"] = max(len_stats["max_ref_len"], st["max_ref_len"]) len_stats["max_alt_len"] = max(len_stats["max_alt_len"], st["max_alt_len"]) len_stats["min_var_len"] = min(len_stats["min_var_len"], st["min_var_len"]) len_stats["max_var_len"] = max(len_stats["max_var_len"], st["max_var_len"]) ra_path = os.path.join(args.work_dir, f"{args.output_prefix}.trackDb.ra") generate_trackdb_fragment(ra_path, databases, track_name=args.output_prefix, len_stats=len_stats) # Steps 4-5: build one bigBed per track from its per-chromosome BEDs. # results column 0 = affected/single BED, column 1 = background BED (or None) bed_sets = [[r[0] for r in chrom_results]] if args.split_affected: bed_sets.append([r[1] for r in chrom_results]) for prefix, chrom_beds in zip(prefixes, bed_sets): as_path = os.path.join(args.work_dir, f"{prefix}.as") bb_path = os.path.join(args.work_dir, f"{prefix}.bb") bed_path = os.path.join(args.work_dir, f"{prefix}.bed") print(f"\n=== {prefix}: concatenating {len(chrom_beds)} chrom files ===", file=sys.stderr) with open(bed_path, "wb") as out: for chrom_bed in chrom_beds: subprocess.run(["sort", "-k2,2n", chrom_bed], stdout=out, check=True) print(f"=== {prefix}: running bedToBigBed ===", file=sys.stderr) cmd = ["bedToBigBed", "-type=bed9+", f"-as={as_path}", "-tab", bed_path, args.chrom_sizes, bb_path] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f" Error: {result.stderr}", file=sys.stderr) sys.exit(1) if not args.keep_temp and os.path.exists(bed_path): os.remove(bed_path) bb_size = os.path.getsize(bb_path) / (1024**3) print(f"Done! {bb_path} ({bb_size:.1f} GB)", file=sys.stderr) # Cleanup if not args.keep_temp: shutil.rmtree(extract_dir, ignore_errors=True) shutil.rmtree(bed_dir, ignore_errors=True) if __name__ == "__main__": main()