584b69eff038b10c2176d2b9299c356af21288cc hiram Wed Apr 22 16:30:53 2026 -0700 initial scripts to run the galaxy workflow based off of the ottoRequest table entries refs #31811 diff --git src/hg/utils/otto/userRequests/workflowMonitor.sh src/hg/utils/otto/userRequests/workflowMonitor.sh new file mode 100755 index 00000000000..bb333d2f4aa --- /dev/null +++ src/hg/utils/otto/userRequests/workflowMonitor.sh @@ -0,0 +1,258 @@ +#!/bin/bash + +# workflowMonitor.sh - check a pending Galaxy kegAlign workflow invocation +# and install results when complete +# +# usage: workflowMonitor.sh +# +# Called periodically (e.g. from cron) to poll the Galaxy API for +# invocation status. Exits 0 silently when still running so cron +# can simply re-invoke on schedule. When complete, downloads results +# via planemo and installs chain/liftOver bigBed files into the build +# and swap directories. +# +# Expects in : +# pendingInvocationId.txt - written by kegAlign.sh +# (3 tab-separated fields: DS invocationId logJsonPath) +# kegAlign.sh - generated by kegAlignLastz.sh, +# variable definitions sourced for buildDir, +# swapDir, PM, targetDb, queryDb, etc. + +set -beEu -o pipefail + +if [ $# != 1 ]; then + printf "usage: workflowMonitor.sh \n" 1>&2 + exit 255 +fi + +export buildDir="$1" + +if [ ! -d "${buildDir}" ]; then + printf "ERROR: buildDir not found: %s\n" "${buildDir}" 1>&2 + exit 255 +fi + +cd "${buildDir}" + +# already finished? +if [ -s successInvocationId.txt ]; then + printf "# already completed: %s\n" "${buildDir}" 1>&2 + exit 0 +fi + +# no pending invocation? +if [ ! -s pendingInvocationId.txt ]; then + printf "ERROR: no pendingInvocationId.txt in %s\n" "${buildDir}" 1>&2 + exit 255 +fi + +# read pending invocation fields +export DS=$(cut -f1 pendingInvocationId.txt) +export invocationId=$(cut -f2 pendingInvocationId.txt) +export logJson=$(cut -f3 pendingInvocationId.txt) + +# source variable definitions from the generated kegAlign.sh +if [ ! 
-s kegAlign.sh ]; then + printf "ERROR: no kegAlign.sh in %s\n" "${buildDir}" 1>&2 + exit 255 +fi +source <(grep -E '^export (swapDir|PM|targetDb|queryDb|QueryDb|Target|tSizes|qSizes)=' kegAlign.sh) + +printf "# monitoring invocation: %s\n" "${invocationId}" 1>&2 +printf "# buildDir: %s\n" "${buildDir}" 1>&2 + +# read Galaxy credentials from planemo profile +export profileJson="${HOME}/.planemo/profiles/vgp/planemo_profile_options.json" +if [ ! -s "${profileJson}" ]; then + printf "ERROR: planemo profile not found: %s\n" "${profileJson}" 1>&2 + exit 255 +fi + +export galaxyUrl=$(jq -r '.galaxy_url' "${profileJson}") +export galaxyApiKey=$(jq -r '.galaxy_user_key' "${profileJson}") + +if [ -z "${galaxyUrl}" -o -z "${galaxyApiKey}" ]; then + printf "ERROR: could not read galaxy_url or galaxy_user_key from %s\n" \ + "${profileJson}" 1>&2 + exit 255 +fi + +# strip trailing slash +galaxyUrl="${galaxyUrl%/}" + +############################################################################ +# query invocation state from the Galaxy API +############################################################################ +stateJson=$(curl -s -H "x-api-key: ${galaxyApiKey}" \ + "${galaxyUrl}/api/invocations/${invocationId}") + +state=$(printf "%s" "${stateJson}" | jq -r '.state // "unknown"') +printf "# invocation state: %s\n" "${state}" 1>&2 + +case "${state}" in + "cancelled"|"failed") + printf "ERROR: workflow %s -- invocation %s\n" "${state}" "${invocationId}" 1>&2 + exit 1 + ;; + "new"|"ready") + printf "# workflow still starting up, will check again later\n" 1>&2 + exit 0 + ;; + "scheduled") + # all steps dispatched -- fall through to check individual jobs + ;; + *) + printf "# unexpected state '%s', will check again later\n" "${state}" 1>&2 + exit 0 + ;; +esac + +# "scheduled" means all steps dispatched, but jobs may still be running. +# use the jobs_summary endpoint to get aggregate job state counts. 
+summaryJson=$(curl -s -H "x-api-key: ${galaxyApiKey}" \
+  "${galaxyUrl}/api/invocations/${invocationId}/jobs_summary")
+
+# count jobs in non-terminal states (new, queued, running, waiting, upload,
+# setting_metadata, resubmitted). Terminal states: ok, error, deleted,
+# skipped, paused.
+nonTerminalCount=$(printf "%s" "${summaryJson}" | jq \
+  '[.states | to_entries[] | select(.key != "ok" and .key != "error"
+     and .key != "deleted" and .key != "skipped"
+     and .key != "paused") | .value] | add // 0')
+
+if [ "${nonTerminalCount}" -gt 0 ]; then
+  activeStates=$(printf "%s" "${summaryJson}" | jq -r \
+    '[.states | to_entries[] | select(.key != "ok" and .key != "error"
+       and .key != "deleted" and .key != "skipped"
+       and .key != "paused") | "\(.value) \(.key)"] | join(", ")')
+  printf "# %d jobs still active (%s), will check again later\n" \
+    "${nonTerminalCount}" "${activeStates}" 1>&2
+  exit 0
+fi
+
+# warn about errored jobs but proceed to download what we can
+errorCount=$(printf "%s" "${summaryJson}" | jq '.states.error // 0')
+
+if [ "${errorCount}" -gt 0 ]; then
+  printf "WARNING: %d jobs had errors in invocation %s\n" \
+    "${errorCount}" "${invocationId}" 1>&2
+fi
+
+printf "# all jobs complete, downloading results\n" 1>&2
+
+############################################################################
+# download results via planemo
+############################################################################
+mkdir -p "result/${DS}"
+${PM} invocation_download "${invocationId}" --profile vgp \
+  --output_directory "result/${DS}"
+
+############################################################################
+# chainBigBedFb - convert chain to bigBed and compute featureBits
+# args: db chainName chainGz sizesFile fbFile
+# writes chainName.bb and chainNameLink.bb in the current directory,
+# and a featureBits-style coverage summary into fbFile
+############################################################################
+function chainBigBedFb() {
+  local db=$1          # genome db name (currently unused; kept for call-site clarity)
+  local chainName=$2   # base name for output files, e.g. chain${QueryDb}
+  local chainGz=$3     # input gzipped chain file in the current directory
+  local sizesFile=$4   # chrom.sizes file for this db
+  local fbFile=$5      # output coverage summary file
+  chainToBigChain "${chainGz}" "${chainName}.tab" "${chainName}Link.tab"
+  bedToBigBed -type=bed6+6 -as="$HOME/kent/src/hg/lib/bigChain.as" -tab \
+    "${chainName}.tab" "${sizesFile}" "${chainName}.bb"
+  bedToBigBed -type=bed4+1 -as="$HOME/kent/src/hg/lib/bigLink.as" -tab \
+    "${chainName}Link.tab" "${sizesFile}" "${chainName}Link.bb"
+  rm -f "${chainName}.tab" "${chainName}Link.tab" chain.tab link.tab
+  local totalBases; totalBases=$(ave -col=2 "${sizesFile}" | grep "^total" | awk '{printf "%d", $2}')
+  local basesCovered; basesCovered=$(bigBedInfo "${chainName}Link.bb" | grep "basesCovered" | cut -d' ' -f2 | tr -d ',')
+  local percentCovered; percentCovered=$(echo "${basesCovered}" "${totalBases}" | awk '{printf "%.3f", 100.0*$1/$2}')
+  printf "%d bases of %d (%s%%) in intersection\n" \
+    "${basesCovered}" "${totalBases}" "${percentCovered}" > "${fbFile}"
+}
+
+############################################################################
+# install target-side chains
+############################################################################
+
+### allChain
+mkdir -p "${buildDir}/axtChain"
+if [ ! -s "${buildDir}/axtChain/${targetDb}.${queryDb}.all.chain.gz" ]; then
+  allChainFile=$(ls result/"${DS}"/allChain__.*.chain)   # NOTE(review): double '_' differs from the single '_' in the other result globs -- confirm Galaxy output naming
+  gzip -c "${allChainFile}" \
+    > "${buildDir}/axtChain/${targetDb}.${queryDb}.all.chain.gz"
+fi
+
+### target allChain -> bigBed + featureBits
+cd "${buildDir}/axtChain"
+if [ ! -s "${buildDir}/fb.${targetDb}.chain${QueryDb}Link.txt" ]; then
+  chainBigBedFb "${targetDb}" "chain${QueryDb}" \
+    "${targetDb}.${queryDb}.all.chain.gz" "${tSizes}" \
+    "${buildDir}/fb.${targetDb}.chain${QueryDb}Link.txt"
+fi
+cat "${buildDir}/fb.${targetDb}.chain${QueryDb}Link.txt" 1>&2
+
+### target liftOver chain (over.chain)
+cd "${buildDir}"
+if [ ! -s "${buildDir}/axtChain/${targetDb}.${queryDb}.over.chain.gz" ]; then
+  overChainFile=$(ls result/"${DS}"/liftOverChain_.*.chain)
+  gzip -c "${overChainFile}" \
+    > "${buildDir}/axtChain/${targetDb}.${queryDb}.over.chain.gz"
+fi
+
+### target over.chain -> bigBed + featureBits
+cd "${buildDir}/axtChain"
+if [ ! -s "${buildDir}/fb.${targetDb}.chainLiftOver${QueryDb}Link.txt" ]; then
+  chainBigBedFb "${targetDb}" "chainLiftOver${QueryDb}" \
+    "${targetDb}.${queryDb}.over.chain.gz" "${tSizes}" \
+    "${buildDir}/fb.${targetDb}.chainLiftOver${QueryDb}Link.txt"
+fi
+cat "${buildDir}/fb.${targetDb}.chainLiftOver${QueryDb}Link.txt" 1>&2
+
+############################################################################
+# install swap-side chains
+############################################################################
+
+### swap allChain
+mkdir -p "${swapDir}/axtChain"
+cd "${buildDir}"
+if [ ! -s "${swapDir}/axtChain/${queryDb}.${targetDb}.all.chain.gz" ]; then
+  allChainSwapFile=$(ls result/"${DS}"/allChainSwap_.*.chain)
+  gzip -c "${allChainSwapFile}" \
+    > "${swapDir}/axtChain/${queryDb}.${targetDb}.all.chain.gz"
+fi
+
+### swap allChain -> bigBed + featureBits
+cd "${swapDir}/axtChain"
+if [ ! -s "${swapDir}/fb.${queryDb}.chain${Target}Link.txt" ]; then
+  chainBigBedFb "${queryDb}" "chain${Target}" \
+    "${queryDb}.${targetDb}.all.chain.gz" "${qSizes}" \
+    "${swapDir}/fb.${queryDb}.chain${Target}Link.txt"
+fi
+cat "${swapDir}/fb.${queryDb}.chain${Target}Link.txt" 1>&2
+
+### swap liftOver chain (over.chain)
+cd "${buildDir}"
+if [ ! -s "${swapDir}/axtChain/${queryDb}.${targetDb}.over.chain.gz" ]; then
+  overChainSwapFile=$(ls result/"${DS}"/swapLiftOverChain_.*.chain)
+  gzip -c "${overChainSwapFile}" \
+    > "${swapDir}/axtChain/${queryDb}.${targetDb}.over.chain.gz"
+fi
+
+### swap over.chain -> bigBed + featureBits
+cd "${swapDir}/axtChain"
+if [ ! -s "${swapDir}/fb.${queryDb}.chainLiftOver${Target}Link.txt" ]; then
+  chainBigBedFb "${queryDb}" "chainLiftOver${Target}" \
+    "${queryDb}.${targetDb}.over.chain.gz" "${qSizes}" \
+    "${swapDir}/fb.${queryDb}.chainLiftOver${Target}Link.txt"
+fi
+cat "${swapDir}/fb.${queryDb}.chainLiftOver${Target}Link.txt" 1>&2
+
+############################################################################
+# mark complete: success marker carries the same 3 fields as the pending
+# file so the top-of-script guard short-circuits future cron runs
+############################################################################
+printf "%s\tinvocation ID: %s\t%s\n" "${DS}" "${invocationId}" "${logJson}" \
+  > successInvocationId.txt
+rm -f pendingInvocationId.txt
+
+printf "### workflow monitor complete: %s %s -> %s\n" \
+  "${buildDir}" "${targetDb}" "${queryDb}" 1>&2