f5cece60b38bd8c2cfd57acf2abb972d66edb6d9 hiram Fri Apr 24 11:43:36 2026 -0700
rename doneStatus to just status and correctly set status codes refs #31811

diff --git src/hg/utils/otto/userRequests/workflowMonitor.sh src/hg/utils/otto/userRequests/workflowMonitor.sh
index dd53ed9338e..3d51b27cf55 100755
--- src/hg/utils/otto/userRequests/workflowMonitor.sh
+++ src/hg/utils/otto/userRequests/workflowMonitor.sh
@@ -1,292 +1,312 @@
 #!/bin/bash

 # workflowMonitor.sh - check a pending Galaxy kegAlign workflow invocation
 #   and install results when complete
 #
-# usage: workflowMonitor.sh <buildDir>
+# usage: workflowMonitor.sh <reqId> <buildDir>
 #
 # Called periodically (e.g. from cron) to poll the Galaxy API for
 # invocation status.  Exits 0 silently when still running so cron
 # can simply re-invoke on schedule.  When complete, downloads results
 # via planemo and installs chain/liftOver bigBed files into the build
 # and swap directories.
 #
 # Expects in <buildDir>:
 #   pendingInvocationId.txt - written by kegAlign.sh
 #     (3 tab-separated fields: DS invocationId logJsonPath)
 #   kegAlign.sh - generated by kegAlignLastz.sh,
 #     variable definitions sourced for buildDir,
 #     swapDir, PM, targetDb, queryDb, etc.
+# status indicators:
+#   0 pending, 1 notified, 2 in progress, 3 galaxy done, 4 tracks complete,
+#   5 finish notification, 6 complete, 7 problems

 set -beEu -o pipefail

-if [ $# != 1 ]; then
-  printf "usage: workflowMonitor.sh <buildDir>\n" 1>&2
+if [ $# != 2 ]; then
+  printf "usage: workflowMonitor.sh <reqId> <buildDir>\n" 1>&2
   exit 255
 fi

-export buildDir="$1"
+export reqId="$1"
+export buildDir="$2"
+
+##############################################################################
+### errors - set error status in the table
+function setErrorStatus() {
+  id="${1}"
+  hgsql -N -e \
+    "UPDATE ottoRequest SET status=7 WHERE id=${id};" hgcentraltest
+}
+##############################################################################

 if [ ! -d "${buildDir}" ]; then
   printf "ERROR: buildDir not found: %s\n" "${buildDir}" 1>&2
   exit 255
 fi

 cd "${buildDir}"

 # already finished?
 if [ -s successInvocationId.txt ]; then
   printf "# already completed: %s\n" "${buildDir}" 1>&2
   exit 0
 fi

 # no pending invocation?
 if [ ! -s pendingInvocationId.txt ]; then
   printf "ERROR: no pendingInvocationId.txt in %s\n" "${buildDir}" 1>&2
   exit 255
 fi

 # read pending invocation fields
 export DS=$(cut -f1 pendingInvocationId.txt)
 export invocationId=$(cut -f2 pendingInvocationId.txt)
 export logJson=$(cut -f3 pendingInvocationId.txt)

 # source variable definitions from the generated kegAlign.sh
 if [ ! -s kegAlign.sh ]; then
   printf "ERROR: no kegAlign.sh in %s\n" "${buildDir}" 1>&2
   exit 255
 fi
 source <(grep -E '^export (swapDir|PM|targetDb|queryDb|QueryDb|Target|tSizes|qSizes)=' kegAlign.sh)

 printf "# monitoring invocation: %s\n" "${invocationId}" 1>&2
 printf "# buildDir: %s\n" "${buildDir}" 1>&2

 # read Galaxy credentials from planemo profile
 export profileJson="${HOME}/.planemo/profiles/vgp/planemo_profile_options.json"
 if [ ! -s "${profileJson}" ]; then
   printf "ERROR: planemo profile not found: %s\n" "${profileJson}" 1>&2
   exit 255
 fi
 export galaxyUrl=$(jq -r '.galaxy_url // empty' "${profileJson}")
 export galaxyApiKey=$(jq -r '.galaxy_user_key // empty' "${profileJson}")
 if [ -z "${galaxyUrl}" -o -z "${galaxyApiKey}" ]; then
   printf "ERROR: could not read galaxy_url or galaxy_user_key from %s\n" \
     "${profileJson}" 1>&2
   exit 255
 fi
 # strip trailing slash
 galaxyUrl="${galaxyUrl%/}"

 ############################################################################
 # query invocation state from the Galaxy API
 ############################################################################
 stateJson=$(curl -s -H "x-api-key: ${galaxyApiKey}" \
   "${galaxyUrl}/api/invocations/${invocationId}")
 state=$(printf "%s" "${stateJson}" | jq -r '.state // "unknown"')
 printf "# invocation state: %s\n" "${state}" 1>&2

 case "${state}" in
   "cancelled"|"failed")
     printf "ERROR: workflow %s -- invocation %s\n" "${state}" "${invocationId}" 1>&2
+    setErrorStatus ${reqId}
     exit 1
     ;;
   "new"|"ready")
     printf "# workflow still starting up, will check again later\n" 1>&2
     exit 0
     ;;
   "scheduled")
     # all steps dispatched -- fall through to check individual jobs
     ;;
   "completed")
     # all steps dispatched -- fall through to check individual jobs
     ;;
   *)
     printf "# unexpected state '%s', will check again later\n" "${state}" 1>&2
     exit 0
     ;;
 esac

 # "scheduled" means all steps dispatched, but jobs may still be running.
 # use the jobs_summary endpoint to get aggregate job state counts.
 summaryJson=$(curl -s -H "x-api-key: ${galaxyApiKey}" \
   "${galaxyUrl}/api/invocations/${invocationId}/jobs_summary")

 # count jobs in non-terminal states (new, queued, running, waiting, upload,
 # setting_metadata, resubmitted).  Terminal states: ok, error, deleted,
 # skipped, paused.
 nonTerminalCount=$(printf "%s" "${summaryJson}" | jq \
   '[.states | to_entries[] | select(.key != "ok" and .key != "error" and .key != "deleted" and .key != "skipped" and .key != "paused") | .value] | add // 0')

 if [ "${nonTerminalCount}" -gt 0 ]; then
   activeStates=$(printf "%s" "${summaryJson}" | jq -r \
     '[.states | to_entries[] | select(.key != "ok" and .key != "error" and .key != "deleted" and .key != "skipped" and .key != "paused") | "\(.value) \(.key)"] | join(", ")')
   printf "# %d jobs still active (%s), will check again later\n" \
     "${nonTerminalCount}" "${activeStates}" 1>&2
   exit 0
 fi

 # check for errored jobs -- a "completed" invocation can still contain
 # individual jobs that failed
 errorCount=$(printf "%s" "${summaryJson}" | jq '.states.error // 0')
 if [ "${errorCount}" -gt 0 ]; then
   printf "ERROR: %d job(s) had errors in invocation %s\n" \
     "${errorCount}" "${invocationId}" 1>&2
