23790efe33021ccd1691bf5f4262c1396a1f6e9d
braney
  Fri May 1 12:49:13 2026 -0700
quickLiftBench: phase_asserts mechanism + parallel-fetch regression case

phase_asserts is a per-case schema in cases.yaml that declares regex
matches against the per-iteration timing spans, with optional
required/min_median_ms/max_median_ms bounds. When a case declares
phase_asserts, the runner captures phase data automatically and runs
the asserts after iterations complete; each result is printed to stderr
and the runner exits non-zero if any assert fails.

The new regress_quickLift_parallel case uses this to assert that the
"Waiting for parallel..." span fires for the lifted multi-track session
Brianraney/benchQuickPara with a median between 500 and 15000 ms --
this distinguishes the working sandbox build from a pre-fix hgwdev,
where only non-quickLift tracks parallelize and the span takes ~50 ms.

refs #37488, #37470

diff --git src/utils/qa/quickLiftBench/quickLiftBench.py src/utils/qa/quickLiftBench/quickLiftBench.py
index 24542039b3e..1001d39bdfc 100755
--- src/utils/qa/quickLiftBench/quickLiftBench.py
+++ src/utils/qa/quickLiftBench/quickLiftBench.py
@@ -107,33 +107,61 @@
              "trackDbLoad, parallel data fetch, etc.)",
     )
     return p.parse_args()
 
 
 def load_config(path):
     with open(path) as f:
         cfg = yaml.safe_load(f)
     if "cases" not in cfg or not cfg["cases"]:
         sys.exit(f"{path}: no `cases` defined")
     cfg.setdefault("defaults", {})
     cfg["defaults"].setdefault("iterations", 5)
     cfg["defaults"].setdefault("warmup", 1)
     cfg["defaults"].setdefault("timeout", 60)
     cfg["defaults"].setdefault("servers", {})
+    for case in cfg["cases"]:
+        validate_phase_asserts(case, path)
     return cfg
 
 
+def validate_phase_asserts(case, path):
+    asserts = case.get("phase_asserts")
+    if asserts is None:
+        return
+    if not isinstance(asserts, list):
+        sys.exit(f"{path}: case {case.get('id')!r}: phase_asserts must be a list")
+    variant_names = set((case.get("variants") or {}).keys())
+    for i, a in enumerate(asserts):
+        ctx = f"{path}: case {case.get('id')!r}: phase_asserts[{i}]"
+        if not isinstance(a, dict):
+            sys.exit(f"{ctx} must be a mapping")
+        if "variant" not in a or "phase" not in a:
+            sys.exit(f"{ctx} missing required key (variant, phase)")
+        if a["variant"] not in variant_names:
+            sys.exit(f"{ctx}: variant {a['variant']!r} not declared in variants")
+        try:
+            re.compile(a["phase"])
+        except re.error as e:
+            sys.exit(f"{ctx}: phase regex invalid: {e}")
+        for k in ("max_median_ms", "min_median_ms"):
+            if k in a and not isinstance(a[k], int):
+                sys.exit(f"{ctx}: {k} must be an integer")
+        if "required" in a and not isinstance(a["required"], bool):
+            sys.exit(f"{ctx}: required must be true/false")
+
+
 def parse_session(s):
     """Split 'user/sessionName' (or '/s/user/name'). Returns (user, name)."""
     if not isinstance(s, str):
         raise ValueError(f"variant must be a 'user/sessionName' string, got: {s!r}")
     s = s.strip()
     if s.startswith("/s/"):
         s = s[3:]
     if "/" not in s:
         raise ValueError(f"variant must contain '/': {s!r}")
     user, name = s.split("/", 1)
     if not user or not name:
         raise ValueError(f"empty user or session name in: {s!r}")
     return user, name
 
 
@@ -360,60 +388,64 @@
                         "user": user,
                         "session": session_name,
                         "iteration": it,
                         "http_ms": http_ms,
                         "load_ms_sum": load_sum if load_sum is not None else "",
                         "draw_ms_sum": draw_sum if draw_sum is not None else "",
                         "n_tracks": n_tracks if n_tracks is not None else "",
                         "total_ms": total_ms if total_ms is not None else "",
                         "status_code": code if code is not None else "",
                         "error": err or "",
                     }
                     writer.writerow(row)
                     rf.flush()
                     all_rows.append(row)
 
-                    if args.phases:
+                    if args.phases or case.get("phase_asserts"):
                         for label, ms in phases.items():
                             all_phase_rows.append({
                                 "case": cid,
                                 "variant": vname,
                                 "server": server_key,
                                 "user": user,
                                 "session": session_name,
                                 "iteration": it,
                                 "phase": label,
                                 "ms": ms,
                             })
                     if args.verbose:
                         print(
                             f"    {vname:8s} it={it} "
                             f"http={fmt_ms(http_ms)} "
                             f"load_sum={fmt_ms(load_sum)} draw_sum={fmt_ms(draw_sum)} "
                             f"tracks={fmt_ms(n_tracks)} "
                             f"total={fmt_ms(total_ms)} {err or ''}",
                             file=sys.stderr,
                         )
 
     write_summary(cases, all_rows, summary_path)
     print(f"\nresults: {results_path}", file=sys.stderr)
     print(f"summary: {summary_path}", file=sys.stderr)
 
     if phases_path:
         write_phases(all_phase_rows, phases_fields, phases_path)
         print(f"phases:  {phases_path}", file=sys.stderr)
 
