6ddaedca96e314bcd1607e69bcfa8fb228d22a3c
hiram
  Thu May 21 21:53:56 2026 -0700
common functions moved to ottoLib.py and shared with ottoBuildGenArkHub.py; fix up the legacy clade situation, refs #31811

diff --git src/hg/utils/otto/userRequests/ottoRequestPush.py src/hg/utils/otto/userRequests/ottoRequestPush.py
index ecc009991be..a5ee51f633e 100755
--- src/hg/utils/otto/userRequests/ottoRequestPush.py
+++ src/hg/utils/otto/userRequests/ottoRequestPush.py
@@ -1,228 +1,77 @@
 #!/usr/bin/env python3
 """
-ottoRequestPush.py - group fromDb/toDb identifiers from pending push
-requests (status=5) by clade.
+ottoRequestPush.py - lib-using rewrite; shared helpers live in ottoLib.py.
 
-Output: dict[clade] -> sorted list of assembly identifiers, where each
-identifier is "<gcAccession>_<asmName>" for GenArk accessions, or the
-plain UCSC db name for native dbs.
+Groups fromDb/toDb identifiers from pending push requests (status=5)
+by clade, drives the per-clade genArkMakeCommands sequence, rsyncs the
+UCSC-native .over.chain.gz to both hgdownload hosts, and advances each
+request's status (6 on full success, 7 on rsync failure, stays at 5 on
+clade-side failure).
 
-cron tab entry in hiram crontab:
-4,26,46 * * * * ~/kent/src/hg/utils/otto/userRequests/ottoRequestPush.py
+Differences from the earlier self-contained version: all shared helpers
+live in ottoLib.py.  Push-specific helpers (pendingRequests, mark*,
+pushUcscChain) and main() stay here.
+
+This file is ottoRequestPush.py itself, the script the live cron invokes;
+it is no longer a parallel copy kept for review/cutover.
 """
 
-import fcntl
 import os
-import re
 import subprocess
 import sys
-from collections import defaultdict
 
-# Set umask 002 (group-writable) for this process and all subprocesses
-os.umask(0o002)
+import ottoLib
 
 scriptDir = os.path.dirname(os.path.abspath(__file__))
-cladeTsv = os.path.join(scriptDir, "dbDb.name.clade.tsv")
+# singleton lock file, passed to ottoLib.acquireSingletonLock() in main();
+# presumably keeps runs from overlapping in the cladeAsmHub directories
 lockPath = os.path.join(scriptDir, "ottoRequestPush.lock")
-gcPattern = re.compile(r"^GC[AF]_")
 
 # UCSC native .over.chain.gz files get rsync'd to both hgdownload hosts.
 pushUser = "qateam"
 pushHosts = ["hgdownload1.soe.ucsc.edu", "hgdownload3.gi.ucsc.edu"]
 
-############################################################################
-def acquireSingletonLock():
-    """Ensure only one instance of this script runs at a time.  Holds an
-    exclusive flock on lockPath for the lifetime of the process; the
-    kernel releases it on exit (including crash / kill -9), so no stale
-    lock cleanup is needed.  Returns the open file handle, which the
-    caller must keep alive."""
-    # "a+" opens read+write without truncating (and creates if missing),
-    # so a second instance that fails to lock doesn't wipe the running
-    # instance's PID from the file before exiting.
-    fh = open(lockPath, "a+")
-    try:
-        fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
-    except BlockingIOError:
-        sys.exit(0)
-    # we own the lock truncate and write our PID for information
-    fh.seek(0)
-    fh.truncate()
-    fh.write("%d\n" % os.getpid())
-    fh.flush()
-    return fh
-    ### FYI: can also see the locking process via: lsof ottoRequestPush.lock
-
-############################################################################
-def hgsql(query, db="hgcentraltest"):
-    """Run hgsql -N -B and return rows as list of tuples (tab-split)."""
-    out = subprocess.run(
-        ["/cluster/bin/x86_64/hgsql", "-N", "-B", "-e", query, db],
-        check=True, capture_output=True, text=True,
-    ).stdout
-    return [tuple(line.split("\t")) for line in out.splitlines() if line]
-
-
-############################################################################
-def loadDbDbClades():
-    """Read dbDb.name.clade.tsv -> {dbName: clade}."""
-    result = {}
-    with open(cladeTsv) as fh:
-        for line in fh:
-            if line.startswith("#") or not line.strip():
-                continue
-            name, clade = line.rstrip("\n").split("\t")[:2]
-            result[name] = clade
-    return result
-
 