+  setErrorStatus ${reqId}
   # the invocation detail (stateJson) embeds steps with job_ids but not
   # job states; query each job individually to find which step(s) failed
   printf "%s" "${stateJson}" | jq -r '
     .steps[]
     | select(.job_id != null)
     | "\(.order_index)\t\(.workflow_step_label // "unlabeled")\t\(.job_id)"
   ' | while IFS=$'\t' read -r stepIdx stepLabel jobId; do
     jobState=$(curl -s -H "x-api-key: ${galaxyApiKey}" \
       "${galaxyUrl}/api/jobs/${jobId}" | jq -r '.state // "unknown"')
     if [ "${jobState}" = "error" ]; then
       printf " FAILED step %s: %s (job %s)\n" \
         "${stepIdx}" "${stepLabel}" "${jobId}" 1>&2
     fi
   done
   # check sub-workflow invocations for errors
   printf "%s" "${stateJson}" | jq -r '
     .steps[]
     | select(.subworkflow_invocation_id != null)
     | "\(.order_index)\t\(.workflow_step_label // "unlabeled")\t\(.subworkflow_invocation_id)"
   ' | while IFS=$'\t' read -r stepIdx stepLabel subInvId; do
     subErrors=$(curl -s -H "x-api-key: ${galaxyApiKey}" \
       "${galaxyUrl}/api/invocations/${subInvId}/jobs_summary" \
       | jq '.states.error // 0')
     if [ "${subErrors}" -gt 0 ]; then
       printf " FAILED step %s: %s (sub-workflow %s, %d error(s))\n" \
         "${stepIdx}" "${stepLabel}" "${subInvId}" "${subErrors}" 1>&2
     fi
   done
   exit 1
 fi

 printf "# all jobs complete, downloading results\n" 1>&2
