9fbdfa3416ffde377072fafd2de44059155c3b44 max Thu Apr 30 06:57:35 2026 -0700 lrSv: add lrSvAll merged track combining all long-read SV subtracks Variants are merged on exact (chrom, start, end, svType, svLen, insLen). Per-database AC columns are stored as strings; "unknown" is used where the source dataset has only placeholder AC values (deCODE, SVatalog 101, 1KG ONT 100). Kim PD Brain is split into affected (PD+ILBD) and healthy (HC) AC columns. Gustafson contributes sampleCount instead of AC. Output: 2,694,871 unique SVs from 3,706,100 input rows across 15 subtracks (27% dedup). The merged track sits as the first subtrack of the lrSv supertrack with filters on sources, svType, svLen, insLen, maxAF/minAF, AC, and sourceCount. The trackDb stanza is generated by the build script directly into human/lrSvAll.ra and pulled in via 'include lrSvAll.ra' from lrSv.ra, so labels in databases.tsv stay the single source of truth. lrSv.html: add a "Disease cases" column to the dataset summary, strip parenthesized internal track names from the section headers, and shorten exact SV counts to ~Nk / ~N.NM in the prose. refs #36642 diff --git src/hg/makeDb/scripts/lrSv/lrSvMergeAll.py src/hg/makeDb/scripts/lrSv/lrSvMergeAll.py new file mode 100644 index 00000000000..491a29e8d5a --- /dev/null +++ src/hg/makeDb/scripts/lrSv/lrSvMergeAll.py @@ -0,0 +1,643 @@ +#!/usr/bin/env python3 +""" +Merge all lrSv subtrack bigBeds into a single combined bigBed (lrSvAll). + +Variants are merged on EXACT key (chrom, chromStart, chromEnd, svType, svLen, +insLen). For each merged variant we record which databases contained it and +the per-database AC (or "unknown" / sample count for placeholder datasets). +The Kim PD Brain dataset is split into affected (PD+ILBD) and healthy (HC) +allele counts. + +Usage: + python3 lrSvMergeAll.py + python3 lrSvMergeAll.py --region chr22 # quick test on one chromosome +""" + +import argparse +import os +import re +import subprocess +import sys +from collections import OrderedDict, defaultdict +from multiprocessing import Pool + +SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) +ALL_CHROMOSOMES = [f"chr{i}" for i in range(1, 23)] + ["chrX", "chrY", "chrM"] + +SVTYPE_COLOR = { + "DEL": "200,0,0", + "INS": "0,0,200", + "DUP": "0,150,0", + "INV": "200,100,0", + "CPX": "150,0,150", + "MIXED": "150,0,150", + "INSDEL": "100,100,150", + "BND": "100,100,100", + "TRA": "100,100,100", + "MEI": "0,100,200", + "CNV": "0,150,150", +} + +# --------------------------------------------------------------------------- +# Config + autoSql parsing +# --------------------------------------------------------------------------- + +def sanitize_id(s): + """Convert an arbitrary key to a valid autoSql / C identifier. + Non-alphanumeric chars become underscores; leading digit gets a 'd' prefix. + """ + out = re.sub(r'[^A-Za-z0-9_]', '_', s) + if out and out[0].isdigit(): + out = 'd' + out + return out + + +def load_config(path): + """Parse databases.tsv. Returns ordered list of dicts. + Each entry gets: + key - original key (used in `sources` field and as filter value) + idSafe - sanitized version usable as autoSql field-name prefix + """ + dbs = [] + with open(path) as f: + for line in f: + line = line.rstrip("\n") + if not line or line.startswith("#"): + continue + cols = line.split("\t") + [""] * 8 + key = cols[0] + dbs.append({ + "key": key, + "idSafe": sanitize_id(key), + "label": cols[1], + "bbPath": cols[2], + "valueField": cols[3], + "valueLabel": cols[4] or "AC", + "affField": cols[5], + "healField": cols[6], + "afField": cols[7], + }) + return dbs + + +def parse_autosql_fields(bb_path): + """Return ordered list of field names from a bigBed's embedded autoSql.""" + out = subprocess.run(["bigBedInfo", "-as", bb_path], + capture_output=True, text=True, check=True).stdout + fields = [] + in_block = False + for ln in out.splitlines(): + s = ln.strip() + if s == "(": + in_block = True + continue + if s == ")": + break + if not in_block or not s: + continue + # Lines look like: 'string chrom; "Chromosome"' + # Skip comments and empty. + if s.startswith("("): + continue + # Drop the comment after the semicolon + before_semi = s.split(";", 1)[0].strip() + # before_semi = 'string chrom' or 'char[1] strand' + toks = before_semi.split() + if len(toks) < 2: + continue + fname = toks[-1] + # strip array notation if any: e.g. "name[3]" -> "name" + if "[" in fname: + fname = fname.split("[", 1)[0] + fields.append(fname) + return fields + + +# --------------------------------------------------------------------------- +# Phase 1: extract per-db, per-chrom records +# --------------------------------------------------------------------------- + +def eval_expr(expr, row, names): + """Evaluate 'a' or 'a+b' (only +) over named row values; ints, "" if missing.""" + if not expr: + return "" + total = 0 + any_val = False + for term in expr.split("+"): + term = term.strip() + if term not in names: + return "" + v = row[names[term]] + if v in ("", ".", "-1"): + continue + try: + total += int(v) + any_val = True + except ValueError: + return "" + return str(total) if any_val else "" + + +def extract_one(args): + """Run bigBedToBed on one subtrack, write per-chrom TSVs. + Output TSV columns: start \t end \t svType \t svLen \t insLen \t value \t af + Where: + - value: for SPLIT dbs, "<aff>|<heal>"; otherwise the per-db value string + - af: max AF across configured AF fields, or "" if none + """ + db, region, extract_dir = args + bb = db["bbPath"] + if not os.path.exists(bb): + print(f" {db['key']}: missing {bb}", file=sys.stderr) + return db["key"], 0 + + fields = parse_autosql_fields(bb) + name_to_idx = {fn: i for i, fn in enumerate(fields)} + + required = ["chrom", "chromStart", "chromEnd", "svType", "svLen"] + for r in required: + if r not in name_to_idx: + print(f" {db['key']}: missing required field {r} in autoSql, " + f"skipping", file=sys.stderr) + return db["key"], 0 + has_inslen = "insLen" in name_to_idx + + af_fields = [f for f in db["afField"].split(",") if f] + + cmd = ["bigBedToBed"] + if region: + cmd.extend(["-chrom=" + region.split(":")[0]]) + cmd.extend([bb, "/dev/stdout"]) + + out_dir = os.path.join(extract_dir, db["key"]) + os.makedirs(out_dir, exist_ok=True) + chrom_files = {} + n = 0 + + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) + try: + for line in proc.stdout: + row = line.rstrip("\n").split("\t") + if len(row) < len(fields): + # Some bigBed rows may have trailing empty fields stripped + row += [""] * (len(fields) - len(row)) + + chrom = row[name_to_idx["chrom"]] + start = row[name_to_idx["chromStart"]] + end = row[name_to_idx["chromEnd"]] + svType = row[name_to_idx["svType"]] + svLen = row[name_to_idx["svLen"]] + insLen = row[name_to_idx["insLen"]] if has_inslen else "0" + if not svLen or svLen in (".", ""): + svLen = "0" + if not insLen or insLen in (".", ""): + insLen = "0" + + # Per-db value + vf = db["valueField"] + if vf == "UNKNOWN": + value = "unknown" + elif vf == "SPLIT": + aff = eval_expr(db["affField"], row, name_to_idx) + heal = eval_expr(db["healField"], row, name_to_idx) + value = f"{aff}|{heal}" + else: + idx = name_to_idx.get(vf) + if idx is None: + value = "" + else: + v = row[idx] + value = "" if v in ("", ".", "-1") else v + + # Max AF across configured AF fields + max_af = "" + for fn in af_fields: + idx = name_to_idx.get(fn) + if idx is None: + continue + v = row[idx] + if v in ("", ".", "-1"): + continue + try: + f = float(v) + except ValueError: + continue + if f < 0: + continue + if max_af == "" or f > float(max_af): + max_af = f"{f:.6g}" + + if chrom not in chrom_files: + chrom_files[chrom] = open(os.path.join(out_dir, f"{chrom}.tsv"), "w") + chrom_files[chrom].write( + f"{start}\t{end}\t{svType}\t{svLen}\t{insLen}\t{value}\t{max_af}\n" + ) + n += 1 + finally: + for fh in chrom_files.values(): + fh.close() + proc.wait() + + print(f" {db['key']}: {n:,} variants extracted", file=sys.stderr) + return db["key"], n + + +# --------------------------------------------------------------------------- +# Phase 2: per-chromosome merge +# --------------------------------------------------------------------------- + +def _add_int(a, b): + """Add two AC values represented as strings; "" if neither numeric.""" + ai = int(a) if a and a.lstrip("-").isdigit() else None + bi = int(b) if b and b.lstrip("-").isdigit() else None + if ai is None and bi is None: + return a or b + if ai is None: + return str(bi) + if bi is None: + return str(ai) + return str(ai + bi) + + +def _max_str_float(a, b): + """Max of two AF values represented as strings; "" if neither numeric.""" + if not a: + return b + if not b: + return a + try: + m = max(float(a), float(b)) + return f"{m:.6g}" + except ValueError: + return a + + +def _combine(db, prev, new): + """Merge two within-db hits at the same variant key. + For numeric AC we sum; for SPLIT we sum each component; for UNKNOWN + or sampleCount we keep the existing value (sample count is per-site). + AF takes the max.""" + pval, paf, pins = prev + nval, naf, nins = new + vf = db["valueField"] + af = _max_str_float(paf, naf) + if vf == "SPLIT": + pa, ph = pval.split("|", 1) + na, nh = nval.split("|", 1) + return (f"{_add_int(pa, na)}|{_add_int(ph, nh)}", af, pins) + if vf == "UNKNOWN": + return ("unknown", af, pins) + if vf == "sampleCount": + # Take max - same site shouldn't have varying sample counts but be safe + try: + v = max(int(pval) if pval.isdigit() else 0, + int(nval) if nval.isdigit() else 0) + return (str(v), af, pins) + except ValueError: + return (pval or nval, af, pins) + # Numeric AC: sum + return (_add_int(pval, nval), af, pins) + + +def merge_chrom(args): + chrom, dbs, extract_dir, out_dir = args + # variant_key (start, end, svType, svLen, insLen) -> {db_key: (value, af, insLen)} + variants = defaultdict(dict) + n_in = 0 + + db_by_key = {db["key"]: db for db in dbs} + + for db in dbs: + path = os.path.join(extract_dir, db["key"], f"{chrom}.tsv") + if not os.path.exists(path): + continue + with open(path) as f: + for line in f: + parts = line.rstrip("\n").split("\t") + if len(parts) < 7: + continue + start, end, svType, svLen, insLen, value, af = parts + # Use insLen=0 for non-INS to avoid spurious key splits + if svType not in ("INS",): + insLen_key = "0" + else: + insLen_key = insLen + try: + key = (int(start), int(end), svType, + int(svLen), int(insLen_key)) + except ValueError: + continue + n_in += 1 + if db["key"] in variants[key]: + variants[key][db["key"]] = _combine( + db, variants[key][db["key"]], (value, af, insLen)) + else: + variants[key][db["key"]] = (value, af, insLen) + + out_path = os.path.join(out_dir, f"{chrom}.bed") + n_out = 0 + n_split_aff = 0 # variants with at least one Kim PD affected AC + + with open(out_path, "w") as out: + for key in sorted(variants.keys()): + start, end, svType, svLen, _insLen_key, = key + per_db = variants[key] + + sources = sorted(per_db.keys()) + source_count = len(sources) + + # Compute totalAC: sum over numeric values from regular dbs + total_ac = 0 + for db in dbs: + if db["key"] not in per_db: + continue + vf = db["valueField"] + value, af, _ins = per_db[db["key"]] + if vf == "SPLIT": + aff, heal = value.split("|", 1) + for v in (aff, heal): + if v.isdigit(): + total_ac += int(v) + elif vf in ("UNKNOWN", "sampleCount"): + # Don't sum sampleCount or "unknown" placeholders into AC + pass + else: + if value and value.lstrip("-").isdigit(): + try: + iv = int(value) + if iv > 0: + total_ac += iv + except ValueError: + pass + + # Compute min/max AF across DBs that contributed an AF value + max_af = 0.0 + min_af = None + for db_key, (_v, af, _i) in per_db.items(): + if not af: + continue + try: + f_af = float(af) + except ValueError: + continue + if f_af > max_af: + max_af = f_af + if f_af > 0 and (min_af is None or f_af < min_af): + min_af = f_af + + score = min(1000, int(max_af * 1000)) if max_af > 0 else 0 + + # Use the original insLen from any contributing INS record + ins_len_out = "0" + for db in dbs: + if db["key"] in per_db: + _v, _af, ins = per_db[db["key"]] + ins_len_out = ins + break + + short = svType[:6] + name = f"{short}-{svLen}:{source_count}" + color = SVTYPE_COLOR.get(svType, "128,128,128") + + fields = [ + chrom, str(start), str(end), name, str(score), ".", + str(start), str(end), color, + svType, str(svLen), ins_len_out, svType, + f"{max_af:.6g}" if max_af > 0 else "0", + f"{min_af:.6g}" if min_af is not None else "0", + str(total_ac), + str(source_count), + ",".join(sources), + ] + + # Per-database value columns (SPLIT contributes 2 columns) + for db in dbs: + if db["valueField"] == "SPLIT": + if db["key"] in per_db: + value, _af, _i = per_db[db["key"]] + aff, heal = value.split("|", 1) + if aff: + n_split_aff += 1 + else: + aff, heal = "", "" + fields.extend([aff, heal]) + else: + value = per_db.get(db["key"], ("", "", ""))[0] + fields.append(value) + + out.write("\t".join(fields) + "\n") + n_out += 1 + + print(f" {chrom}: {n_in:,} input rows -> {n_out:,} unique variants " + f"({n_in - n_out:,} merged)", file=sys.stderr) + return out_path, n_out + + +# --------------------------------------------------------------------------- +# AutoSql + trackDb fragment generation +# --------------------------------------------------------------------------- + +def write_autosql(out_path, dbs): + with open(out_path, "w") as f: + f.write('table lrSvAll\n') + f.write('"Combined long-read structural variants from all lrSv subtracks"\n') + f.write('(\n') + # BED9 + f.write(' string chrom; "Chromosome"\n') + f.write(' uint chromStart; "Start position (0-based)"\n') + f.write(' uint chromEnd; "End position"\n') + f.write(' string name; "Variant ID (TYPE-svLen:sourceCount)"\n') + f.write(' uint score; "Score (maxAF * 1000)"\n') + f.write(' char[1] strand; "Strand"\n') + f.write(' uint thickStart; "Thick start"\n') + f.write(' uint thickEnd; "Thick end"\n') + f.write(' uint itemRgb; "Color by SV type"\n') + # SV info + f.write(' string svType; "SV Type|DEL/INS/DUP/INV/CPX/etc."\n') + f.write(' int svLen; "SV Length|Length on the reference in bp"\n') + f.write(' int insLen; "Insertion Length|Length of inserted sequence (0 for DEL/INV/DUP)"\n') + f.write(' string varType; "Variant Type|Alias of svType"\n') + # Aggregate fields + f.write(' float maxAF; "Max Allele Frequency|Maximum alleleFreq across contributing databases"\n') + f.write(' float minAF; "Min Allele Frequency|Minimum non-zero alleleFreq across contributing databases"\n') + f.write(' int AC; "AC|Sum of allele counts across contributing databases"\n') + f.write(' int sourceCount; "Source Count|Number of databases reporting this variant"\n') + f.write(' string sources; "Source Databases|Comma-separated keys of databases reporting this variant"\n') + # Per-database value fields. Field names use the sanitized id; the + # description still shows the original label, including any commas. + for db in dbs: + sid = db["idSafe"] + if db["valueField"] == "SPLIT": + f.write(f' string {sid}AC_affected;' + f' "{db["label"]} AC affected|Allele count in affected samples"\n') + f.write(f' string {sid}AC_healthy;' + f' "{db["label"]} AC healthy|Allele count in healthy samples"\n') + elif db["valueField"] == "sampleCount": + f.write(f' string {sid}Samples;' + f' "{db["label"]} Samples|Number of samples carrying this variant"\n') + else: + # AC or UNKNOWN + f.write(f' string {sid}AC;' + f' "{db["label"]} AC|Allele count (or \'unknown\' for site-only datasets)"\n') + f.write(')\n') + + +def write_trackdb_stanza(out_path, dbs): + """Auto-generated full trackDb stanza for the lrSvAll track. + + Written directly into the trackDb dir and pulled in via + `include lrSvAll.ra` from lrSv.ra. + """ + # Commas are the separator in filterValues, so strip them from labels. + # The original label (with commas) still appears on the detail page via + # the autoSql field descriptions. + src_parts = [f"{db['key']}|{db['label'].replace(',', '')}" for db in dbs] + with open(out_path, "w") as f: + f.write("# AUTO-GENERATED by ~/kent/src/hg/makeDb/scripts/lrSv/lrSvMergeAll.py\n") + f.write("# Do not edit by hand - re-run the merge script and re-commit.\n\n") + f.write(" track lrSvAll\n") + f.write(" parent lrSv\n") + f.write(" bigDataUrl /gbdb/$D/lrSv/lrSvAll.bb\n") + f.write(" shortLabel All LR SVs merged\n") + f.write(" longLabel All long-read SVs merged across the lrSv subtracks " + "(exact-position match), with per-database AC\n") + f.write(" type bigBed 9 +\n") + f.write(" itemRgb on\n") + f.write(" visibility pack\n") + f.write(" mouseOver <b>$name</b> ($svType) svLen=$svLen insLen=$insLen " + "sources=$sources AF=$minAF-$maxAF AC=$AC\n") + f.write(" searchIndex name\n") + # Source filter + f.write(" filterValues.sources " + ",".join(src_parts) + "\n") + f.write(" filterType.sources multipleListOr\n") + f.write(" filterLabel.sources Source Database\n") + # SV type + f.write(" filterValues.svType DEL,INS,DUP,INV,CPX,MIXED,INSDEL," + "CNV,BND,TRA,MEI\n") + f.write(" filterType.svType multipleListOr\n") + f.write(" filterLabel.svType SV Type\n") + # Range filters + f.write(" filter.svLen 0:30000000\n") + f.write(" filterByRange.svLen on\n") + f.write(" filterLabel.svLen SV Length (bp)\n") + f.write(" filter.insLen 0:600000\n") + f.write(" filterByRange.insLen on\n") + f.write(" filterLabel.insLen Insertion Length (bp)\n") + f.write(" filter.maxAF 0:1\n") + f.write(" filterByRange.maxAF on\n") + f.write(" filterLimits.maxAF 0:1\n") + f.write(" filterLabel.maxAF Max Allele Frequency (across DBs)\n") + f.write(" filter.minAF 0:1\n") + f.write(" filterByRange.minAF on\n") + f.write(" filterLimits.minAF 0:1\n") + f.write(" filterLabel.minAF Min Allele Frequency (across DBs)\n") + f.write(" filter.AC 0:30000\n") + f.write(" filterByRange.AC on\n") + f.write(" filterLabel.AC Total AC (across DBs)\n") + f.write(f" filter.sourceCount 1:{len(dbs)}\n") + f.write(" filterByRange.sourceCount on\n") + f.write(" filterLabel.sourceCount Number of Source Databases\n") + f.write(" skipEmptyFields on\n") + f.write(" priority 0\n") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--config", default=os.path.join(SCRIPTS_DIR, "databases.tsv")) + parser.add_argument("--work-dir", default="/hive/data/genomes/hg38/bed/lrSv/all") + parser.add_argument("--chrom-sizes", default="/hive/data/genomes/hg38/chrom.sizes") + parser.add_argument("--output-prefix", default="lrSvAll") + parser.add_argument("--region", default=None) + parser.add_argument("--threads", type=int, default=8) + parser.add_argument("--keep-temp", action="store_true") + parser.add_argument("--trackdb-ra", + default="~/kent/src/hg/makeDb/trackDb/human/lrSvAll.ra", + help="Path to write the trackDb stanza file. Default " + "writes directly into the kent source tree so it is " + "picked up by `include lrSvAll.ra`.") + args = parser.parse_args() + + os.makedirs(args.work_dir, exist_ok=True) + + dbs = load_config(args.config) + print(f"Loaded {len(dbs)} databases:", file=sys.stderr) + for db in dbs: + ok = "OK" if os.path.exists(db["bbPath"]) else "MISSING" + print(f" {db['key']}: {db['label']} [{ok}] {db['bbPath']}", + file=sys.stderr) + + # Output paths + as_path = os.path.join(args.work_dir, f"{args.output_prefix}.as") + bb_path = os.path.join(args.work_dir, f"{args.output_prefix}.bb") + bed_path = os.path.join(args.work_dir, f"{args.output_prefix}.bed") + extract_dir = os.path.join(args.work_dir, "extracted") + bed_dir = os.path.join(args.work_dir, "beds") + os.makedirs(extract_dir, exist_ok=True) + os.makedirs(bed_dir, exist_ok=True) + + # The trackDb stanza is written directly into the kent source tree so it + # can be `include`d from human/lrSv.ra without manual copy-paste. + ra_path = os.path.expanduser(args.trackdb_ra) if args.trackdb_ra else \ + os.path.join(args.work_dir, f"{args.output_prefix}.ra") + + write_autosql(as_path, dbs) + write_trackdb_stanza(ra_path, dbs) + print(f"Wrote {as_path}", file=sys.stderr) + print(f"Wrote {ra_path}", file=sys.stderr) + + # Phase 1: extract + print(f"\n=== Phase 1: Extracting ({args.threads} parallel) ===", + file=sys.stderr) + tasks = [(db, args.region, extract_dir) for db in dbs] + with Pool(min(args.threads, len(tasks))) as pool: + ext_results = pool.map(extract_one, tasks) + total_in = sum(n for _, n in ext_results) + print(f"Phase 1 done: {total_in:,} input rows", file=sys.stderr) + + # Phase 2: merge per chromosome + chroms = [args.region.split(":")[0]] if args.region else ALL_CHROMOSOMES + print(f"\n=== Phase 2: Merging {len(chroms)} chromosomes " + f"({args.threads} parallel) ===", file=sys.stderr) + tasks = [(c, dbs, extract_dir, bed_dir) for c in chroms] + with Pool(min(args.threads, len(chroms))) as pool: + results = pool.map(merge_chrom, tasks) + + total_out = sum(n for _, n in results) + print(f"Phase 2 done: {total_out:,} merged rows", file=sys.stderr) + + # Concatenate + sort + print(f"\n=== Concatenating + sorting ===", file=sys.stderr) + chrom_beds = [p for p, _ in results if os.path.exists(p)] + with open(bed_path, "w") as out: + for cb in chrom_beds: + with open(cb) as f: + # already sorted by start within chrom from sorted(variants.keys()) + out.write(f.read()) + + # bedToBigBed + print(f"\n=== Running bedToBigBed ===", file=sys.stderr) + cmd = ["bedToBigBed", "-type=bed9+", f"-as={as_path}", "-tab", + bed_path, args.chrom_sizes, bb_path] + print(" " + " ".join(cmd), file=sys.stderr) + subprocess.run(cmd, check=True) + + bb_size_mb = os.path.getsize(bb_path) / (1024 ** 2) + print(f"\nDone. {bb_path} ({bb_size_mb:.1f} MB)", file=sys.stderr) + print(f"Input variants: {total_in:,}", file=sys.stderr) + print(f"Output variants: {total_out:,}", file=sys.stderr) + if total_in > 0: + print(f"Dedup rate: {(1 - total_out/total_in)*100:.1f}%", + file=sys.stderr) + + if not args.keep_temp: + # Keep extract_dir + bed_dir for now in case we want to re-merge with + # different rules. Caller can rm -rf if needed. + pass + + +if __name__ == "__main__": + main()