cff0574d507c40d5bd01f091ccf8edf75ef5e892 hiram Thu May 21 19:33:26 2026 -0700 need to receive error message in cron stderr when something does not push correctly refs #31811 diff --git src/hg/utils/otto/userRequests/ottoRequestPush.py src/hg/utils/otto/userRequests/ottoRequestPush.py index 4c9f8c604c8..6c63641dd58 100755 --- src/hg/utils/otto/userRequests/ottoRequestPush.py +++ src/hg/utils/otto/userRequests/ottoRequestPush.py @@ -15,133 +15,140 @@ import os import re import subprocess import sys from collections import defaultdict scriptDir = os.path.dirname(os.path.abspath(__file__)) cladeTsv = os.path.join(scriptDir, "dbDb.name.clade.tsv") lockPath = os.path.join(scriptDir, "ottoRequestPush.lock") gcPattern = re.compile(r"^GC[AF]_") # UCSC native .over.chain.gz files get rsync'd to both hgdownload hosts. pushUser = "qateam" pushHosts = ["hgdownload1.soe.ucsc.edu", "hgdownload3.gi.ucsc.edu"] - +############################################################################ def acquireSingletonLock(): """Ensure only one instance of this script runs at a time. Holds an exclusive flock on lockPath for the lifetime of the process; the kernel releases it on exit (including crash / kill -9), so no stale lock cleanup is needed. Returns the open file handle, which the caller must keep alive.""" # "a+" opens read+write without truncating (and creates if missing), # so a second instance that fails to lock doesn't wipe the running # instance's PID from the file before exiting. fh = open(lockPath, "a+") try: fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: sys.exit(0) # we own the lock truncate and write our PID for information fh.seek(0) fh.truncate() fh.write("%d\n" % os.getpid()) fh.flush() return fh ### FYI: can also see the locking process via: lsof ottoRequestPush.lock +############################################################################ def hgsql(query, db="hgcentraltest"): """Run hgsql -N -B and return rows as list of tuples (tab-split).""" out = subprocess.run( ["/cluster/bin/x86_64/hgsql", "-N", "-B", "-e", query, db], check=True, capture_output=True, text=True, ).stdout return [tuple(line.split("\t")) for line in out.splitlines() if line] +############################################################################ def loadDbDbClades(): """Read dbDb.name.clade.tsv -> {dbName: clade}.""" result = {} with open(cladeTsv) as fh: for line in fh: if line.startswith("#") or not line.strip(): continue name, clade = line.rstrip("\n").split("\t")[:2] result[name] = clade return result +############################################################################ def pendingRequests(): """Status=5 liftOver requests as [(id, fromDb, toDb), ...].""" rows = hgsql( "SELECT id, fromDb, toDb FROM ottoRequest " "WHERE status = 5 AND requestType = 'liftOver';" ) return [(int(r[0]), r[1], r[2]) for r in rows] +############################################################################ def markComplete(reqIds): """Set status=6 on the given ottoRequest ids.""" if not reqIds: return idList = ",".join(str(i) for i in sorted(reqIds)) hgsql("UPDATE ottoRequest SET status = 6 WHERE id IN (%s);" % idList) # print("# marked status=6: %s" % idList, file=sys.stderr) +############################################################################ def markFailed(reqIds): """Set status=7 (problems) on the given ottoRequest ids.""" if not reqIds: return idList = ",".join(str(i) for i in sorted(reqIds)) hgsql("UPDATE ottoRequest SET status = 7 WHERE id IN (%s);" % idList) -# print("# marked status=7: %s" % idList, file=sys.stderr) - + print("# ottoRequestPush.py marked status=7 for failing list: %s" % idList, file=sys.stderr) +############################################################################ def lookupGenark(accessions): """Bulk-lookup GenArk accessions -> {acc: (asmName, clade)}.""" if not accessions: return {} quoted = ",".join("'%s'" % a for a in sorted(accessions)) rows = hgsql( "SELECT gcAccession, asmName, clade FROM genark " "WHERE gcAccession IN (%s);" % quoted ) return {acc: (asmName, clade) for acc, asmName, clade in rows} +############################################################################ def groupByClade(dbs, dbDbClades, genarkInfo): """Build {clade: [assemblyId, ...]}.""" grouped = defaultdict(set) for db in dbs: if gcPattern.match(db): info = genarkInfo.get(db) if info is None: print("WARNING: %s not in genark table" % db, file=sys.stderr) continue asmName, clade = info grouped[clade].add("%s_%s" % (db, asmName)) else: clade = dbDbClades.get(db) if clade is None: print("WARNING: %s not in %s" % (db, cladeTsv), file=sys.stderr) continue grouped[clade].add(db) return {clade: sorted(ids) for clade, ids in grouped.items()} +############################################################################ def writeCladeTsv(clade, asmIds): """Filter <clade>.orderList.tsv down to lines matching any asmId and write the result to tsv.otto in the same directory. Mirrors: cd ~/kent/src/hg/makeDb/doc/<clade>AsmHub egrep '<id1>|<id2>|...' <clade>.orderList.tsv > tsv.otto Only GenArk identifiers are used UCSC native dbs are not in the AsmHub orderList files. Returns cladeDir on success (so the caller can chain the make sequence), or None if there is nothing to do for this clade. """ genarkIds = [a for a in asmIds if gcPattern.match(a)] if not genarkIds: return None cladeDir = os.path.expanduser( @@ -170,47 +177,49 @@ return cladeDir # Sequence of make commands run in the clade AsmHub directory after # tsv.otto is written. Stops on the first failure. makeChainCommands = [ "time (make symLinks orderList=tsv.otto) >> dbg 2>&1", "time (make mkGenomes orderList=tsv.otto) >> dbg 2>&1", "time (make symLinks orderList=tsv.otto) >> dbg 2>&1", "time (make verifyTestDownload orderList=tsv.otto) >> test.down.log 2>&1", "time (make sendDownload orderList=tsv.otto) >> send.down.log 2>&1", "time (make verifyDownload orderList=tsv.otto) >> verify.down.log 2>&1", ] +############################################################################ def runMakeChain(cladeDir): """Run the post-tsv.otto make sequence in cladeDir. Uses bash so 'time (...)' (a builtin on a subshell) and '>>' / '2>&1' work as written. Returns True on success, False if any step fails (the chain stops at the first failure).""" for cmd in makeChainCommands: # print("# [%s] %s" % (cladeDir, cmd), file=sys.stderr) result = subprocess.run( cmd, shell=True, executable="/bin/bash", cwd=cladeDir, ) if result.returncode != 0: print("# ERROR: exit %d from: %s -- stopping chain" % (result.returncode, cmd), file=sys.stderr) return False return True +############################################################################ def pushUcscChain(targetDb, queryDb): """rsync the .over.chain.gz to both hgdownload hosts under /goldenPath/<targetDb>/liftOver/. targetDb must be a UCSC native db (the goldenPath tree only exists for native dbs); queryDb may be GenArk or native. Source filename uses dot separators (e.g. ce11.GCA_000180635.4.over.chain.gz), destination filename uses CamelCase (ce11ToGCA_000180635.4.over.chain.gz); rsync renames in flight by giving the destination as a file path, not a directory. -L dereferences the lastz.<queryDb> symlink to the dated build dir. Pre-creates the destination directory with a separate ssh + mkdir -p. Returns True on success, False on any failure.""" QueryDb = queryDb[:1].upper() + queryDb[1:] src = ("/hive/data/genomes/%s/bed/lastz.%s/axtChain/%s.%s.over.chain.gz" % (targetDb, queryDb, targetDb, queryDb)) if not os.path.isfile(src): @@ -228,30 +237,31 @@ if result.returncode != 0: print("# ERROR: pushUcscChain: mkdir failed on %s: %s" % (host, result.stderr.strip()), file=sys.stderr) return False result = subprocess.run( ["rsync", "-avL", src, "%s:%s" % (target, dstFile)], capture_output=True, text=True, ) if result.returncode != 0: print("# ERROR: pushUcscChain: rsync to %s failed: %s" % (host, result.stderr.strip()), file=sys.stderr) return False return True +############################################################################ def main(): lockFh = acquireSingletonLock() # noqa: F841 -- keep ref alive requests = pendingRequests() if not requests: return dbs = set() for _, fromDb, toDb in requests: dbs.update((fromDb, toDb)) accessions = {db for db in dbs if gcPattern.match(db)} dbDbClades = loadDbDbClades() genarkInfo = lookupGenark(accessions) grouped = groupByClade(dbs, dbDbClades, genarkInfo) def cladeOf(db): if gcPattern.match(db):