#!/usr/bin/env python3
"""
ottoRequestPush.py - push the results of pending liftOver requests.

Groups the fromDb/toDb identifiers of pending push requests (status=5)
by clade, runs the per-clade make sequence (ottoLib.runGenArkMake) in
each clade directory returned by ottoLib.writeCladeTsv, rsyncs the
UCSC-native .over.chain.gz to both hgdownload hosts, and advances each
request's status:
    6  every direction of the request was pushed
    7  an rsync (or its ssh mkdir) to an hgdownload host failed
    5  unchanged when a clade-side step failed or is not yet done,
       so the request is retried on the next run

Shared helpers (hgsql, acquireSingletonLock, gcPattern, loadDbDbClades,
lookupGenark, groupByClade, writeCladeTsv, runGenArkMake,
gitPullKentTree) live in ottoLib.py.  Push-specific helpers
(pendingRequests, mark*, pushUcscChain) and main() stay here.

cron tab entry as documented in the previous revision of this file
(hiram crontab) - confirm it is still current:
4,26,46 * * * * ~/kent/src/hg/utils/otto/userRequests/ottoRequestPush.py
"""

import os
import subprocess
import sys

import ottoLib

scriptDir = os.path.dirname(os.path.abspath(__file__))
# Lock file handed to ottoLib.acquireSingletonLock() in main().
# NOTE(review): presumably still an exclusive flock that makes a second
# instance exit quietly, as the pre-ottoLib local version did - confirm
# against ottoLib.py.
lockPath = os.path.join(scriptDir, "ottoRequestPush.lock")

# UCSC native .over.chain.gz files get rsync'd to both hgdownload hosts.
pushUser = "qateam"
pushHosts = ["hgdownload1.soe.ucsc.edu", "hgdownload3.gi.ucsc.edu"]


def pendingRequests():
    """Status=5 liftOver requests as [(id, fromDb, toDb), ...]."""
    rows = ottoLib.hgsql(
        "SELECT id, fromDb, toDb FROM ottoRequest "
        "WHERE status = 5 AND requestType = 'liftOver';"
    )
    return [(int(row[0]), row[1], row[2]) for row in rows]


def _setStatus(reqIds, status):
    """Set ottoRequest.status to the integer status for every id in
    reqIds.  Returns the comma-joined, sorted id list that was updated,
    or None when reqIds is empty (no query is issued in that case)."""
    if not reqIds:
        return None
    idList = ",".join(str(i) for i in sorted(reqIds))
    ottoLib.hgsql(
        "UPDATE ottoRequest SET status = %d WHERE id IN (%s);"
        % (status, idList))
    return idList


def markComplete(reqIds):
    """Set status=6 on the given ottoRequest ids."""
    _setStatus(reqIds, 6)


def markFailed(reqIds):
    """Set status=7 (problems) on the given ottoRequest ids and report
    the id list on stderr."""
    idList = _setStatus(reqIds, 7)
    if idList is None:
        return
    print("# ottoRequestPush.py marked status=7 for failing list: %s"
          % idList, file=sys.stderr)


def _runCaptured(cmd):
    """Run cmd (an argv list) with output captured.  Returns
    (ok, stderrText).  A command that cannot be started at all (OSError,
    e.g. ssh or rsync not found) is reported as a failure instead of
    raising, so one bad push cannot abort the whole run before the
    statuses of the other requests are recorded."""
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as err:
        return False, str(err)
    return result.returncode == 0, result.stderr.strip()


def pushUcscChain(targetDb, queryDb):
    """rsync the .over.chain.gz to both hgdownload hosts under
    /goldenPath/<targetDb>/liftOver/.

    targetDb must be a UCSC native db (the goldenPath tree only exists
    for native dbs); queryDb may be GenArk or native.

    Source filename uses dot separators
    (e.g. ce11.GCA_000180635.4.over.chain.gz), destination filename uses
    CamelCase (ce11ToGCA_000180635.4.over.chain.gz); rsync renames in
    flight by giving the destination as a file path, not a directory.
    -L dereferences the lastz.<queryDb> symlink to the dated build dir.
    Pre-creates the destination directory with a separate ssh + mkdir -p.

    Returns True on success, False on any failure (missing source file,
    or a failing / unstartable ssh or rsync on either host).  Stops at
    the first host that fails."""
    # CamelCase join: upper-case only the first character of the query.
    queryDbCap = queryDb[:1].upper() + queryDb[1:]
    src = ("/hive/data/genomes/%s/bed/lastz.%s/axtChain/%s.%s.over.chain.gz"
           % (targetDb, queryDb, targetDb, queryDb))
    if not os.path.isfile(src):
        print("# ERROR: pushUcscChain: missing source %s" % src,
              file=sys.stderr)
        return False
    dstDir = "/data/apache/htdocs/goldenPath/%s/liftOver" % targetDb
    dstFile = "%s/%sTo%s.over.chain.gz" % (dstDir, targetDb, queryDbCap)
    for host in pushHosts:
        target = "%s@%s" % (pushUser, host)
        ok, errText = _runCaptured(["ssh", target, "mkdir", "-p", dstDir])
        if not ok:
            print("# ERROR: pushUcscChain: mkdir failed on %s: %s"
                  % (host, errText), file=sys.stderr)
            return False
        ok, errText = _runCaptured(
            ["rsync", "-avL", src, "%s:%s" % (target, dstFile)])
        if not ok:
            print("# ERROR: pushUcscChain: rsync to %s failed: %s"
                  % (host, errText), file=sys.stderr)
            return False
    return True


