418b11748d3ae9c1217714a92d4a7fe46b6819b9
chmalee
  Thu May 28 17:14:54 2020 -0700
Added a few more trios to the 1000 Genomes trio track QA ready, refs #25582

diff --git src/hg/makeDb/tgpTrio/makeTrio.py src/hg/makeDb/tgpTrio/makeTrio.py
new file mode 100755
index 0000000..cc29529
--- /dev/null
+++ src/hg/makeDb/tgpTrio/makeTrio.py
@@ -0,0 +1,311 @@
+#!/cluster/software/bin/python3
+
+#######
+# For a three column file relationship file:
+#child parent1,parent2 label
+#
+# Search for the child in the primary 1000 Genomes VCF, and the parents
+# in the supporting files, and make a merged VCF, one per chromsome, of
+# the trio data. Remove variants where all 3 individuals are homozygous
+# reference.
+#
+# The program relies on bcftools, make sure it's in your path
+#######
+
+import argparse,os,sys,subprocess
+#from collections import defaultdict
+
+validConfSettings = [
+    'primaryVCF', # the VCF file(s) of unrelated 1000 Genomes data
+    'suppVCF', # the VCF file of supplementary 1000 Genomes data
+    'relationshipFile' # the definition of child and parent sample names
+]
+
+# tabix and bcftools commands we will use:
+bcftoolsPath = "/hive/users/chmalee/bcftools/bin/bcftools"
+tabixCmd = "tabix -p vcf "
+viewCmd = bcftoolsPath + " view --no-version -s "
+mergeCmd = bcftoolsPath + " merge --no-version "
+filterCmd = " | " + bcftoolsPath + " view --no-version -i \"GT[*]!='RR'\" "
+zipPipe = " | bgzip -c > "
+
+#####
+# setup command line options
+#####
+
+def parseCommandLine():
+    """Parse command line, handle program usage, and help"""
+    parser = argparse.ArgumentParser(
+        description = "Filter and merge VCFs into trio VCFs, one directory per trio, one VCF per chromosome", add_help=True,
+        prefix_chars = "-", usage = "%(prog)s [options]")
+
+    parser.add_argument('confFile', action='store', default=None,
+        help="File with various config settings, such as the primary and supplementary"
+            "VCF and the relationship definitions")
+    parser.add_argument('-v', "--verbose", action='store_true', default=False,
+        help="Turn on extra logging to stderr")
+    parser.add_argument('-n', "--dry-run", action="store_true", default=False,
+        help="Dry-run mode, create work dirs and jobList but don't execute any jobs.")
+
+    args = parser.parse_args()
+    if not os.path.isfile(args.confFile):
+        sys.stderr.write("ERROR: config file '%s' does not exist.\n" % args.confFile)
+        sys.exit(1)
+    return args
+
+#####
+# PARASOL RELATED UTILITY FUNCTIONS
+#####
+
+def runBatch(jobDir, verbose=False):
+    """ssh to ku, run the para commands and return when jobs have finished running."""
+    if verbose:
+        sys.stderr.write("Running jobs in dir: %s\n" % (jobDir))
+    cmd = "ssh ku 'cd %s; para stop; para flushResults; para resetCounts; para freeBatch; para clearSickNodes; para create jobList; para push'" % jobDir
+    print(cmd)
+    runCmd(cmd)
+
+def createParasolJobList(jobDir, jobFileList, verbose=False):
+    """Create the jobList parasol needs from a list of scripts to execute. Skips jobs if they've 
+    already been run."""
+    jobLines = []
+    for script,outputName in jobFileList:
+        if os.path.exists(outputName):
+            if verbose:
+                sys.stderr.write("output file already exists, skipping job: %s %s\n" % (script, outputName))
+            continue
+        cmdLine = "bash %s {check out exists+ %s}\n" % (script, outputName)
+        jobLines.append(cmdLine)
+    jobList = os.path.join(jobDir, "jobList")
+    if verbose:
+        sys.stderr.write("writing %d jobs to jobFile: %s\n" % (len(jobLines), jobList))
+    with open(jobList, "w") as jlfh:
+        for cmdLine in jobLines:
+            jlfh.write(cmdLine)
+
+def writeToJobFile(fh, cmdList):
+    """Write command line strings in cmdList to open file handle fh."""
+    cmds = "\n".join(cmdList)
+    fh.write(cmds)
+
+#####
+# General util functions
+#####
+
+def cleanUpFiles(fileList, verbose=False):
+    """Remove temporary vcf files and the tabix index files."""
+    for f in fileList:
+        if f.startswith("/") or f.startswith(".."): # only delete files in the current directory
+            sys.stderr.write("ERROR: trying to remove non-temp file %s. Skipping for now\n" % f)
+            continue
+        elif os.path.exists(f):
+            if verbose:
+                sys.stderr.write("removing work file %s\n" % f)
+            os.remove(f)
+            os.remove(f+".tbi")
+        else:
+            sys.stderr.write("would have removed %s\n" % f)
+
+def runCmd(cmdString, verbose=False):
+    """Thin wrapper around subprocess.run() for clean error reporting."""
+    try:
+        if verbose:
+            sys.stderr.write("Running command: %s\n" % cmdString)
+            sys.stderr.flush()
+        subprocess.run(cmdString, shell=True)
+    except subprocess.CalledProcessError as e:
+        sys.stderr.write("ERROR: error running command: %s\n" % cmdString)
+        sys.stderr.write(e.msg + '\n')
+        sys.exit(1)
+
+def makeChromList():
+    chromosomes = []
+    for c in range(1,23):
+        chromosomes.append("chr"+str(c))
+    chromosomes.append("chrX")
+    return chromosomes
+
+#####
+# End utility functions
+#####
+
+def defineTrios(relationshipFname, verbose=False):
+    """parse out the trio definitions in relationshipFname"""
+    trioDefs = {}
+    labelSet = set()
+    with open(relationshipFname) as infh:
+        if verbose:
+            sys.stderr.write("reading relationship file: '%s'\n" % relationshipFname)
+        lineCount = 1
+        for line in infh:
+            if line.startswith("#"):
+                lineCount += 1
+                continue
+            splitLine = line.strip().split()
+            child = splitLine[0]
+            parents = splitLine[1]
+            label = splitLine[2]
+            if label in labelSet:
+                sys.stderr.write("ERROR: label '%s' not unique %s:%d\n" % (label,relationshipFname,lineCount))
+                sys.exit(1)
+            labelSet.add(label)
+            lineCount += 1
+            trioDefs[(child, label)] = parents
+    return trioDefs
+
+def parseConf(confFname,verbose=False):
+    """Parse conf file, lines starting with '#' are ignored. If the relationshipFile setting
+        is present parse it into a dictionary as well.
+        Valid settings are:
+        relationshipFile: the 3 column file defining a child sample, the parents, and a label
+        primaryVCF: the path to the primary 1000 Genomes dataset
+        suppVCF: the path to the supplementary VCF with related individuals"""
+    ret = {}
+    with open(confFname) as f:
+        for line in f:
+            if line.startswith('#'):
+                continue
+            trimmed = line.strip()
+            split = trimmed.split('=')
+            key = split[0].strip()
+            if key not in validConfSettings:
+                if verbose:
+                    sys.stderr.write("WARNING: config setting '%s' not supported, skipping\n")
+                continue
+            val = split[1].strip()
+            if key == "relationshipFile":
+                key = "trios"
+                val = defineTrios(val,verbose)
+            ret[key] = val
+    return ret
+
+def subFileName(conf, chromosome):
+    """Replace the '*' in a conf file with the current chromosome"""
+    primary = conf["primaryVCF"].replace("*", chromosome)
+    supp = conf["suppVCF"].replace("*", chromosome)
+    return primary, supp
+
+def checkTrioExists(conf, childName, parentNames, chromosome):
+    """Return the right VCF file paths or None if both the parent and child sample IDS
+    can't be found."""
+    primary, supp = subFileName(conf, chromosome)
+    findCmdStr1 = "bcftools query -l " + primary + " | grep "
+    findCmdStr2 = "bcftools query -l " + supp + " | grep "
+    childRet = None
+    par1Ret = None
+    par2Ret = None
+
+    # make sure the child sample exists:
+    if subprocess.run(findCmdStr1+childName,stdout=subprocess.DEVNULL,shell=True).returncode == 0:
+        childRet = primary
+    elif subprocess.run(findCmdStr2+childName,stdout=subprocess.DEVNULL,shell=True).returncode == 0:
+        childRet = supp
+    else:
+        sys.stderr.write("WARNING: '%s' does not exist in VCFs, "
+            "skipping trio: %s, %s\n" % (childName, childName, parentNames))
+        return (None, None, None)
+
+    # make sure the parent samples exist
+    parList = parentNames.split(',')
+    for ix in range(len(parList)):
+        par = parList[ix]
+        if subprocess.run(findCmdStr1+par, stdout=subprocess.DEVNULL, shell=True).returncode == 0:
+            tmpParRet = primary 
+            #if tmpParRet != parRet and parRet != None:
+            #    sys.stderr.write("ERROR: parent sample IDs: '%s' found in separate files. "
+            #        "Trio %s, %s\n" % (parentNames, childName, parentNames))
+            #    sys.exit(1)
+        elif subprocess.run(findCmdStr2+par, stdout=subprocess.DEVNULL, shell=True).returncode == 0:
+            tmpParRet = supp
+            #if tmpParRet != parRet and parRet != None:
+            #    sys.stderr.write("ERROR: parent sample IDs: '%s' found in separate files. "
+            #        "Trio %s, %s\n" % (parentNames, childName, parentNames))
+            #    sys.exit(1)
+        else:
+            sys.stderr.write("WARNING: '%s' does not exist in VCFs, "
+                "skipping trio: %s, %s\n" % (par, childName, parentNames))
+            return (None, None, None)
+        if ix == 0: par1Ret = tmpParRet
+        else: par2Ret = tmpParRet
+    return (childRet, par1Ret, par2Ret)
+
+def setupWorkDirs(verbose=False):
+    """Create work and output dirs if they don't exist and return their names."""
+    workDir = os.path.join(os.getcwd(),"work")
+    finalDir = os.path.join(os.getcwd(), "output")
+    if verbose:
+        sys.stderr.write("temp directory with intermediate VCFs: %s/\n" % (workDir))
+        sys.stderr.write("directory with final VCFs: %s/\n" % (finalDir))
+    if not os.path.exists(workDir):
+        os.mkdir(workDir)
+    if not os.path.exists(finalDir):
+        os.mkdir(finalDir)
+    return workDir, finalDir
+
+def genTrioScript(workDir, finalDir, trioName, childStr, parentStr, chromosome,
+                    childFname, parent1Fname, parent2Fname):
+    """Build up the string of bcftools commands to make a trio VCF."""
+    ret = "#!/bin/bash\n"
+    ret += "set -beEu -o pipefail\n"
+    ret += "temp1=$1\n" # parasol throws the output file on for some reason so catch it
+    outParent1Fname = os.path.join(workDir, trioName + "Parent1." + chromosome + ".vcf.gz")
+    outParent2Fname = os.path.join(workDir, trioName + "Parent2." + chromosome + ".vcf.gz")
+    outChildFname = os.path.join(workDir, trioName + "Child." + chromosome + ".vcf.gz")
+    finalOutFname = os.path.join(finalDir, trioName + "Trio." + chromosome + ".vcf.gz")
+    parent1, parent2 = parentStr.split(',')
+    parent1Cmd = viewCmd + "'" + parent1 + "' " + parent1Fname + zipPipe + outParent1Fname
+    parent2Cmd = viewCmd + "'" + parent2 + "' " + parent2Fname + zipPipe + outParent2Fname
+    childCmd = viewCmd + "'" + childStr + "' " + childFname + zipPipe + outChildFname
+    mergeFilterCmd = mergeCmd + outChildFname + " " + outParent1Fname + " " + outParent2Fname + filterCmd + zipPipe + finalOutFname
+    ret += parent1Cmd + "\n"
+    ret += parent2Cmd + "\n"
+    ret += childCmd + "\n"
+    ret += tabixCmd + outParent1Fname + "\n"
+    ret += tabixCmd + outParent2Fname + "\n"
+    ret += tabixCmd + outChildFname + "\n"
+    ret += mergeFilterCmd + "\n"
+    ret += tabixCmd + finalOutFname + "\n"
+    return ret
+
+def makeTrios(conf, dryRun=False, verbose=False):
+    """Given the trio definitions in conf['trios'], check the trios actually exist in
+    1000 Genomes data, find which samples are in which VCFs, and generate per chromosome
+    trio VCFs."""
+    baseWorkDir, baseFinalDir = setupWorkDirs(verbose)
+    chromosomes = makeChromList()
+    trioDefs = conf["trios"]
+    jobTupleList = [] # a list of (scriptName, outFile) tuples for parasol
+    for trio in trioDefs:
+        trioName = trio[1]
+        childStr = trio[0]
+        parentsStr = trioDefs[trio]
+        workDir = os.path.join(baseWorkDir, trioName)
+        finalDir = os.path.join(baseFinalDir, trioName)
+        childFname, parent1Fname, parent2Fname = checkTrioExists(conf, childStr, parentsStr, "chrX")
+        if childFname is not None and parent1Fname is not None and parent2Fname is not None:
+            if not os.path.exists(workDir):
+                os.mkdir(workDir)
+            if not os.path.exists(finalDir):
+                os.mkdir(finalDir)
+            for c in chromosomes:
+                childFname, parent1Fname, parent2Fname = checkTrioExists(conf, childStr, parentsStr, c)
+                jobFile = os.path.join(workDir, trio[1] + "." + c + ".sh")
+                with open(jobFile, "w+") as jfh:
+                    cmdScript = genTrioScript(workDir, finalDir, trioName, childStr,
+                        parentsStr, c, childFname, parent1Fname, parent2Fname)
+                    jfh.write(cmdScript)
+                    finalOutFname = os.path.join(finalDir, trioName + "Trio." + c + ".vcf.gz")
+                    jobTupleList.append((jobFile, finalOutFname))
+
+    # send all the jobs we want to run over to ku
+    createParasolJobList(baseWorkDir, jobTupleList, verbose)
+    if not dryRun:
+        runBatch(baseWorkDir,verbose)
+
+def main():
+    args = parseCommandLine()
+    conf = parseConf(args.confFile)
+    makeTrios(conf, args.dry_run, args.verbose)
+
+if __name__=="__main__":
+    main()