-############################################################################
 def pendingRequests():
     """Status=5 liftOver requests as [(id, fromDb, toDb), ...]."""
-    rows = hgsql(
+    rows = ottoLib.hgsql(
         "SELECT id, fromDb, toDb FROM ottoRequest "
         "WHERE status = 5 AND requestType = 'liftOver';"
     )
     return [(int(r[0]), r[1], r[2]) for r in rows]
 
 
-############################################################################
 def markComplete(reqIds):
     """Set status=6 on the given ottoRequest ids."""
     if not reqIds:
         return
     idList = ",".join(str(i) for i in sorted(reqIds))
-    hgsql("UPDATE ottoRequest SET status = 6 WHERE id IN (%s);" % idList)
-#   print("# marked status=6: %s" % idList, file=sys.stderr)
+    ottoLib.hgsql(
+        "UPDATE ottoRequest SET status = 6 WHERE id IN (%s);" % idList)
 
 
-############################################################################
 def markFailed(reqIds):
     """Set status=7 (problems) on the given ottoRequest ids."""
     if not reqIds:
         return
     idList = ",".join(str(i) for i in sorted(reqIds))
-    hgsql("UPDATE ottoRequest SET status = 7 WHERE id IN (%s);" % idList)
+    ottoLib.hgsql(
+        "UPDATE ottoRequest SET status = 7 WHERE id IN (%s);" % idList)
     print("# ottoRequestPush.py marked status=7 for failing list: %s" % idList, file=sys.stderr)
 
-############################################################################
-def lookupGenark(accessions):
-    """Bulk-lookup GenArk accessions -> {acc: (asmName, clade)}."""
-    if not accessions:
-        return {}
-    quoted = ",".join("'%s'" % a for a in sorted(accessions))
-    rows = hgsql(
-        "SELECT gcAccession, asmName, clade FROM genark "
-        "WHERE gcAccession IN (%s);" % quoted
-    )
-    return {acc: (asmName, clade) for acc, asmName, clade in rows}
-
-
-############################################################################
-def groupByClade(dbs, dbDbClades, genarkInfo):
-    """Build {clade: [assemblyId, ...]}."""
-    grouped = defaultdict(set)
-    for db in dbs:
-        if gcPattern.match(db):
-            info = genarkInfo.get(db)
-            if info is None:
-                print("WARNING: %s not in genark table" % db, file=sys.stderr)
-                continue
-            asmName, clade = info
-            grouped[clade].add("%s_%s" % (db, asmName))
-        else:
-            clade = dbDbClades.get(db)
-            if clade is None:
-                print("WARNING: %s not in %s" % (db, cladeTsv), file=sys.stderr)
-                continue
-            grouped[clade].add(db)
-    return {clade: sorted(ids) for clade, ids in grouped.items()}
-
 
-############################################################################
-def writeCladeTsv(clade, asmIds):
-    """Filter <clade>.orderList.tsv down to lines matching any asmId and
-    write the result to tsv.otto in the same directory.  Mirrors:
-        cd ~/kent/src/hg/makeDb/doc/<clade>AsmHub
-        egrep '<id1>|<id2>|...' <clade>.orderList.tsv > tsv.otto
-    Only GenArk identifiers are used UCSC native dbs are not in the
-    AsmHub orderList files.
-
-    Returns cladeDir on success (so the caller can chain the make
-    sequence), or None if there is nothing to do for this clade.
-    """
-    genarkIds = [a for a in asmIds if gcPattern.match(a)]
-    if not genarkIds:
-        return None
-    cladeDir = os.path.expanduser(
-        "~/kent/src/hg/makeDb/doc/%sAsmHub" % clade)
-    orderList = os.path.join(cladeDir, "%s.orderList.tsv" % clade)
-    outPath = os.path.join(cladeDir, "tsv.otto")
-    if not os.path.isfile(orderList):
-        print("WARNING: missing %s" % orderList, file=sys.stderr)
-        return None
-    # orderList.tsv files occasionally contain Latin-1 bytes (e.g. xxx in
-    # Scandinavian fish names) that aren't valid UTF-8.  surrogateescape
-    # round-trips those bytes through read+write byte-for-byte instead of
-    # raising UnicodeDecodeError.
-    matched = []
-    with open(orderList, encoding="utf-8", errors="surrogateescape") as fh:
-        for line in fh:
-            if any(asmId in line for asmId in genarkIds):
-                matched.append(line)
-    if not matched:
-        print("WARNING: no matches in %s" % orderList, file=sys.stderr)
-        return None
-    with open(outPath, "w", encoding="utf-8", errors="surrogateescape") as fh:
-        fh.writelines(matched)
-#   print("# wrote %d line(s) to %s" % (len(matched), outPath),
-#         file=sys.stderr)
-    return cladeDir
-
-
-# Sequence of make commands run in the clade AsmHub directory after
-# tsv.otto is written.  Stops on the first failure.
-makeChainCommands = [
-    "time (make symLinks orderList=tsv.otto) >> dbg 2>&1",
-    "time (make mkGenomes orderList=tsv.otto) >> dbg 2>&1",
-    "time (make symLinks orderList=tsv.otto) >> dbg 2>&1",
-    "time (make verifyTestDownload orderList=tsv.otto) >> test.down.log 2>&1",
-    "time (make sendDownload orderList=tsv.otto) >> send.down.log 2>&1",
-    "time (make verifyDownload orderList=tsv.otto) >> verify.down.log 2>&1",
-]
-
-
-############################################################################
-def runMakeChain(cladeDir):
-    """Run the post-tsv.otto make sequence in cladeDir.  Uses bash so
-    'time (...)' (a builtin on a subshell) and '>>' / '2>&1' work as
-    written.  Returns True on success, False if any step fails (the
-    chain stops at the first failure)."""
-    for cmd in makeChainCommands:
-#       print("# [%s] %s" % (cladeDir, cmd), file=sys.stderr)
-        result = subprocess.run(
-            cmd, shell=True, executable="/bin/bash", cwd=cladeDir,
-        )
-        if result.returncode != 0:
-            print("# ERROR: exit %d from: %s -- stopping chain"
-                  % (result.returncode, cmd), file=sys.stderr)
-            return False
-    return True
-
-
-############################################################################
 def pushUcscChain(targetDb, queryDb):
     """rsync the .over.chain.gz to both hgdownload hosts under
     /goldenPath/<targetDb>/liftOver/.  targetDb must be a UCSC native db
     (the goldenPath tree only exists for native dbs); queryDb may be
     GenArk or native.  Source filename uses dot separators (e.g.
     ce11.GCA_000180635.4.over.chain.gz), destination filename uses
     CamelCase (ce11ToGCA_000180635.4.over.chain.gz); rsync renames in
     flight by giving the destination as a file path, not a directory.
     -L dereferences the lastz.<queryDb> symlink to the dated build dir.
     Pre-creates the destination directory with a separate ssh + mkdir
     -p.  Returns True on success, False on any failure."""
     QueryDb = queryDb[:1].upper() + queryDb[1:]
     src = ("/hive/data/genomes/%s/bed/lastz.%s/axtChain/%s.%s.over.chain.gz"
            % (targetDb, queryDb, targetDb, queryDb))
     if not os.path.isfile(src):
@@ -240,98 +89,98 @@
         if result.returncode != 0:
             print("# ERROR: pushUcscChain: mkdir failed on %s: %s"
                   % (host, result.stderr.strip()), file=sys.stderr)
             return False
         result = subprocess.run(
             ["rsync", "-avL", src, "%s:%s" % (target, dstFile)],
             capture_output=True, text=True,
         )
         if result.returncode != 0:
             print("# ERROR: pushUcscChain: rsync to %s failed: %s"
                   % (host, result.stderr.strip()), file=sys.stderr)
             return False
     return True
 
 
-############################################################################
 def main():
-    lockFh = acquireSingletonLock()  # noqa: F841 -- keep ref alive
+    lockFh = ottoLib.acquireSingletonLock(lockPath)  # noqa: F841
     requests = pendingRequests()
     if not requests:
         return
     dbs = set()
     for _, fromDb, toDb in requests:
         dbs.update((fromDb, toDb))
-    accessions = {db for db in dbs if gcPattern.match(db)}
-    dbDbClades = loadDbDbClades()
-    genarkInfo = lookupGenark(accessions)
-    grouped = groupByClade(dbs, dbDbClades, genarkInfo)
+    accessions = {db for db in dbs if ottoLib.gcPattern.match(db)}
+    dbDbClades = ottoLib.loadDbDbClades()
+    genarkInfo = ottoLib.lookupGenark(accessions)
+    grouped = ottoLib.groupByClade(dbs, dbDbClades, genarkInfo)
+
+    # bring the otto kent tree up to date before any cladeAsmHub make
+    if not ottoLib.gitPullKentTree():
+        sys.exit(1)
 
     def cladeOf(db):
-        if gcPattern.match(db):
+        if ottoLib.gcPattern.match(db):
             info = genarkInfo.get(db)
             return info[1] if info else None
         return dbDbClades.get(db)
 
     succeededClades = set()
     failedClades = set()
     for clade in sorted(grouped):
-#       print("%s:" % clade)
-#       for asmId in grouped[clade]:
-#           print("  %s" % asmId)
-        cladeDir = writeCladeTsv(clade, grouped[clade])
+        cladeDir = ottoLib.writeCladeTsv(clade, grouped[clade])
         if cladeDir is None:
             continue
-        if runMakeChain(cladeDir):
+        if ottoLib.runGenArkMake(cladeDir):
             succeededClades.add(clade)
         else:
             failedClades.add(clade)
 
     # GenArk-target directions are pushed by the per-clade make chain
     # above; UCSC-native-target directions are pushed by rsync to the
     # hgdownload hosts.  A request advances to status=6 only when ALL
     # of its directions succeed.  Clade-side failures leave the request
     # at status=5 (existing behavior).  rsync failures move it to
     # status=7 to alert examination.
     completedIds = []
     failedIds = []
     cladeFailures = []
     pushFailures = []
     for reqId, fromDb, toDb in requests:
         genarkFailedClades = set()
         genarkIncomplete = False
         for db in (fromDb, toDb):
-            if not gcPattern.match(db):
+            if not ottoLib.gcPattern.match(db):
                 continue
             clade = cladeOf(db)
             if clade is None:
                 genarkIncomplete = True       # missing clade info; retry
             elif clade in failedClades:
                 genarkFailedClades.add(clade)
             elif clade not in succeededClades:
                 genarkIncomplete = True       # not yet pushed; retry
         if genarkFailedClades:
             cladeFailures.append(
                 (reqId, fromDb, toDb, sorted(genarkFailedClades)))
             continue
         if genarkIncomplete:
             continue
 
         pushOk = True
         pushFailedDirs = []
         for target, query in [(fromDb, toDb), (toDb, fromDb)]:
-            if gcPattern.match(target):
+            if ottoLib.gcPattern.match(target):
                 continue                      # GenArk side already handled
             if not pushUcscChain(target, query):
                 pushFailedDirs.append("%s -> %s" % (target, query))
                 pushOk = False
                 break
         if pushOk:
             completedIds.append(reqId)
         else:
             failedIds.append(reqId)
             pushFailures.append((reqId, fromDb, toDb, pushFailedDirs))
 
     markComplete(completedIds)
     markFailed(failedIds)
 
     if cladeFailures: