23790efe33021ccd1691bf5f4262c1396a1f6e9d braney Fri May 1 12:49:13 2026 -0700 quickLiftBench: phase_asserts mechanism + parallel-fetch regression case phase_asserts is a per-case schema in cases.yaml that declares regex matches against the per-iteration timing spans, with optional required/min_median_ms/max_median_ms bounds. When a case declares phase_asserts, the runner captures phase data automatically and runs the asserts after iterations complete; any failure prints to stderr and exits non-zero. The new regress_quickLift_parallel case uses this to assert that the "Waiting for parallel..." span fires for the lifted multi-track session Brianraney/benchQuickPara with a median between 500 and 15000 ms -- discriminating the working sandbox build from a pre-fix hgwdev where only non-quickLift tracks parallelize at ~50 ms. refs #37488, #37470 diff --git src/utils/qa/quickLiftBench/quickLiftBench.py src/utils/qa/quickLiftBench/quickLiftBench.py index 24542039b3e..1001d39bdfc 100755 --- src/utils/qa/quickLiftBench/quickLiftBench.py +++ src/utils/qa/quickLiftBench/quickLiftBench.py @@ -107,33 +107,61 @@ "trackDbLoad, parallel data fetch, etc.)", ) return p.parse_args() def load_config(path): with open(path) as f: cfg = yaml.safe_load(f) if "cases" not in cfg or not cfg["cases"]: sys.exit(f"{path}: no `cases` defined") cfg.setdefault("defaults", {}) cfg["defaults"].setdefault("iterations", 5) cfg["defaults"].setdefault("warmup", 1) cfg["defaults"].setdefault("timeout", 60) cfg["defaults"].setdefault("servers", {}) + for case in cfg["cases"]: + validate_phase_asserts(case, path) return cfg +def validate_phase_asserts(case, path): + asserts = case.get("phase_asserts") + if asserts is None: + return + if not isinstance(asserts, list): + sys.exit(f"{path}: case {case.get('id')!r}: phase_asserts must be a list") + variant_names = set((case.get("variants") or {}).keys()) + for i, a in enumerate(asserts): + ctx = f"{path}: case {case.get('id')!r}: phase_asserts[{i}]" + if not isinstance(a, dict): + sys.exit(f"{ctx} must be a mapping") + if "variant" not in a or "phase" not in a: + sys.exit(f"{ctx} missing required key (variant, phase)") + if a["variant"] not in variant_names: + sys.exit(f"{ctx}: variant {a['variant']!r} not declared in variants") + try: + re.compile(a["phase"]) + except re.error as e: + sys.exit(f"{ctx}: phase regex invalid: {e}") + for k in ("max_median_ms", "min_median_ms"): + if k in a and not isinstance(a[k], int): + sys.exit(f"{ctx}: {k} must be an integer") + if "required" in a and not isinstance(a["required"], bool): + sys.exit(f"{ctx}: required must be true/false") + + def parse_session(s): """Split 'user/sessionName' (or '/s/user/name'). Returns (user, name).""" if not isinstance(s, str): raise ValueError(f"variant must be a 'user/sessionName' string, got: {s!r}") s = s.strip() if s.startswith("/s/"): s = s[3:] if "/" not in s: raise ValueError(f"variant must contain '/': {s!r}") user, name = s.split("/", 1) if not user or not name: raise ValueError(f"empty user or session name in: {s!r}") return user, name @@ -360,60 +388,64 @@ "user": user, "session": session_name, "iteration": it, "http_ms": http_ms, "load_ms_sum": load_sum if load_sum is not None else "", "draw_ms_sum": draw_sum if draw_sum is not None else "", "n_tracks": n_tracks if n_tracks is not None else "", "total_ms": total_ms if total_ms is not None else "", "status_code": code if code is not None else "", "error": err or "", } writer.writerow(row) rf.flush() all_rows.append(row) - if args.phases: + if args.phases or case.get("phase_asserts"): for label, ms in phases.items(): all_phase_rows.append({ "case": cid, "variant": vname, "server": server_key, "user": user, "session": session_name, "iteration": it, "phase": label, "ms": ms, }) if args.verbose: print( f" {vname:8s} it={it} " f"http={fmt_ms(http_ms)} " f"load_sum={fmt_ms(load_sum)} draw_sum={fmt_ms(draw_sum)} " f"tracks={fmt_ms(n_tracks)} " f"total={fmt_ms(total_ms)} {err or ''}", file=sys.stderr, ) write_summary(cases, all_rows, summary_path) print(f"\nresults: {results_path}", file=sys.stderr) print(f"summary: {summary_path}", file=sys.stderr) if phases_path: write_phases(all_phase_rows, phases_fields, phases_path) print(f"phases: {phases_path}", file=sys.stderr) + failed = evaluate_phase_asserts(cases, all_rows, all_phase_rows) + if failed: + sys.exit(1) + def _to_int(v): if v == "" or v is None: return None return int(v) def write_summary(cases, rows, path): """ Per (case, variant): compute median + p90 of (http_ms, load_ms_sum, draw_ms_sum, total_ms). Then for each compare-pair, emit median ratios. """ by_case_var = {} for r in rows: by_case_var.setdefault(r["case"], {}).setdefault(r["variant"], []).append(r) @@ -533,17 +565,105 @@ f.write("\n# Per-(case, variant, phase) median ms across iterations\n") summary_fields = ["case", "variant", "phase", "n", "median_ms", "p90_ms"] sw = csv.DictWriter(f, fieldnames=summary_fields, delimiter="\t") sw.writeheader() for (cid, vname, phase), values in sorted(groups.items()): sw.writerow({ "case": cid, "variant": vname, "phase": phase, "n": len(values), "median_ms": median_or_none(values), "p90_ms": p90(values), }) +def evaluate_phase_asserts(cases, all_rows, all_phase_rows): + """Walk each case's phase_asserts (if any), report PASS/FAIL per assert. + Returns True iff at least one assert failed. Prints a summary table to + stderr.""" + asserts_present = any(case.get("phase_asserts") for case in cases) + if not asserts_present: + return False + + # iterations attempted per (case, variant) regardless of error + iter_counts = {} + for r in all_rows: + key = (r["case"], r["variant"]) + iter_counts[key] = iter_counts.get(key, 0) + 1 + + # phase rows grouped by (case, variant, iteration) for matching + by_iter = {} + for r in all_phase_rows: + key = (r["case"], r["variant"], r["iteration"]) + by_iter.setdefault(key, []).append((r["phase"], r["ms"])) + + print("\nphase_asserts:", file=sys.stderr) + any_failed = False + for case in cases: + cid = case["id"] + for a in (case.get("phase_asserts") or []): + vname = a["variant"] + phase_re = re.compile(a["phase"]) + required = a.get("required", True) + max_med = a.get("max_median_ms") + min_med = a.get("min_median_ms") + n_iters = iter_counts.get((cid, vname), 0) + + iters_with_match = 0 + per_iter_sums = [] + for it in range(1, n_iters + 1): + matches = [ + ms for label, ms in by_iter.get((cid, vname, it), []) + if phase_re.search(label) + ] + if matches: + iters_with_match += 1 + per_iter_sums.append(sum(matches)) + + status = "PASS" + detail = "" + + if n_iters == 0: + status = "FAIL" + detail = "no iterations ran" + elif required and iters_with_match < n_iters: + status = "FAIL" + detail = ( + f"phase missing in {n_iters - iters_with_match}/" + f"{n_iters} iterations" + ) + elif not per_iter_sums: + status = "PASS" + detail = "no matches (not required)" + else: + med = statistics.median(per_iter_sums) + if max_med is not None and med > max_med: + status = "FAIL" + detail = f"median {med}ms > max {max_med}ms" + elif min_med is not None and med < min_med: + status = "FAIL" + detail = f"median {med}ms < min {min_med}ms" + else: + bounds = [] + if max_med is not None: + bounds.append(f"max={max_med}") + if min_med is not None: + bounds.append(f"min={min_med}") + bounds_s = (" " + ",".join(bounds)) if bounds else "" + detail = ( + f"median {med}ms across {len(per_iter_sums)}/" + f"{n_iters}{bounds_s}" + ) + + if status == "FAIL": + any_failed = True + print( + f" [{status}] {cid}/{vname} /{a['phase']}/ {detail}", + file=sys.stderr, + ) + + return any_failed + + if __name__ == "__main__": main()