def _runCladeMakes(grouped):
    """Write tsv.otto and run the make sequence for each clade, in
    sorted clade order.  Returns (succeededClades, failedClades); a
    clade for which ottoLib.writeCladeTsv returns None lands in neither
    set."""
    succeeded = set()
    failed = set()
    for clade in sorted(grouped):
        cladeDir = ottoLib.writeCladeTsv(clade, grouped[clade])
        if cladeDir is None:
            continue
        if ottoLib.runGenArkMake(cladeDir):
            succeeded.add(clade)
        else:
            failed.add(clade)
    return succeeded, failed


def _genarkSideStatus(fromDb, toDb, genarkInfo, succeeded, failed):
    """Classify the GenArk side(s) of one request.  Returns
    (failedCladeSet, incomplete): failedCladeSet holds the clades of the
    request whose make sequence failed; incomplete is True when a GenArk
    db has no clade info or its clade was not pushed in this run."""
    badClades = set()
    incomplete = False
    for db in (fromDb, toDb):
        if not ottoLib.gcPattern.match(db):
            continue
        info = genarkInfo.get(db)
        clade = info[1] if info else None
        if clade is None:
            incomplete = True       # missing clade info; retry
        elif clade in failed:
            badClades.add(clade)
        elif clade not in succeeded:
            incomplete = True       # not yet pushed; retry
    return badClades, incomplete


def _pushNativeDirections(fromDb, toDb):
    """rsync every direction of the request whose target is a UCSC
    native db.  Stops at the first failure.  Returns the list of failed
    "target -> query" descriptions (empty when everything succeeded)."""
    for target, query in [(fromDb, toDb), (toDb, fromDb)]:
        if ottoLib.gcPattern.match(target):
            continue    # GenArk side already handled
        if not pushUcscChain(target, query):
            return ["%s -> %s" % (target, query)]
    return []


def _reportFailures(cladeFailures, pushFailures):
    """Print the per-request failure summaries on stderr."""
    if cladeFailures:
        print("# the following request(s) stay at status=5 due to failed "
              "clade pushes:", file=sys.stderr)
        for reqId, fromDb, toDb, badClades in cladeFailures:
            print("# id=%d %s -> %s (failed clade(s): %s)"
                  % (reqId, fromDb, toDb, ", ".join(badClades)),
                  file=sys.stderr)
    if pushFailures:
        print("# the following request(s) set to status=7 due to rsync "
              "failures:", file=sys.stderr)
        for reqId, fromDb, toDb, dirs in pushFailures:
            print("# id=%d %s -> %s (failed: %s)"
                  % (reqId, fromDb, toDb, "; ".join(dirs)),
                  file=sys.stderr)


def main():
    """Process every pending (status=5) liftOver push request once."""
    # keep the returned handle referenced for the life of the process
    lockFh = ottoLib.acquireSingletonLock(lockPath)  # noqa: F841

    requests = pendingRequests()
    if not requests:
        return

    dbs = set()
    for _, fromDb, toDb in requests:
        dbs.update((fromDb, toDb))
    accessions = {db for db in dbs if ottoLib.gcPattern.match(db)}
    dbDbClades = ottoLib.loadDbDbClades()
    genarkInfo = ottoLib.lookupGenark(accessions)
    grouped = ottoLib.groupByClade(dbs, dbDbClades, genarkInfo)

    # bring the otto kent tree up to date before any cladeAsmHub make
    if not ottoLib.gitPullKentTree():
        sys.exit(1)

    succeededClades, failedClades = _runCladeMakes(grouped)

    # GenArk-target directions are pushed by the per-clade make chain
    # above; UCSC-native-target directions are pushed by rsync to the
    # hgdownload hosts.  A request advances to status=6 only when ALL
    # of its directions succeed.  Clade-side failures leave the request
    # at status=5 (existing behavior).  rsync failures move it to
    # status=7 to alert examination.
    completedIds = []
    failedIds = []
    cladeFailures = []
    pushFailures = []
    for reqId, fromDb, toDb in requests:
        badClades, incomplete = _genarkSideStatus(
            fromDb, toDb, genarkInfo, succeededClades, failedClades)
        if badClades:
            cladeFailures.append(
                (reqId, fromDb, toDb, sorted(badClades)))
            continue
        if incomplete:
            continue
        failedDirs = _pushNativeDirections(fromDb, toDb)
        if failedDirs:
            failedIds.append(reqId)
            pushFailures.append((reqId, fromDb, toDb, failedDirs))
        else:
            completedIds.append(reqId)

    markComplete(completedIds)
    markFailed(failedIds)
    _reportFailures(cladeFailures, pushFailures)


if __name__ == "__main__":
    main()