+    failed = evaluate_phase_asserts(cases, all_rows, all_phase_rows)
+    if failed:
+        sys.exit(1)
+
 
 def _to_int(v):
     if v == "" or v is None:
         return None
     return int(v)
 
 
 def write_summary(cases, rows, path):
     """
     Per (case, variant): compute median + p90 of (http_ms, load_ms_sum,
     draw_ms_sum, total_ms). Then for each compare-pair, emit median ratios.
     """
     by_case_var = {}
     for r in rows:
         by_case_var.setdefault(r["case"], {}).setdefault(r["variant"], []).append(r)
@@ -533,17 +565,105 @@
         f.write("\n# Per-(case, variant, phase) median ms across iterations\n")
         summary_fields = ["case", "variant", "phase", "n", "median_ms", "p90_ms"]
         sw = csv.DictWriter(f, fieldnames=summary_fields, delimiter="\t")
         sw.writeheader()
         for (cid, vname, phase), values in sorted(groups.items()):
             sw.writerow({
                 "case": cid,
                 "variant": vname,
                 "phase": phase,
                 "n": len(values),
                 "median_ms": median_or_none(values),
                 "p90_ms": p90(values),
             })
 
 
+def evaluate_phase_asserts(cases, all_rows, all_phase_rows):
+    """Walk each case's phase_asserts (if any), report PASS/FAIL per assert.
+    Returns True iff at least one assert failed. Prints a summary table to
+    stderr."""
+    asserts_present = any(case.get("phase_asserts") for case in cases)
+    if not asserts_present:
+        return False
+
+    # iterations attempted per (case, variant) regardless of error
+    iter_counts = {}
+    for r in all_rows:
+        key = (r["case"], r["variant"])
+        iter_counts[key] = iter_counts.get(key, 0) + 1
+
+    # phase rows grouped by (case, variant, iteration) for matching
+    by_iter = {}
+    for r in all_phase_rows:
+        key = (r["case"], r["variant"], r["iteration"])
+        by_iter.setdefault(key, []).append((r["phase"], r["ms"]))
+
+    print("\nphase_asserts:", file=sys.stderr)
+    any_failed = False
+    for case in cases:
+        cid = case["id"]
+        for a in (case.get("phase_asserts") or []):
+            vname = a["variant"]
+            phase_re = re.compile(a["phase"])
+            required = a.get("required", True)
+            max_med = a.get("max_median_ms")
+            min_med = a.get("min_median_ms")
+            n_iters = iter_counts.get((cid, vname), 0)
+
+            iters_with_match = 0
+            per_iter_sums = []
+            for it in range(1, n_iters + 1):
+                matches = [
+                    ms for label, ms in by_iter.get((cid, vname, it), [])
+                    if phase_re.search(label)
+                ]
+                if matches:
+                    iters_with_match += 1
+                    per_iter_sums.append(sum(matches))
+
+            status = "PASS"
+            detail = ""
+
+            if n_iters == 0:
+                status = "FAIL"
+                detail = "no iterations ran"
+            elif required and iters_with_match < n_iters:
+                status = "FAIL"
+                detail = (
+                    f"phase missing in {n_iters - iters_with_match}/"
+                    f"{n_iters} iterations"
+                )
+            elif not per_iter_sums:
+                status = "PASS"
+                detail = "no matches (not required)"
+            else:
+                med = statistics.median(per_iter_sums)
+                if max_med is not None and med > max_med:
+                    status = "FAIL"
+                    detail = f"median {med}ms > max {max_med}ms"
+                elif min_med is not None and med < min_med:
+                    status = "FAIL"
+                    detail = f"median {med}ms < min {min_med}ms"
+                else:
+                    bounds = []
+                    if max_med is not None:
+                        bounds.append(f"max={max_med}")
+                    if min_med is not None:
+                        bounds.append(f"min={min_med}")
+                    bounds_s = (" " + ",".join(bounds)) if bounds else ""
+                    detail = (
+                        f"median {med}ms across {len(per_iter_sums)}/"
+                        f"{n_iters}{bounds_s}"
+                    )
+
+            if status == "FAIL":
+                any_failed = True
+            print(
+                f"  [{status}] {cid}/{vname}  /{a['phase']}/  {detail}",
+                file=sys.stderr,
+            )
+
+    return any_failed
+
+
 if __name__ == "__main__":
     main()