64a3f9e7813e823cf724ea188c3928a911578286 max Thu Jun 4 00:32:22 2026 -0700 varFreqs: replace All Databases Combined with two phenotype-split tracks Replace the single varFreqsAll combined track (and drop the varFreqsDisease track) with two matched tracks for visual case-vs-background comparison: varFreqsAffected - variants seen in the affected/case arms of disease cohorts (SFARI SPARK WES/WGS ASD probands, SCHEMA cases, GREGoR affected, GA4K); ~130,000 individuals varFreqsBackground - population reference cohorts + the unaffected/control arms of disease cohorts ("all other variants"); ~1.5 million individuals A variant seen in both groups appears in both tracks. Genotyping-array cohorts stay out of both (varFreqsArray unchanged). vcfToBigBed.py gains --split-affected to emit both tracks in one pass; it reads phenotype tags (affected/unaffected/unknown) from populations.tsv and is_disease/disease_role from databases.tsv, and derives the length-filter ranges from the observed data. TOPMed reclassified as a population cohort. SPARK WGS display name changed to SFARI SPARK WGS for consistency with the standalone subtracks. Fixed the trackDb mouseOver $-substitution prefix collision by wrapping fields in ${}. New description pages for both tracks. refs #36642 diff --git src/hg/makeDb/scripts/varFreqs/vcfToBigBed.py src/hg/makeDb/scripts/varFreqs/vcfToBigBed.py index cdab8f57360..1cf9dbe9c73 100755 --- src/hg/makeDb/scripts/varFreqs/vcfToBigBed.py +++ src/hg/makeDb/scripts/varFreqs/vcfToBigBed.py @@ -137,51 +137,57 @@ db_path = db_file if os.path.isabs(db_file) \ else os.path.join(scripts_dir, db_file) with open(db_path) as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') if len(parts) < 5: print(f"WARNING: skipping malformed line: {line}", file=sys.stderr) continue key, name, vcf, ac_field, af_field = ( parts[0], parts[1], parts[2], parts[3], parts[4]) + is_disease = int(parts[5]) if len(parts) > 5 else 0 + disease_role = parts[6].strip() if len(parts) > 6 else "" databases[key] = { "name": name, "vcf": vcf, "ac_field": ac_field, "af_field": af_field, + "is_disease": is_disease, + "disease_role": disease_role, "pops": [], } pop_path = pop_file if os.path.isabs(pop_file) \ else os.path.join(scripts_dir, pop_file) with open(pop_path) as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') if len(parts) < 5: continue db_key = parts[0] + phenotype = parts[5].strip() if len(parts) > 5 else "" if db_key in databases: databases[db_key]["pops"].append({ "key": parts[1], "name": parts[2], "ac_field": parts[3], "af_field": parts[4], + "phenotype": phenotype, }) else: print(f"WARNING: population references unknown db '{db_key}'", file=sys.stderr) return databases # ============================================================================ # Phase 1: Pre-extract frequency data (parallel by database) # ============================================================================ def pre_extract_one(args): """Extract frequency data from one VCF, split output by chromosome.""" db_key, db_info, region, extract_dir = args @@ -271,268 +277,437 @@ path = os.path.join(extract_dir, db_key, f"{chrom}.tsv") data = {} if not os.path.exists(path): return data with open(path) as f: for line in f: parts = line.rstrip('\n').split('\t') if len(parts) < 3: continue key = f"{parts[0]}:{parts[1]}:{parts[2]}" data[key] = parts[3:] return data def process_chromosome(args): - """Phase 2: Build BED for one chromosome from annotated VCF + extracted data.""" - chrom, annotated_vcf, databases, extract_dir, output_dir = args + """Phase 2: Build the affected and background BEDs for one chromosome from + the annotated VCF + pre-extracted per-cohort data. + + Two output rows are possible per variant, sharing one schema: + - affected BED: variant seen in any affected/case arm of a disease cohort + - background BED: variant seen in a population cohort or an unaffected/ + control/unknown arm ("all other variants") + A variant present in both groups is written to both (overlap is intended, so + the case-vs-background comparison works). + + With split=False a single BED is written instead (one row per variant, + score = max of the two summaries); used for tracks with no disease cohorts + such as the genotyping-array combined track.""" + chrom, annotated_vcf, databases, extract_dir, output_dir, split = args # Read annotated variants (try BCSQ first, fall back to plain variants) cmd = ["bcftools", "query", "-r", chrom, "-f", "%CHROM\t%POS\t%REF\t%ALT\t%INFO/BCSQ\n", annotated_vcf] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0 or not result.stdout.strip(): # Fall back: read without BCSQ (use "." as consequence) cmd = ["bcftools", "query", "-r", chrom, "-f", "%CHROM\t%POS\t%REF\t%ALT\t.\n", annotated_vcf] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0 or not result.stdout.strip(): print(f" {chrom}: no variants", file=sys.stderr) return None ann_lines = result.stdout.strip().split("\n") print(f" {chrom}: {len(ann_lines):,} variants, " f"loading frequencies...", file=sys.stderr) # Load pre-extracted frequency data freq_data = {} for db_key in databases: freq_data[db_key] = load_extracted(db_key, chrom, extract_dir) - output_path = os.path.join(output_dir, f"{chrom}.bed") - count = 0 - - with open(output_path, "w") as out: + affected_path = os.path.join(output_dir, f"{chrom}.affected.bed") + background_path = os.path.join(output_dir, f"{chrom}.background.bed") + single_path = os.path.join(output_dir, f"{chrom}.bed") + n_aff = 0 + n_bg = 0 + n_single = 0 + # Length extremes, used to emit data-driven length filter ranges. + stats = {"max_ref_len": 1, "max_alt_len": 1, + "min_var_len": 0, "max_var_len": 0} + + out_aff = open(affected_path, "w") if split else None + out_bg = open(background_path, "w") if split else None + out_single = None if split else open(single_path, "w") + try: for line in ann_lines: parts = line.split("\t") if len(parts) < 5: continue chrom_name, pos, ref, alt, bcsq = parts[0:5] key = f"{pos}:{ref}:{alt}" consequence, gene, transcript, aa_change, dna_change = \ parse_bcsq(bcsq) r, g, b = get_color(bcsq) pos_int = int(pos) start = pos_int - 1 end = start + len(ref) ref_d = ref[:17] + "..." if len(ref) > 20 else ref alt_d = alt[:17] + "..." if len(alt) > 20 else alt name = f"{ref_d}>{alt_d}" var_type = get_vartype(ref, alt) - max_af = 0.0 - sources = [] + # Affected/case summary (affected arms + role=affected whole cohorts) + affected_af = 0.0 + affected_ac = 0 + affected_cohorts = [] + # Background summary = population cohorts + unaffected/control/unknown + # arms of disease cohorts ("all other variants"). + background_af = 0.0 + background_ac = 0 + background_sources = [] db_ac_af = [] # per-database AC, AF pop_ac_af = [] # per-population AC, AF (written AFTER all db fields) - total_ac = 0 for db_key, db_info in databases.items(): values = freq_data.get(db_key, {}).get(key, []) ac = values[0] if len(values) > 0 and values[0] not in \ (".", "") else "" af = values[1] if len(values) > 1 and values[1] not in \ (".", "") else "" - # A source contributes if the unified AC/AF slot has data OR - # any per-population slot has data. AllOfUs ships only - # per-population fields ("." in the unified slot) so without - # this fallback its 67M+ variants get no source attribution. - has_data = bool(ac) or bool(af) - if not has_data: - for i in range(len(db_info["pops"])): - idx = 2 + i * 2 - pop_ac = values[idx] if len(values) > idx and \ - values[idx] not in (".", "") else "" - pop_af = values[idx + 1] if len(values) > idx + 1 \ - and values[idx + 1] not in (".", "") else "" - if pop_ac or pop_af: - has_data = True - break - - if has_data: - sources.append(db_key) + is_disease_db = db_info.get("is_disease", 0) + disease_role = db_info.get("disease_role", "") + af_val = None + if af: + try: + af_val = float(af) + except ValueError: + af_val = None + ac_val = None if ac: try: - total_ac += int(ac) + ac_val = int(ac) except ValueError: - pass + ac_val = None + + # Track this cohort's contribution to each group so the + # affectedCohorts / backgroundSources lists are accurate. + hits_affected = False + hits_background = False + + if is_disease_db: + # Unified AC/AF slot: only meaningful when the whole cohort + # has a known role (e.g. GA4K = affected). Otherwise the + # per-arm populations below carry the phenotype signal. + if disease_role == "affected": + if af_val is not None: + affected_af = max(affected_af, af_val) + hits_affected = True + if ac_val is not None: + affected_ac += ac_val + hits_affected = True + elif disease_role == "unaffected": + if af_val is not None: + background_af = max(background_af, af_val) + hits_background = True + if ac_val is not None: + background_ac += ac_val + hits_background = True + else: + # Population cohort feeds the background summary. + if af_val is not None: + background_af = max(background_af, af_val) + hits_background = True + if ac_val is not None: + background_ac += ac_val + hits_background = True db_ac_af.extend([ac, af]) - if af: - try: - max_af = max(max_af, float(af)) - except ValueError: - pass - for i, pop in enumerate(db_info["pops"]): idx = 2 + i * 2 pop_ac = values[idx] if len(values) > idx and \ values[idx] not in (".", "") else "" pop_af = values[idx + 1] if len(values) > idx + 1 and \ values[idx + 1] not in (".", "") else "" pop_ac_af.extend([pop_ac, pop_af]) + pop_af_val = None if pop_af: try: - max_af = max(max_af, float(pop_af)) + pop_af_val = float(pop_af) except ValueError: - pass - - score = min(1000, int(max_af * 1000)) - - fields = [ - chrom_name, str(start), str(end), name, str(score), "+", + pop_af_val = None + pop_ac_val = None + if pop_ac: + try: + pop_ac_val = int(pop_ac) + except ValueError: + pop_ac_val = None + + pheno = pop.get("phenotype", "") + if is_disease_db and pheno == "affected": + if pop_af_val is not None: + affected_af = max(affected_af, pop_af_val) + hits_affected = True + if pop_ac_val is not None: + affected_ac += pop_ac_val + hits_affected = True + elif is_disease_db and pheno in ("unaffected", "unknown"): + # Unaffected relatives, controls, and unknown-phenotype + # individuals are all "not clearly affected". + if pop_af_val is not None: + background_af = max(background_af, pop_af_val) + hits_background = True + if pop_ac_val is not None: + background_ac += pop_ac_val + hits_background = True + elif not is_disease_db: + # Ancestry population of a population cohort. + if pop_af_val is not None: + background_af = max(background_af, pop_af_val) + hits_background = True + if pop_ac_val is not None: + hits_background = True + + if hits_affected: + affected_cohorts.append(db_key) + if hits_background: + background_sources.append(db_key) + + in_affected = 1 if (affected_ac > 0 or affected_af > 0) else 0 + + # Track length extremes for data-driven length filter ranges. + ref_len = len(ref) + alt_len = len(alt) + var_len = alt_len - ref_len + if ref_len > stats["max_ref_len"]: + stats["max_ref_len"] = ref_len + if alt_len > stats["max_alt_len"]: + stats["max_alt_len"] = alt_len + if var_len < stats["min_var_len"]: + stats["min_var_len"] = var_len + if var_len > stats["max_var_len"]: + stats["max_var_len"] = var_len + + # Shared columns (score at index 4 is filled in per output below). + base = [ + chrom_name, str(start), str(end), name, "0", "+", str(start), str(end), f"{r},{g},{b}", - ref, alt, str(len(ref)), str(len(alt)), - str(len(alt) - len(ref)), var_type, + ref, alt, str(ref_len), str(alt_len), + str(var_len), var_type, normalize_consequence(consequence), gene, transcript, aa_change, dna_change, - f"{max_af:.6f}" if max_af > 0 else "0", - str(total_ac) if total_ac > 0 else "0", - ",".join(sources), + f"{affected_af:.6f}" if affected_af > 0 else "", + str(affected_ac) if affected_ac > 0 else "", + ",".join(affected_cohorts), + f"{background_af:.6f}" if background_af > 0 else "", + str(background_ac) if background_ac > 0 else "", + ",".join(background_sources), + str(in_affected), ] # Database AC/AF first, then population AC/AF — must match autoSql order - fields.extend(db_ac_af) - fields.extend(pop_ac_af) - - out.write("\t".join(fields) + "\n") - count += 1 + base.extend(db_ac_af) + base.extend(pop_ac_af) + + has_affected = affected_af > 0 or affected_ac > 0 + has_background = background_af > 0 or background_ac > 0 + + if split: + if has_affected: + row = list(base) + row[4] = str(min(1000, int(affected_af * 1000))) + out_aff.write("\t".join(row) + "\n") + n_aff += 1 + if has_background: + row = list(base) + row[4] = str(min(1000, int(background_af * 1000))) + out_bg.write("\t".join(row) + "\n") + n_bg += 1 + else: + if has_affected or has_background: + row = list(base) + row[4] = str(min(1000, int( + max(affected_af, background_af) * 1000))) + out_single.write("\t".join(row) + "\n") + n_single += 1 + finally: + for fh in (out_aff, out_bg, out_single): + if fh is not None: + fh.close() - print(f" {chrom}: wrote {count:,} BED lines", file=sys.stderr) - return output_path + if split: + print(f" {chrom}: wrote {n_aff:,} affected + {n_bg:,} background " + f"BED lines", file=sys.stderr) + return (affected_path, background_path, stats) + print(f" {chrom}: wrote {n_single:,} BED lines", file=sys.stderr) + return (single_path, None, stats) # ============================================================================ # AutoSql Generation # ============================================================================ def generate_autosql(output_path, databases, table_name="varFreqsAll"): """Generate autoSql file from database config.""" with open(output_path, "w") as f: f.write(f'table {table_name}\n') f.write('"Variant frequencies across population databases"\n') f.write('(\n') # BED9 f.write(' string chrom; "Chromosome"\n') f.write(' uint chromStart; "Start position (0-based)"\n') f.write(' uint chromEnd; "End position"\n') f.write(' string name; "Variant (REF>ALT)"\n') - f.write(' uint score; "Score (maxAF * 1000)"\n') + f.write(' uint score; "Score (allele frequency * 1000)"\n') f.write(' char[1] strand; "Strand"\n') f.write(' uint thickStart; "Thick start"\n') f.write(' uint thickEnd; "Thick end"\n') f.write(' uint reserved; "Color by consequence"\n') # Variant info f.write(' lstring ref; "Reference allele"\n') f.write(' lstring alt; "Alternate allele"\n') f.write(' int refLen; "Reference length"\n') f.write(' int altLen; "Alternate length"\n') f.write(' int varLen; "Length change (alt-ref)"\n') f.write(' string varType; "Type (SNV/INS/DEL/MNV)"\n') # Consequence f.write(' string consequence; "Consequence"\n') f.write(' string gene; "Gene"\n') f.write(' string transcript; "Transcript"\n') f.write(' lstring aaChange; "AA change"\n') f.write(' lstring dnaChange; "DNA change"\n') - # Frequencies - f.write(' float maxAF; "Max allele frequency"\n') - f.write(' uint totalAC; "Total allele count across all databases"\n') - f.write(' string sources; "Source databases"\n') + # Frequency summaries (shared by the affected and background tracks) + f.write(' string affectedAF; "Max allele frequency in affected/case individuals"\n') + f.write(' string affectedAC; "Summed allele count in affected/case individuals"\n') + f.write(' string affectedCohorts; "Disease cohorts contributing affected/case carriers"\n') + f.write(' string backgroundAF; "Max allele frequency in population cohorts + unaffected/control individuals"\n') + f.write(' string backgroundAC; "Summed allele count in population cohorts + unaffected/control individuals"\n') + f.write(' string backgroundSources; "Cohorts contributing to the background (population + unaffected)"\n') + f.write(' uint inAffected; "1 if seen in an affected/case arm, else 0"\n') # Per-database AC/AF for db_key, db_info in databases.items(): f.write(f' string {db_key}AC;' f' "{db_info["name"]} AC"\n') f.write(f' string {db_key}AF;' f' "{db_info["name"]} AF"\n') # Per-population AC/AF for db_key, db_info in databases.items(): for pop in db_info["pops"]: f.write(f' string {db_key}AC_{pop["key"]};' f' "{db_info["name"]} {pop["name"]} AC"\n') f.write(f' string {db_key}AF_{pop["key"]};' f' "{db_info["name"]} {pop["name"]} AF"\n') f.write(')\n') print(f"Wrote autoSql: {output_path}", file=sys.stderr) -def generate_trackdb_fragment(output_path, databases, track_name="varFreqsAll"): +def generate_trackdb_fragment(output_path, databases, track_name="varFreqsAll", + len_stats=None): """Generate trackDb .ra fragment for the combined track filters.""" + if len_stats is None: + len_stats = {"max_ref_len": 1000, "max_alt_len": 1000, + "min_var_len": -1000, "max_var_len": 1000} with open(output_path, "w") as f: f.write(f"# Auto-generated trackDb fragment for {track_name} filters\n") f.write(f"# Paste this into the {track_name} track stanza in varFreqs.ra\n\n") - # Source database filter - src_parts = [] + # Cohort filters. affectedCohorts = disease cohorts that can contribute + # an affected/case carrier; backgroundSources = population cohorts plus + # disease cohorts with an unaffected/control/unknown arm. + affected_parts = [] + background_parts = [] for db_key, db_info in databases.items(): - src_parts.append(f"{db_key}|{db_info['name']}") - f.write(" filterValues.sources " - + ",".join(src_parts) + "\n") - f.write(" filterType.sources multipleListOr\n") - f.write(" filterLabel.sources Source Database\n") + entry = f"{db_key}|{db_info['name']}" + is_dis = db_info.get("is_disease", 0) + phenos = {p.get("phenotype", "") for p in db_info["pops"]} + if is_dis and (db_info.get("disease_role", "") == "affected" + or "affected" in phenos): + affected_parts.append(entry) + if (not is_dis) or (phenos & {"unaffected", "unknown"}): + background_parts.append(entry) + if affected_parts: + f.write(" filterValues.affectedCohorts " + + ",".join(affected_parts) + "\n") + f.write(" filterType.affectedCohorts multipleListOr\n") + f.write(" filterLabel.affectedCohorts Affected/case cohort\n") + if background_parts: + f.write(" filterValues.backgroundSources " + + ",".join(background_parts) + "\n") + f.write(" filterType.backgroundSources multipleListOr\n") + f.write(" filterLabel.backgroundSources Background source (population or unaffected)\n") # Variant type and consequence (static) f.write(" # Variant type and consequence filters\n") f.write(" filterValues.varType " "SNV|SNV,INS|Insertion,DEL|Deletion,MNV|MNV\n") f.write(" filterLabel.varType Variant Type\n") f.write(" filterValues.consequence " "missense|Missense,synonymous|Synonymous," "stop_gained|Stop Gained,frameshift|Frameshift," "splice_donor|Splice Donor,splice_acceptor|Splice Acceptor," "intron|Intron,3_prime_utr|3' UTR,5_prime_utr|5' UTR," "non_coding|Non-coding,.|Intergenic,others|Other\n") f.write(" filterType.consequence multipleListOr\n") f.write(" filterLabel.consequence Consequence\n") - # Length filters + # Length filters. A numeric range filter needs filter. min:max (the + # default/limits range) or the slider does not render; ranges are derived + # from the observed data so nothing is hidden by default. f.write(" # Length filters\n") - for fld in ["refLen", "altLen", "varLen"]: - label = {"refLen": "Reference Length", "altLen": "Alternate Length", - "varLen": "Length Change"}[fld] + len_ranges = { + "refLen": ("Reference Length", 1, len_stats["max_ref_len"]), + "altLen": ("Alternate Length", 1, len_stats["max_alt_len"]), + "varLen": ("Length Change", len_stats["min_var_len"], + len_stats["max_var_len"]), + } + for fld, (label, lo, hi) in len_ranges.items(): f.write(f" filterByRange.{fld} on\n") f.write(f" filterLabel.{fld} {label}\n") - - # Max AF filter - f.write(" # Max AF filter\n") - f.write(" filterByRange.maxAF on\n") - f.write(" filterLabel.maxAF Max Allele Frequency\n") - f.write(" filterLimits.maxAF 0:1\n") - - # Total AC filter - f.write(" # Total AC filter\n") - f.write(" filterByRange.totalAC on\n") - f.write(" filterLabel.totalAC Total Allele Count (all databases)\n") + f.write(f" filter.{fld} {lo}:{hi}\n") + f.write(f" filterLimits.{fld} {lo}:{hi}\n") + + # Affected and background frequency summaries (both tracks carry both, + # so e.g. the Affected track can be filtered to variants rare in the + # background). String range filters mirror the per-database AF/AC fields. + f.write(" # Affected/case frequency summary\n") + f.write(" filterByRange.affectedAF on\n") + f.write(" filterLabel.affectedAF Affected/case AF\n") + f.write(" filterLimits.affectedAF 0:1\n") + f.write(" filterByRange.affectedAC on\n") + f.write(" filterLabel.affectedAC Affected/case AC\n") + f.write(" # Background (population + unaffected) frequency summary\n") + f.write(" filterByRange.backgroundAF on\n") + f.write(" filterLabel.backgroundAF Background AF (population + unaffected)\n") + f.write(" filterLimits.backgroundAF 0:1\n") + f.write(" filterByRange.backgroundAC on\n") + f.write(" filterLabel.backgroundAC Background AC (population + unaffected)\n") + f.write(" # Affected/case membership flag\n") + f.write(" filterByRange.inAffected on\n") + f.write(" filterLabel.inAffected Seen in an affected/case arm (1=yes, 0=no)\n") + f.write(" filter.inAffected 0:1\n") + f.write(" filterLimits.inAffected 0:1\n") # Per-database AF and AC filters f.write(" # Per-database AF filters\n") for db_key, db_info in databases.items(): f.write(f" filterByRange.{db_key}AF on\n") f.write(f" filterLabel.{db_key}AF " f"{db_info['name']} AF\n") f.write(" # Per-database AC filters\n") for db_key, db_info in databases.items(): f.write(f" filterByRange.{db_key}AC on\n") f.write(f" filterLabel.{db_key}AC " f"{db_info['name']} AC\n") # Population-specific filters @@ -564,106 +739,121 @@ description="Convert annotated VCF to bigBed (two-phase parallel)") parser.add_argument("--annotated-vcf", default="merged.annotated.vcf.gz") parser.add_argument("--output-prefix", default="varFreqs") parser.add_argument("--chrom-sizes", default="/hive/data/genomes/hg38/chrom.sizes") parser.add_argument("--threads", type=int, default=8) parser.add_argument("--work-dir", default=".") parser.add_argument("--region", default=None, help="Region to process, e.g. chrM or chr22") parser.add_argument("--scripts-dir", default=SCRIPTS_DIR) parser.add_argument("--databases-file", default="databases.tsv", help="DB config TSV (name under scripts-dir or abs path)") parser.add_argument("--populations-file", default="populations.tsv", help="Population config TSV (name under scripts-dir or abs path)") parser.add_argument("--keep-temp", action="store_true") + parser.add_argument("--split-affected", action="store_true", + help="Emit two tracks: Affected and " + "Background (affected/case vs population+" + "unaffected). Without it, a single track.") args = parser.parse_args() databases = load_config(args.scripts_dir, args.databases_file, args.populations_file) print(f"Loaded {len(databases)} databases:", file=sys.stderr) for key, info in databases.items(): pops = ", ".join(p["key"] for p in info["pops"]) extra = f" [{pops}]" if pops else "" exists = "OK" if os.path.exists(info["vcf"]) else "MISSING" print(f" {key}: {info['name']}{extra} ({exists})", file=sys.stderr) if args.region: chromosomes = [args.region.split(":")[0]] else: chromosomes = ALL_CHROMOSOMES - as_path = os.path.join(args.work_dir, f"{args.output_prefix}.as") - bb_path = os.path.join(args.work_dir, f"{args.output_prefix}.bb") - key_path = os.path.join(args.work_dir, f"{args.output_prefix}.keys.txt") extract_dir = os.path.join(args.work_dir, "extracted") bed_dir = os.path.join(args.work_dir, "beds") os.makedirs(extract_dir, exist_ok=True) os.makedirs(bed_dir, exist_ok=True) - # Step 1: AutoSql and trackDb fragment. The autoSql table name matches the - # output prefix so each combined track (varFreqsAll/Disease/Array) gets a - # distinct table definition. - generate_autosql(as_path, databases, table_name=args.output_prefix) - ra_path = os.path.join(args.work_dir, f"{args.output_prefix}.trackDb.ra") - generate_trackdb_fragment(ra_path, databases, track_name=args.output_prefix) + # Output prefixes: one per track. Split mode yields an Affected and a + # Background track that share a single autoSql schema. + if args.split_affected: + prefixes = [f"{args.output_prefix}Affected", + f"{args.output_prefix}Background"] + else: + prefixes = [args.output_prefix] + + # Step 1: AutoSql per track (same columns; table name = output prefix). + for prefix in prefixes: + generate_autosql(os.path.join(args.work_dir, f"{prefix}.as"), + databases, table_name=prefix) # Step 2: Phase 1 — pre-extract pre_extract_all(databases, args.region, extract_dir, args.threads) # Step 3: Phase 2 — process chromosomes print(f"\n=== Phase 2: Processing {len(chromosomes)} chromosome(s) ===", file=sys.stderr) - task_args = [(chrom, args.annotated_vcf, databases, extract_dir, bed_dir) - for chrom in chromosomes] + task_args = [(chrom, args.annotated_vcf, databases, extract_dir, bed_dir, + args.split_affected) for chrom in chromosomes] with Pool(min(args.threads, len(chromosomes))) as pool: - chrom_beds = pool.map(process_chromosome, task_args) - chrom_beds = [b for b in chrom_beds if b is not None] + chrom_results = pool.map(process_chromosome, task_args) + chrom_results = [r for r in chrom_results if r is not None] - if not chrom_beds: + if not chrom_results: print("ERROR: No BED output produced", file=sys.stderr) sys.exit(1) - # Step 4: Concatenate and sort - # Stream sort output straight to disk instead of capture_output=True, which - # would buffer the full sorted chrom (~24 GB for chr1) in Python's RAM. - print(f"\n=== Concatenating {len(chrom_beds)} chromosome files ===", + # Aggregate length extremes across chromosomes for the length filter ranges, + # then write the trackDb fragment (after processing, so ranges are real). + len_stats = {"max_ref_len": 1, "max_alt_len": 1, + "min_var_len": 0, "max_var_len": 0} + for r in chrom_results: + st = r[2] + len_stats["max_ref_len"] = max(len_stats["max_ref_len"], st["max_ref_len"]) + len_stats["max_alt_len"] = max(len_stats["max_alt_len"], st["max_alt_len"]) + len_stats["min_var_len"] = min(len_stats["min_var_len"], st["min_var_len"]) + len_stats["max_var_len"] = max(len_stats["max_var_len"], st["max_var_len"]) + ra_path = os.path.join(args.work_dir, f"{args.output_prefix}.trackDb.ra") + generate_trackdb_fragment(ra_path, databases, track_name=args.output_prefix, + len_stats=len_stats) + + # Steps 4-5: build one bigBed per track from its per-chromosome BEDs. + # results column 0 = affected/single BED, column 1 = background BED (or None) + bed_sets = [[r[0] for r in chrom_results]] + if args.split_affected: + bed_sets.append([r[1] for r in chrom_results]) + + for prefix, chrom_beds in zip(prefixes, bed_sets): + as_path = os.path.join(args.work_dir, f"{prefix}.as") + bb_path = os.path.join(args.work_dir, f"{prefix}.bb") + bed_path = os.path.join(args.work_dir, f"{prefix}.bed") + print(f"\n=== {prefix}: concatenating {len(chrom_beds)} chrom files ===", file=sys.stderr) - bed_path = os.path.join(args.work_dir, f"{args.output_prefix}.bed") with open(bed_path, "wb") as out: for chrom_bed in chrom_beds: subprocess.run(["sort", "-k2,2n", chrom_bed], stdout=out, check=True) - - # Step 5: bedToBigBed - print(f"\n=== Running bedToBigBed ===", file=sys.stderr) + print(f"=== {prefix}: running bedToBigBed ===", file=sys.stderr) cmd = ["bedToBigBed", "-type=bed9+", f"-as={as_path}", "-tab", bed_path, args.chrom_sizes, bb_path] - print(f" {' '.join(cmd)}", file=sys.stderr) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f" Error: {result.stderr}", file=sys.stderr) sys.exit(1) - - # Step 6: Key mapping - with open(key_path, "w") as f: - for key, info in databases.items(): - f.write(f"{key}|{info['name']}\n") + if not args.keep_temp and os.path.exists(bed_path): + os.remove(bed_path) + bb_size = os.path.getsize(bb_path) / (1024**3) + print(f"Done! {bb_path} ({bb_size:.1f} GB)", file=sys.stderr) # Cleanup if not args.keep_temp: - for chrom_bed in chrom_beds: - if os.path.exists(chrom_bed): - os.remove(chrom_bed) - if os.path.exists(bed_path): - os.remove(bed_path) shutil.rmtree(extract_dir, ignore_errors=True) shutil.rmtree(bed_dir, ignore_errors=True) - bb_size = os.path.getsize(bb_path) / (1024**3) - print(f"\nDone! {bb_path} ({bb_size:.1f} GB)", file=sys.stderr) - if __name__ == "__main__": main()