+hgsql -N -e \
+  "UPDATE ottoRequest SET status = 3 WHERE id = ${reqId};" hgcentraltest

 ############################################################################
 # download results via planemo
 ############################################################################
 mkdir -p "result/${DS}"
 ${PM} invocation_download "${invocationId}" --profile vgp \
   --output_directory "result/${DS}"

 ############################################################################
 # chainBigBedFb - convert chain to bigBed and compute featureBits
 #   args: db chainName chainGz sizesFile fbFile
 ############################################################################
 function chainBigBedFb() {
   local db=$1
   local chainName=$2
   local chainGz=$3
   local sizesFile=$4
   local fbFile=$5

   chainToBigChain "${chainGz}" ${chainName}.tab ${chainName}Link.tab
   bedToBigBed -type=bed6+6 -as=$HOME/kent/src/hg/lib/bigChain.as -tab \
     ${chainName}.tab ${sizesFile} ${chainName}.bb
   bedToBigBed -type=bed4+1 -as=$HOME/kent/src/hg/lib/bigLink.as -tab \
     ${chainName}Link.tab ${sizesFile} ${chainName}Link.bb
   rm -f ${chainName}.tab ${chainName}Link.tab chain.tab link.tab

   local totalBases=$(ave -col=2 ${sizesFile} | grep "^total" | awk '{printf "%d", $2}')
   local basesCovered=$(bigBedInfo ${chainName}Link.bb | grep "basesCovered" | cut -d' ' -f2 | tr -d ',')
   local percentCovered=$(echo ${basesCovered} ${totalBases} | awk '{printf "%.3f", 100.0*$1/$2}')
   printf "%d bases of %d (%s%%) in intersection\n" \
     "${basesCovered}" "${totalBases}" "${percentCovered}" > ${fbFile}
 }

 ############################################################################
 # install target-side chains
 ############################################################################
 ### allChain
 mkdir -p "${buildDir}/axtChain"
 if [ ! -s "${buildDir}/axtChain/${targetDb}.${queryDb}.all.chain.gz" ]; then
   allChainFile=$(ls result/${DS}/allChain__.*.chain)
   gzip -c "${allChainFile}" \
     > "${buildDir}/axtChain/${targetDb}.${queryDb}.all.chain.gz"
 fi

 ### target allChain -> bigBed + featureBits
 cd "${buildDir}/axtChain"
 if [ ! -s "${buildDir}/fb.${targetDb}.chain${QueryDb}Link.txt" ]; then
   chainBigBedFb ${targetDb} chain${QueryDb} \
     "${targetDb}.${queryDb}.all.chain.gz" ${tSizes} \
     "${buildDir}/fb.${targetDb}.chain${QueryDb}Link.txt"
 fi
 cat "${buildDir}/fb.${targetDb}.chain${QueryDb}Link.txt" 1>&2

 ### target liftOver chain (over.chain)
 cd "${buildDir}"
 if [ ! -s "${buildDir}/axtChain/${targetDb}.${queryDb}.over.chain.gz" ]; then
   overChainFile=$(ls result/${DS}/liftOverChain_.*.chain)
   gzip -c "${overChainFile}" \
     > "${buildDir}/axtChain/${targetDb}.${queryDb}.over.chain.gz"
 fi

 ### target over.chain -> bigBed + featureBits
 cd "${buildDir}/axtChain"
 if [ ! -s "${buildDir}/fb.${targetDb}.chainLiftOver${QueryDb}Link.txt" ]; then
   chainBigBedFb ${targetDb} chainLiftOver${QueryDb} \
     "${targetDb}.${queryDb}.over.chain.gz" ${tSizes} \
     "${buildDir}/fb.${targetDb}.chainLiftOver${QueryDb}Link.txt"
 fi
 cat "${buildDir}/fb.${targetDb}.chainLiftOver${QueryDb}Link.txt" 1>&2

 ############################################################################
 # install swap-side chains
 ############################################################################
 ### swap allChain
 mkdir -p "${swapDir}/axtChain"
 cd "${buildDir}"
 if [ ! -s "${swapDir}/axtChain/${queryDb}.${targetDb}.all.chain.gz" ]; then
   allChainSwapFile=$(ls result/${DS}/allChainSwap_.*.chain)
   gzip -c "${allChainSwapFile}" \
     > "${swapDir}/axtChain/${queryDb}.${targetDb}.all.chain.gz"
 fi

 ### swap allChain -> bigBed + featureBits
 cd "${swapDir}/axtChain"
 if [ ! -s "${swapDir}/fb.${queryDb}.chain${Target}Link.txt" ]; then
   chainBigBedFb ${queryDb} chain${Target} \
     "${queryDb}.${targetDb}.all.chain.gz" ${qSizes} \
     "${swapDir}/fb.${queryDb}.chain${Target}Link.txt"
 fi
 cat "${swapDir}/fb.${queryDb}.chain${Target}Link.txt" 1>&2

 ### swap liftOver chain (over.chain)
 cd "${buildDir}"
 if [ ! -s "${swapDir}/axtChain/${queryDb}.${targetDb}.over.chain.gz" ]; then
   overChainSwapFile=$(ls result/${DS}/swapLiftOverChain_.*.chain)
   gzip -c "${overChainSwapFile}" \
     > "${swapDir}/axtChain/${queryDb}.${targetDb}.over.chain.gz"
 fi

 ### swap over.chain -> bigBed + featureBits
 cd "${swapDir}/axtChain"
 if [ ! -s "${swapDir}/fb.${queryDb}.chainLiftOver${Target}Link.txt" ]; then
   chainBigBedFb ${queryDb} chainLiftOver${Target} \
     "${queryDb}.${targetDb}.over.chain.gz" ${qSizes} \
     "${swapDir}/fb.${queryDb}.chainLiftOver${Target}Link.txt"
 fi
 cat "${swapDir}/fb.${queryDb}.chainLiftOver${Target}Link.txt" 1>&2

 ############################################################################
 # mark complete
 ############################################################################
 printf "%s\tinvocation ID: %s\t%s\n" "${DS}" "${invocationId}" "${logJson}" \
   > successInvocationId.txt
 rm -f pendingInvocationId.txt

 printf "### workflow monitor complete: %s %s -> %s\n" \
   "${buildDir}" "${targetDb}" "${queryDb}" 1>&2
+
+hgsql -N -e \
+  "UPDATE ottoRequest SET status = 4 WHERE id = ${reqId};" hgcentraltest