4898794edd81be5285ea6e544acbedeaeb31bf78 max Tue Nov 23 08:10:57 2021 -0800 Fixing pointers to README file for license in all source code files. refs #27614 diff --git src/hg/encode3/eap/eapSchedule/eapSchedule.c src/hg/encode3/eap/eapSchedule/eapSchedule.c index 76eb3d6..dfeed0e 100644 --- src/hg/encode3/eap/eapSchedule/eapSchedule.c +++ src/hg/encode3/eap/eapSchedule/eapSchedule.c @@ -1,1530 +1,1530 @@ /* edwScheduleAnalysis - Schedule analysis runs.. */ /* Copyright (C) 2014 The Regents of the University of California - * See README in this or parent directory for licensing information. */ + * See kent/LICENSE or http://genome.ucsc.edu/license/ for licensing information. */ #include "common.h" #include "linefile.h" #include "hash.h" #include "options.h" #include "portable.h" #include "bamFile.h" #include "obscure.h" #include "intValTree.h" #include "../../encodeDataWarehouse/inc/encodeDataWarehouse.h" #include "../../encodeDataWarehouse/inc/edwLib.h" #include "../../../../parasol/inc/paraMessage.h" #include "eapDb.h" #include "eapLib.h" boolean again = FALSE; boolean retry = FALSE; boolean noJob = FALSE; boolean ignoreQa = FALSE; boolean justLink = FALSE; boolean dry = FALSE; boolean clTest = FALSE; boolean clUpgrade = FALSE; char *clStep = "*"; char *clInFormat = "*"; char *clTarget = "hg38"; char *clIdFile = NULL; int clMax = 0; void usage() /* Explain usage and exit. */ { errAbort( "eapSchedule - Schedule analysis runs.\n" "usage:\n" " eapSchedule startFileId endFileId\n" "options:\n" " -step=pattern - Just run steps with names matching pattern which is * by default\n" " -target=targetName - Target name may be ucscDb like hg38 or something more specific\n" " -inFormat=pattern - Just run steps that are triggered by files of this format\n" " -retry - if job has run and failed retry it\n" " -upgrade - redo older analysis runs if steps have been upgraded in optional way\n" " -again - if set schedule it even if it's been run once\n" " -noJob - if set then don't put job on eapJob table\n" " -ignoreQa - if set then ignore QA results when scheduling\n" " -justLink - just symbolically link rather than copy EDW files to cache\n" " -max=count - Maximum number of jobs to schedule\n" " -idFile=file - Restrict to file id's in file, which is whitespace separated\n" " -dry - just print out the commands that would result\n" ); } /* Command line validation table. */ static struct optionSpec options[] = { {"test", OPTION_BOOLEAN}, {"step", OPTION_STRING}, {"target", OPTION_STRING}, {"inFormat", OPTION_STRING}, {"retry", OPTION_BOOLEAN}, {"upgrade", OPTION_BOOLEAN}, {"again", OPTION_BOOLEAN}, {"noJob", OPTION_BOOLEAN}, {"ignoreQa", OPTION_BOOLEAN}, {"justLink", OPTION_BOOLEAN}, {"max", OPTION_INT}, {"idFile", OPTION_STRING}, {"dry", OPTION_BOOLEAN}, {NULL, 0}, }; /* Another calculated from the command line, consider read only outside of main: */ enum eapRedoPriority gRedoPriority = eapRedo; struct hash *gRedoThresholdCache = NULL; struct eapSwVersion *eapSwVersionForStepVersion(struct sqlConnection *conn, unsigned stepVersionId) /* Get a list of swVersion for all software used by the given step version */ { char query[128]; sqlSafef(query, sizeof(query), "select sv.* from eapSwVersion sv,eapStepSwVersion ssv " " where ssv.swVersionId = sv.id and ssv.stepVersionId=%u ", stepVersionId); return eapSwVersionLoadByQuery(conn, query); } unsigned eapStepVersionIdRedoThreshold(struct sqlConnection *conn, char *stepName, enum eapRedoPriority threshold) /* Returns lowest stepVersionId that does not *have* to be rerun according to * eapSwVersionRedoPriority being no more than threshold. */ { /* Query system for what version it would use currently. This actually forces it to * build version if need be to fit all the latest software. */ unsigned lastGoodVersion = eapCurrentStepVersion(conn, stepName); /* Get a list of all versions of this step ordered with most recent first. */ char query[256]; sqlSafef(query, sizeof(query), "select * from eapStepVersion where step='%s' order by id desc", stepName); struct eapStepVersion *stepVersion, *stepVersionList = eapStepVersionLoadByQuery(conn, query); /* So some cheap sanity checking to make sure indeed we have a step and know what the latest is. */ if (stepVersionList == NULL) internalErr(); assert(stepVersionList->id == lastGoodVersion); /* If we are only version of step things are easy, we'll skip most of the rest. */ if (stepVersionList->next != NULL) { verbose(2, "Oh my, more than one version of this step. Life gets hard.\n"); /* Get a list of all swVersion used in current step, assumed good */ struct eapSwVersion *curVer, *curVerList = eapSwVersionForStepVersion(conn, lastGoodVersion); /* Build up a hash keyed by software with eapSwVersion as values */ struct hash *curVersionHash = hashNew(5); for (curVer = curVerList; curVer != NULL; curVer = curVer->next) hashAdd(curVersionHash, curVer->software, curVer); /* Loop through each step version seeing if there is a required upgrade to a piece of * software used by step. */ for (stepVersion = stepVersionList->next; stepVersion != NULL; stepVersion = stepVersion->next) { struct eapSwVersion *ver, *verList = eapSwVersionForStepVersion(conn, stepVersion->id); boolean anyMeetThreshold = FALSE; for (ver = verList; ver != NULL; ver = ver->next) { struct hashEl *hel = hashLookup(curVersionHash, ver->software); if (hel == NULL) { warn("Older version of step %s is using software %s not used in current\n" "version of step. What to do?", stepName, ver->software); internalErr(); } struct eapSwVersion *curVer = hel->val; if (curVer->id != ver->id) { if (curVer->redoPriority >= threshold) { anyMeetThreshold = TRUE; break; } } } if (anyMeetThreshold) break; verbose(2, "Looks like we can go back a version ok to %d\n", stepVersion->id); lastGoodVersion = stepVersion->id; } } eapStepVersionFreeList(&stepVersionList); return lastGoodVersion; } unsigned stepVersionIdRedoThreshold(struct sqlConnection *conn, struct hash *cache, char *stepName, enum eapRedoPriority threshold) /* Look up minimum stepVersion required for this step, using cache to avoid overheating * database. You need a separate cache for each different threshold */ { struct hashEl *hel = hashLookup(cache, stepName); unsigned redoThreshold; if(hel == NULL) { redoThreshold = eapStepVersionIdRedoThreshold(conn, stepName, threshold); hashAddInt(cache, stepName, redoThreshold); } else { redoThreshold = ptToInt(hel->val); } return redoThreshold; } struct hashEl *hashAddInt(struct hash *hash, char *name, int val); void doTest(struct sqlConnection *conn, struct edwFile *efList) /* Do some test something */ { char query[512]; sqlSafef(query, sizeof(query), "select * from eapStep"); struct eapStep *step, *stepList = eapStepLoadByQuery(conn, query); for (step = stepList; step != NULL; step = step->next) { unsigned curStepVersion = eapCurrentStepVersion(conn, step->name); printf("%s\t%u\t", step->name, curStepVersion); long redoThreshold2 = eapStepVersionIdRedoThreshold(conn, step->name, 2); long redoThreshold1 = eapStepVersionIdRedoThreshold(conn, step->name, 2); printf("redo1 %ld\tredo2 %ld\n", redoThreshold1, redoThreshold2); } } int countAlphaPrefix(char *s) /* Return how many characters in string that are upper or lowercase letters. Will return * length of entire string if whole string is letters, 0 if first char is not a letter. */ { int count = 0; char c; while ((c = *s++) != 0) { if (!isalpha(c)) break; ++count; } return count; } boolean targetsCompatible(char *a, char *b) /* Return TRUE if a, b have shared alphabetic prefix. */ { int aSize = countAlphaPrefix(a); int bSize = countAlphaPrefix(b); if (aSize != bSize) return FALSE; // Do slightly better internal error checking than an assert. If we have no // prefix things are going to be pretty confused if (aSize == 0) errAbort("Internal error: Target '%s' vs '%s' in targetsCompatible?", a, b); // Return TRUE if alphabetic prefixes of a and b are the same. return memcmp(a, b, aSize) == 0; } struct edwAssembly *targetAssemblyForDbAndSex(struct sqlConnection *conn, char *ucscDb, char *sex) /* Figure out best target assembly for something that is associated with a given ucscDB. * In general this will be the most recent for the organism. */ { char sexedName[128]; safef(sexedName, sizeof(sexedName), "%s.%s", sex, clTarget); char query[256]; sqlSafef(query, sizeof(query), "select * from edwAssembly where name='%s'", sexedName); struct edwAssembly *assembly = edwAssemblyLoadByQuery(conn, query); if (assembly == NULL) errAbort("Unexpected fail on %s", query); return assembly; } struct edwBiosample *edwBiosampleFromTerm(struct sqlConnection *conn, char *term) /* Return biosamples associated with term. */ { char query[512]; sqlSafef(query, sizeof(query), "select * from edwBiosample where term = '%s'", term); return edwBiosampleLoadByQuery(conn, query); } char *sexFromExp(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf) /* Return "male" or "female" */ { char *sex = "male"; // Consequences of Y as a mapping target not too bad, so default is male. struct edwExperiment *exp = edwExperimentFromAccession(conn, vf->experiment); if (exp != NULL) { struct edwBiosample *bio = edwBiosampleFromTerm(conn, exp->biosample); if (bio != NULL) { if (sameWord(bio->sex, "F")) sex = "female"; // Other things, unknown, pooled, etc. get to try to map to Y at least. } } return sex; } struct edwAssembly *chooseTarget(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf) /* Pick mapping target - according to taxon and sex. */ { char *sex = sexFromExp(conn, ef, vf); return targetAssemblyForDbAndSex(conn, vf->ucscDb, sex); } static boolean alreadyTakenCareOf(struct sqlConnection *conn, struct edwAssembly *assembly, char *analysisStep, long long firstInput) /* Return true if job is already scheduled or otherwise taken care of */ { if (!wildMatch(clStep, analysisStep)) return TRUE; if (again) return FALSE; char query[512]; int statusMin = -1; if (retry) statusMin = 0; sqlSafef(query, sizeof(query), "select count(*) from eapRun,eapInput" " where eapRun.id = eapInput.runId" " and eapInput.fileId=%lld and analysisStep='%s' and createStatus >= %d" " and assemblyId = %u and stepVersionId >= %u", firstInput, analysisStep, statusMin, assembly->id, stepVersionIdRedoThreshold(conn, gRedoThresholdCache, analysisStep, gRedoPriority)); int count = sqlQuickNum(conn, query); verbose(2, "query %s\ncount %d\n", query, count); return count > 0; } static char *newTempDir(char *name) /* Return a freshly created temp dir where name occurs somewhere in it's name. */ { char *tempDir = rTempName(eapTempDir, name, ""); makeDir(tempDir); int withLastSize = strlen(tempDir) + 2; char withLast[withLastSize]; safef(withLast, sizeof(withLast), "%s/", tempDir); return cloneString(withLast); } void scheduleStepInputBundle(struct sqlConnection *conn, char *tempDir, char *analysisStep, char *commandLine, char *experiment, struct edwAssembly *assembly, int inCount, unsigned inputIds[], char *inputTypes[], boolean bundleInputs) /* Schedule a step. Optionally bundle together all inputs into single vector input. */ { char *commandPrefix = "edwCdJob "; int commandPad = strlen(commandPrefix); pmCheckCommandSize(commandLine, strlen(commandLine) + commandPad); struct eapStep *step = eapStepFromNameOrDie(conn, analysisStep); unsigned stepVersionId = eapCheckVersions(conn, analysisStep); /* Check count of steps we've made against max set by command line. */ if (clMax > 0) { static int stepCount = 0; if (++stepCount > clMax) return; } if (dry) printf("%u %s\n", inputIds[0], commandLine); else { verbose(1, "scheduling %s on %u in %s\n", analysisStep, inputIds[0], tempDir); /* Wrap command line in cd to temp dir */ char fullCommandLine[strlen(commandLine)+128]; safef(fullCommandLine, sizeof(fullCommandLine), "%s%s", commandPrefix, commandLine); /* Make up eapRun record and save it. */ struct eapRun *run; AllocVar(run); if (!noJob) run->jobId = eapJobAdd(conn, fullCommandLine, step->cpusRequested); safef(run->experiment, sizeof(run->experiment), "%s", experiment); run->analysisStep = analysisStep; run->stepVersionId = stepVersionId; run->tempDir = tempDir; if (assembly != NULL) run->assemblyId = assembly->id; run->jsonResult = ""; eapRunSaveToDb(conn, run, "eapRun", 0); run->id = sqlLastAutoId(conn); /* Write input records. */ int i; for (i=0; i<inCount; ++i) { struct eapInput in = {.runId=run->id}; in.name = inputTypes[i]; in.fileId = inputIds[i]; in.ix = (bundleInputs ? i : 0); eapInputSaveToDb(conn, &in, "eapInput", 0); } /* Write output records during eapFinish.... */ freez(&run); } } void scheduleStep(struct sqlConnection *conn, char *tempDir, char *analysisStep, char *commandLine, char *experiment, struct edwAssembly *assembly, int inCount, unsigned inputIds[], char *inputTypes[]) /* Schedule a step, or maybe just think it if the dry option is set. */ { scheduleStepInputBundle(conn, tempDir, analysisStep, commandLine, experiment, assembly, inCount, inputIds, inputTypes, FALSE); } static void makeBwaIndexPath(char *target, char indexPath[PATH_LEN]) /* Fill in path to BWA index. */ { safef(indexPath, PATH_LEN, "%s%s/bwaData/%s.fa", "/scratch/data/encValDir/", target, target); } struct edwAssembly *getBwaIndex(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, char indexPath[PATH_LEN]) /* Target for now is same as UCSC DB. Returns assembly ID */ { struct edwAssembly *assembly = chooseTarget(conn, ef, vf); if (assembly == NULL) errAbort("Couldn't chooseTarget"); makeBwaIndexPath(assembly->name, indexPath); return assembly; } struct cache /* Keep track of files to keep local copies of. */ { struct slRef *list; // Keyed by licensePlate, value is file name w/in cache }; void eapCacheName(struct edwFile *ef, char fileName[PATH_LEN]) /* Fill in fileName with name of ef when cached locally */ { /* Figure out file name within cache. */ char *noDir = strrchr(ef->edwFileName, '/'); assert(noDir != NULL); noDir += 1; safef(fileName, PATH_LEN, "%s%s", eapEdwCacheDir, noDir); } char *cacheMore(struct cache *cache, struct edwFile *ef) /* Add file to hash and return translated name. */ { refAdd(&cache->list, ef); char fileName[PATH_LEN]; eapCacheName(ef, fileName); return cloneString(fileName); } struct cache *cacheNew() /* Return new empty cache */ { struct cache *cache; return AllocVar(cache); } void preloadCache(struct sqlConnection *conn, struct cache *cache) /* If there's anything that needs precaching, return a command line * with a semicolon at the end to precache. Otherwise return blank. */ { if (dry) return; struct slRef *el; for (el = cache->list; el != NULL; el = el->next) { /* Figure out file name within cache. */ struct edwFile *ef = el->val; char fileName[PATH_LEN]; eapCacheName(ef, fileName); /* Copy file into cache if it's not already there */ if (!fileExists(fileName)) { char *source = edwPathForFileId(conn, ef->id); if (justLink) { verbose(1, "Symbolic link of %s to %s (%6.1g gig)\n", source, fileName, fileSize(source)/1e9); if (symlink(source, fileName) < 0) errnoAbort("Couldn't symlink %s to %s", source, fileName); } else { verbose(1, "Copy %s to %s (%6.1g gig)\n", source, fileName, fileSize(source)/1e9); copyFile(source, fileName); } } } } char *pickPairedBwaCommand(struct sqlConnection *conn, struct edwValidFile *vf1, struct edwValidFile *vf2) /* Pick either regular of solexa-converting bwa step. */ { char *result = NULL; struct edwFastqFile *fq1 = edwFastqFileFromFileId(conn, vf1->fileId); struct edwFastqFile *fq2 = edwFastqFileFromFileId(conn, vf2->fileId); if (!sameString(fq1->qualType, fq2->qualType)) errAbort("Mixed quality types in %u and %u", vf1->fileId, vf2->fileId); if (sameWord(fq1->qualType, "solexa")) result = "eap_run_slx_bwa_pe"; else if (sameWord(fq1->qualType, "sanger")) result = "eap_run_bwa_pe"; else internalErr(); edwFastqFileFree(&fq1); edwFastqFileFree(&fq2); return result; } char *pickSingleBwaCommand(struct sqlConnection *conn, struct edwValidFile *vf) /* Pick either regular of solexa-converting bwa step. */ { struct edwFastqFile *fq = edwFastqFileFromFileId(conn, vf->fileId); char *result = NULL; if (sameWord(fq->qualType, "solexa")) result = "eap_run_slx_bwa_se"; else if (sameWord(fq->qualType, "sanger")) result = "eap_run_bwa_se"; else internalErr(); edwFastqFileFree(&fq); return result; } void scheduleBwaPaired(struct sqlConnection *conn, struct edwValidFile *vf1, struct edwValidFile *vf2, struct edwExperiment *exp) /* If it hasn't already been done schedule bwa analysis of the two files. */ { /* Get ef records */ struct edwFile *ef1 = edwFileFromIdOrDie(conn, vf1->fileId); struct edwFile *ef2 = edwFileFromIdOrDie(conn, vf2->fileId); /* Figure out path to BWA index */ char indexPath[PATH_LEN]; struct edwAssembly *assembly = getBwaIndex(conn, ef1, vf1, indexPath); /* Make sure that we don't schedule it again and again */ char *analysisStep = "bwa_paired_end"; if (!alreadyTakenCareOf(conn, assembly, analysisStep, vf1->fileId)) { /* Grab temp dir */ char *tempDir = newTempDir(analysisStep); char *outBamName = "out.bam"; /* Stage inputs and make command line. */ struct cache *cache = cacheNew(); char *ef1Name = cacheMore(cache, ef1); char *ef2Name = cacheMore(cache, ef2); char *command = pickPairedBwaCommand(conn, vf1, vf2); preloadCache(conn, cache); char commandLine[4*PATH_LEN]; safef(commandLine, sizeof(commandLine), "%s %s %s %s %s%s", command, indexPath, ef1Name, ef2Name, tempDir, outBamName); /* Make up eapRun record and schedule it*/ unsigned inFileIds[2] = {ef1->id, ef2->id}; char *inTypes[2] = {"read1", "read2"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } /* Clean up */ edwFileFree(&ef1); edwFileFree(&ef2); } void scheduleBwaSingle(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* If it hasn't already been done schedule bwa analysis of the two files. */ { /* Figure out path to BWA index */ char indexPath[PATH_LEN]; struct edwAssembly *assembly = getBwaIndex(conn, ef, vf, indexPath); /* Make sure that we don't schedule it again and again */ char *analysisStep = "bwa_single_end"; if (alreadyTakenCareOf(conn, assembly, analysisStep, ef->id)) return; /* Grab temp dir */ char *tempDir = newTempDir(analysisStep); char *outBamName = "out.bam"; /* Stage inputs and make command line. */ struct cache *cache = cacheNew(); char *efName = cacheMore(cache, ef); char *command = pickSingleBwaCommand(conn, vf); preloadCache(conn, cache); char commandLine[4*PATH_LEN]; safef(commandLine, sizeof(commandLine), "%s %s %s %s%s", command, indexPath, efName, tempDir, outBamName); /* Make up eapRun record and schedule it*/ unsigned inFileIds[1] = {ef->id}; char *inTypes[1] = {"reads"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } boolean bamIsPaired(char *fileName, int maxCount) /* Read up to maxCount records, figure out if BAM is from a paired run */ { boolean isPaired = FALSE; samfile_t *sf = samopen(fileName, "rb", NULL); bam1_t one; ZeroVar(&one); // This seems to be necessary! int i; for (i=0; i<maxCount; ++i) { int err = bam_read1(sf->x.bam, &one); if (err < 0) break; if (one.core.flag&BAM_FPAIRED) { isPaired = TRUE; break; } } samclose(sf); return isPaired; } void scheduleMacsDnase(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* If it hasn't already been done schedule macs analysis of the file. */ { // Figure out input bam name char bamName[PATH_LEN]; safef(bamName, sizeof(bamName), "%s%s", edwRootDir, ef->edwFileName); // Figure out analysis step and script based on paired end status of input bam boolean isPaired = bamIsPaired(bamName, 1000); char *analysisStep, *scriptName; if (isPaired) { analysisStep = "macs2_dnase_pe"; scriptName = "eap_run_macs2_dnase_pe"; } else { analysisStep = "macs2_dnase_se"; scriptName = "eap_run_macs2_dnase_se"; } verbose(2, "schedulingMacsDnase on %s step %s, script %s\n", ef->edwFileName, analysisStep, scriptName); /* Make sure that we don't schedule it again and again */ struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, vf->ucscDb); if (alreadyTakenCareOf(conn, assembly, analysisStep, ef->id)) return; char *tempDir = newTempDir(analysisStep); /* Stage inputs and make command line. */ struct cache *cache = cacheNew(); char *efName = cacheMore(cache, ef); preloadCache(conn, cache); char commandLine[4*PATH_LEN]; safef(commandLine, sizeof(commandLine), "%s %s %s %s%s %s%s", scriptName, assembly->name, efName, tempDir, "out.narrowPeak.bigBed", tempDir, "out.bigWig"); /* Declare step input and output arrays and schedule it. */ unsigned inFileIds[] = {ef->id}; char *inTypes[] = {"alignments"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } struct edwExperiment *findChipControlExp(struct sqlConnection *conn, char *accession) /* Given experiment accession try to find control. */ { char query[512]; sqlSafef(query, sizeof(query), "select control from edwExperiment where accession='%s'", accession); char *controlName = sqlQuickString(conn, query); if (isEmpty(controlName)) return NULL; sqlSafef(query, sizeof(query), "select * from edwExperiment where accession='%s'", controlName); freez(&controlName); return edwExperimentLoadByQuery(conn, query); } struct edwFile *mayFindChipControlFile(struct sqlConnection *conn, struct edwValidFile *vf, struct edwExperiment *exp) /* Find control file for this chip file. Return NULL if not found */ { struct edwExperiment *controlExp = findChipControlExp(conn, exp->accession); if (controlExp == NULL) { verbose(2, "Can't find control experiment for ChIP experiment %s\n", exp->accession); return NULL; } // Got control experiment, now let's go for a matching bam file. char query[PATH_LEN*3]; sqlSafef(query, sizeof(query), "select * from edwValidFile where experiment='%s' and format='%s' and ucscDb='%s'" , controlExp->accession, vf->format, vf->ucscDb); struct edwValidFile *controlVf, *controlVfList = edwValidFileLoadByQuery(conn, query); int controlCount = slCount(controlVfList); if (controlCount <= 0) return NULL; // Go through list and try to pick one int bestScore = 0; struct edwValidFile *bestControl = NULL; for (controlVf = controlVfList; controlVf != NULL; controlVf = controlVf->next) { int score = 0; if (sameString(controlVf->outputType, vf->outputType)) score += 100000000; if (sameString(controlVf->replicate, vf->replicate)) score += 50000000; if (sameString(controlVf->technicalReplicate, vf->technicalReplicate)) score += 25000000; long long sizeDiff = controlVf->itemCount - vf->itemCount; if (sizeDiff < 0) sizeDiff = -sizeDiff; if (sizeDiff > 25000000) sizeDiff = 25000000; score += 25000000 - sizeDiff; if (score > bestScore) { bestScore = score; bestControl = controlVf; } } if (bestControl == NULL) { verbose(2, "Can't find control file for ChIP experiment %s\n", exp->accession); return NULL; } // Figure out control bam file info return edwFileFromId(conn, bestControl->fileId); } void scheduleSppChip(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* If it hasn't already been done schedule SPP peak calling for a chip-seq bam file. */ { // Figure out input bam name char bamName[PATH_LEN]; safef(bamName, sizeof(bamName), "%s%s", edwRootDir, ef->edwFileName); // Figure out analysis step and script based on paired end status of input bam boolean isPaired = bamIsPaired(bamName, 1000); char *analysisStep, *scriptName; if (isPaired) { /* analysisStep = "spp_chip_pe"; scriptName = "eap_run_spp_chip_pe"; */ return; } else { analysisStep = "spp_chip_se"; scriptName = "eap_run_spp_chip_se"; } verbose(2, "Looking for control for chip file %s\n", ef->edwFileName); // Get control bam file info struct edwFile *controlEf = mayFindChipControlFile(conn, vf, exp); if (controlEf == NULL) return; verbose(2, "schedulingSppChip on %s with control %s, step %s, script %s\n", ef->edwFileName, controlEf->edwFileName, analysisStep, scriptName); /* Make sure that we don't schedule it again and again */ struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, vf->ucscDb); if (alreadyTakenCareOf(conn, assembly, analysisStep, ef->id)) return; char *tempDir = newTempDir(analysisStep); /* Stage inputs and make command line. */ struct cache *cache = cacheNew(); char *efName = cacheMore(cache, ef); char *controlEfName = cacheMore(cache, controlEf); preloadCache(conn, cache); char commandLine[4*PATH_LEN]; safef(commandLine, sizeof(commandLine), "%s %s %s %s %s%s", scriptName, assembly->name, efName, controlEfName, tempDir, "out.narrowPeak.bigBed"); /* Declare step input and output arrays and schedule it. */ unsigned inFileIds[] = {ef->id, controlEf->id}; char *inTypes[] = {"chipBam", "controlBam"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } void scheduleMacsChip(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* If it hasn't already been done schedule Macs peak calling for a chip-seq bam file. */ { // Figure out input bam name char bamName[PATH_LEN]; safef(bamName, sizeof(bamName), "%s%s", edwRootDir, ef->edwFileName); // Figure out analysis step and script based on paired end status of input bam boolean isPaired = bamIsPaired(bamName, 1000); char *analysisStep, *scriptName; if (isPaired) { analysisStep = "macs2_chip_pe"; scriptName = "eap_run_macs2_chip_pe"; } else { analysisStep = "macs2_chip_se"; scriptName = "eap_run_macs2_chip_se"; } // Get control bam file info struct edwFile *controlEf = mayFindChipControlFile(conn, vf, exp); if (controlEf == NULL) return; verbose(2, "schedulingMacsChip on %s with control %s, step %s, script %s\n", ef->edwFileName, controlEf->edwFileName, analysisStep, scriptName); /* Make sure that we don't schedule it again and again */ struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, vf->ucscDb); if (alreadyTakenCareOf(conn, assembly, analysisStep, ef->id)) return; char *tempDir = newTempDir(analysisStep); /* Stage inputs and make command line. */ struct cache *cache = cacheNew(); char *efName = cacheMore(cache, ef); char *controlEfName = cacheMore(cache, controlEf); preloadCache(conn, cache); char commandLine[4*PATH_LEN]; safef(commandLine, sizeof(commandLine), "%s %s %s %s %s%s %s%s", scriptName, assembly->name, efName, controlEfName, tempDir, "out.narrowPeak.bigBed", tempDir, "out.bigWig"); /* Declare step input and output arrays and schedule it. */ unsigned inFileIds[] = {ef->id, controlEf->id}; char *inTypes[] = {"chipBam", "controlBam"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } double mustGetReadLengthForFastq(struct sqlConnection *conn, unsigned fileId, boolean *retAllSame) /* Verify that associated file is in edwFastqFile table and return average read length */ { char query[256]; sqlSafef(query, sizeof(query), "select readSizeMean from edwFastqFile where fileId=%u", fileId); double result = sqlQuickDouble(conn, query); if (result < 1) errAbort("No edwFastqFile record for %u", fileId); sqlSafef(query, sizeof(query), "select readSizeMax - readSizeMin from edwFastqFile where fileId=%u", fileId); double diff = sqlQuickDouble(conn, query); *retAllSame = (diff == 0.0); return result; } unsigned getFirstParentOfFormat(struct sqlConnection *conn, char *format, unsigned childFileId) /* Return fastq id associated with bam file or die trying. */ { unsigned fastqId = 0; /* Scan backwards input/output tables until reach a fastq */ /* Look for input that led to us, getting both file ID and format */ char query[256]; sqlSafef(query, sizeof(query), "select eapInput.fileId,edwValidFile.format from eapInput,eapOutput,edwValidFile " " where eapOutput.runId = eapInput.runId and eapInput.fileId = edwValidFile.fileId" " and eapOutput.fileId=%u", childFileId); struct sqlResult *sr = sqlGetResult(conn, query); char **row; /* Hopefully fastq is an immediate parent. We loop through all input and * check it for fastqs. If it's not fastq we put it on a list in case we need * to look further upstream for the fastq. */ struct slUnsigned *parentList = NULL; while ((row = sqlNextRow(sr)) != NULL) { unsigned fileId = sqlUnsigned(row[0]); if (sameString(format, row[1])) { fastqId = fileId; break; } else { struct slUnsigned *el = slUnsignedNew(fileId); slAddHead(&parentList,el); } } sqlFreeResult(&sr); if (fastqId == 0) // No joy here, then go look in parents. { struct slUnsigned *parent; for (parent = parentList; parent != NULL; parent = parent->next) { unsigned result = getFirstParentOfFormat(conn, format, parent->val); if (result > 0) { fastqId = result; break; } } } slFreeList(&parentList); return fastqId; } double bamReadLength(char *fileName, int maxCount, boolean *retAllSame) /* Open up bam, read up to maxCount reads, and return size of read, checking * all are same size. */ { long long totalReadLength = 0; int count = 0; int readLength = 0; boolean allSame = TRUE; samfile_t *sf = samopen(fileName, "rb", NULL); bam1_t one; ZeroVar(&one); // This seems to be necessary! int i; for (i=0; i<maxCount; ++i) { int err = bam_read1(sf->x.bam, &one); if (err < 0) break; int length = one.core.l_qseq; if (readLength == 0) readLength = length; else if (readLength != length) { if (allSame) warn("Varying size reads in %s, %d and %d", fileName, readLength, length); allSame = FALSE; } totalReadLength += length; ++count; } samclose(sf); assert(readLength != 0); double averageReadLength = 0; *retAllSame = allSame; if (count > 0) averageReadLength = (double)totalReadLength/count; return averageReadLength; } static boolean hotspotReadLengthOk(int length) /* Return TRUE if hotspot read length is ok */ { int okReadLengths[] = {32,36,40,50,58,72,76,100}; int i; for (i=0; i<ArraySize(okReadLengths); ++i) { if (length == okReadLengths[i]) return TRUE; } return FALSE; } int getReadLengthAndCheckForHotspot(struct sqlConnection *conn, struct edwFile *ef) /* Figure out read length for file. */ { unsigned fastqId = getFirstParentOfFormat(conn, "fastq", ef->id); double readLength = 0; boolean allSame = FALSE; if (fastqId == 0) readLength = bamReadLength(edwPathForFileId(conn, ef->id), 1000, &allSame); else readLength = mustGetReadLengthForFastq(conn, fastqId, &allSame); if (!allSame) { verbose(2, "Couldn't schedule hotspot on %s, not all reads the same size.\n", ef->edwFileName); return 0; } if (!hotspotReadLengthOk(readLength)) { verbose(2, "Hotspot read length %g not supported on %s\n", readLength, ef->edwFileName); return 0; } return readLength; } void scheduleHotspot(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* If it hasn't already been done schedule hotspot analysis of the file. */ { /* Make sure that we don't schedule it again and again */ char *analysisStep = "hotspot"; struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, vf->ucscDb); if (alreadyTakenCareOf(conn, assembly, analysisStep, ef->id)) return; /* Just do ones where it's an output of previous analysis. */ { char query[512]; sqlSafef(query, sizeof(query), "select count(*) from eapOutput where fileId=%u", ef->id); if (sqlQuickNum(conn, query) == 0) { verbose(2, "Skipping fileId %u, not an EAP output", ef->id); return; } } verbose(2, "schedulingHotspot on %s step %s\n", ef->edwFileName, analysisStep); /* Make command line */ int readLength = getReadLengthAndCheckForHotspot(conn, ef); if (readLength <= 0) return; char *tempDir = newTempDir(analysisStep); /* Stage inputs and make command line. */ struct cache *cache = cacheNew(); char *efName = cacheMore(cache, ef); preloadCache(conn, cache); char commandLine[4*PATH_LEN]; safef(commandLine, sizeof(commandLine), "eap_run_hotspot %s %s %d %s%s %s%s %s%s", edwSimpleAssemblyName(assembly->ucscDb), efName, readLength, tempDir, "out.narrowPeak.bigBed", tempDir, "out.broadPeak.bigBed", tempDir, "out.bigWig"); /* Declare step input arrays and schedule it. */ unsigned inFileIds[] = {ef->id}; char *inTypes[] = {"alignments"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } void schedulePhantomPeakSpp(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* Calculate phantom peak stats on a bam file we hope will be peaky. */ { verbose(2, "Scheduling phantomPeakSpp\n"); /* Make sure that we don't schedule it again and again */ char *analysisStep = "phantom_peak_stats"; struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, vf->ucscDb); if (alreadyTakenCareOf(conn, assembly, analysisStep, ef->id)) return; /* Make temp dir and stage input files */ char *tempDir = newTempDir(analysisStep); struct cache *cache = cacheNew(); char *efName = cacheMore(cache, ef); preloadCache(conn, cache); /* Make command line */ char commandLine[3*PATH_LEN]; safef(commandLine, sizeof(commandLine), "eap_run_phantom_peak_spp %s %s%s", efName, tempDir, "out.tab"); /* Declare step input arrays and schedule it. */ unsigned inFileIds[] = {ef->id}; char *inTypes[] = {"alignments"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } void scheduleDnaseStats(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* Calculate dnase qa stats on a bam file we hope will be peaky. */ { verbose(2, "Scheduling dnaseStats\n"); /* Make sure that we don't schedule it again and again */ char *analysisStep = "dnase_stats"; struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, vf->ucscDb); if (alreadyTakenCareOf(conn, assembly, analysisStep, ef->id)) return; /* Figure out read length */ int readLength = getReadLengthAndCheckForHotspot(conn, ef); if (readLength <= 0) return; /* Make temp dir and stage input files */ char *tempDir = newTempDir(analysisStep); struct cache *cache = cacheNew(); char *efName = cacheMore(cache, ef); preloadCache(conn, cache); /* Make command line */ char commandLine[3*PATH_LEN]; safef(commandLine, sizeof(commandLine), "eap_dnase_stats %s %s %d %s%s %s%s %s%s", efName, edwSimpleAssemblyName(assembly->name), readLength, tempDir, "bamStats.ra", tempDir, "phantomPeaks.tab", tempDir, "spot.ra"); /* Declare step input arrays and schedule it. */ unsigned inFileIds[] = {ef->id}; char *inTypes[] = {"alignments"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } void scheduleBamSort(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* Look to see if a bam is sorted, and if not, then produce a sorted version of it */ { /* Make sure that we don't schedule it again and again */ char *analysisStep = "bam_sort"; struct edwAssembly *assembly = chooseTarget(conn, ef, vf); if (alreadyTakenCareOf(conn, assembly, analysisStep, ef->id)) return; char *tempDir = newTempDir(analysisStep); /* Figure out command line */ struct cache *cache = cacheNew(); char *efName = cacheMore(cache, ef); preloadCache(conn, cache); char commandLine[3*PATH_LEN]; safef(commandLine, sizeof(commandLine), "eap_bam_sort %s %s%s", efName, tempDir, "out.bam"); /* Declare step input arrays and schedule it. */ unsigned inFileIds[] = {ef->id}; char *inTypes[] = {"unsorted"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } void runFastqAnalysis(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* Run fastq analysis, at least on the data types where we can handle it. */ { char *dataType = exp->dataType; if (sameString(dataType, "DNase-seq") || sameString(dataType, "ChIP-seq") || sameString(dataType, "ChiaPet") || sameString(dataType, "DnaPet") ) { if (!isEmpty(vf->pairedEnd)) { struct edwValidFile *vfB = edwOppositePairedEnd(conn, vf); if (vfB != NULL) { struct edwValidFile *vf1, *vf2; struct edwQaPairedEndFastq *pair = edwQaPairedEndFastqFromVfs(conn, vf, vfB, &vf1, &vf2); if (pair != NULL) { scheduleBwaPaired(conn, vf1, vf2, exp); edwQaPairedEndFastqFree(&pair); } edwValidFileFree(&vfB); } } else { scheduleBwaSingle(conn, ef, vf, exp); } } } void runBamAnalysis(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* Run fastq analysis, at least on the data types where we can handle it. */ { if (sameString(exp->dataType, "DNase-seq")) { scheduleMacsDnase(conn, ef, vf, exp); scheduleHotspot(conn, ef, vf, exp); schedulePhantomPeakSpp(conn, ef, vf, exp); scheduleDnaseStats(conn, ef, vf, exp); } else if (sameString(exp->dataType, "ChIP-seq")) { scheduleSppChip(conn, ef, vf, exp); scheduleMacsChip(conn, ef, vf, exp); #ifdef SOON #endif /* SOON */ } #ifdef SOON #endif /* SOON */ } void runSingleAnalysis(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) { verbose(2, "run single analysis format %s, dataType %s\n", vf->format, exp->dataType); if (sameWord("fastq", vf->format)) runFastqAnalysis(conn, ef, vf, exp); if (sameWord("bam", vf->format)) runBamAnalysis(conn, ef, vf, exp); } struct edwFile *listOutputTypeForExp(struct sqlConnection *conn, char *experiment, char *outputType, char *format, char *ucscDb) /* Return all files of given output type and format associated with given experiment */ { char query[512]; sqlSafef(query, sizeof(query), "select edwFile.* from edwFile,edwValidFile where edwValidFile.fileId=edwFile.id " " and experiment='%s' and outputType='%s' and format='%s' and ucscDb='%s' " " and deprecated='' and errorMessage=''" " order by fileId desc" , experiment, outputType, format, ucscDb); return edwFileLoadByQuery(conn, query); } void scheduleWigPooler(struct sqlConnection *conn, struct edwExperiment *exp, struct edwValidFile *youngestVf, struct edwFile *pool) /* Create job to run wig pooler */ { struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, youngestVf->ucscDb); char *analysisStep = "sum_bigWig"; if (alreadyTakenCareOf(conn, assembly, analysisStep, youngestVf->fileId)) return; verbose(2, "Theoretically I'd be pooling %d wigs around %s\n", slCount(pool), exp->accession); struct eapStep *step = eapStepFromNameOrDie(conn, analysisStep); char *tempDir = newTempDir(analysisStep); /* Stage inputs and write file names into a file */ struct cache *cache = cacheNew(); struct edwFile *ef; char inListFile[PATH_LEN]; safef(inListFile,sizeof(inListFile), "%s%s", tempDir, "in.lst"); FILE *f = mustOpen(inListFile, "w"); for (ef = pool; ef != NULL; ef = ef->next) { char *fileName = cacheMore(cache, ef); fprintf(f, "%s\n", fileName); } carefulClose(&f); preloadCache(conn, cache); /* Make up command line */ struct dyString *command = dyStringNew(2048); dyStringPrintf(command, "eap_pool_big_wig %s %s %s%s ", assembly->name, inListFile, tempDir, "out.bigWig"); /* Create step input arrays and schedule it. */ int inCount = slCount(pool); unsigned inFileIds[inCount]; char *inTypes[inCount]; int i; for (i=0, ef = pool; i<inCount; ++i, ef = ef->next) { inFileIds[i] = ef->id; inTypes[i] = step->inputTypes[0]; } /* Schedule it */ scheduleStepInputBundle(conn, tempDir, analysisStep, command->string, exp->accession, assembly, inCount, inFileIds, inTypes, TRUE); /* Clean up and go home. */ dyStringFree(&command); eapStepFree(&step); } void replicaWigAnalysis(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* Run analysis on wigs that have been replicated. */ { #ifdef SOON char *outputType = vf->outputType; if (sameString(outputType, "macs2_dnase_signal") || sameString(outputType, "macs2_chip_signal") || sameString(outputType, "hotspot_signal")) { struct edwFile *pool = listOutputTypeForExp(conn, vf->experiment, vf->outputType, vf->format, vf->ucscDb); if (slCount(pool) > 1) { /* Output is sorted, so if we are first we are youngest (highest id) and responsible. * (Since scheduler is called on all in pool, don't want all to try to do this.) */ if (pool->id == ef->id) { verbose(2, "Looks like %u is the youngest in the pool!\n", pool->id); scheduleWigPooler(conn, exp, vf, pool); } } } #endif /* SOON */ } struct edwFile *bestRepOnList(struct sqlConnection *conn, char *notRep, struct edwFile *efList) /* Return best looking replica on list. How to decide? Hmm, good question! */ /* At the moment it is using the enrichment*coverage calcs if available, otherwise covereage * depth. It looks like it is not preferring peaks that have been processed all the way * from scratch from the fastq files by pipeline though. */ { verbose(2, "bestRepOnList for list of %d\n", slCount(efList)); /* If we are only one in list the choice is easy! */ if (efList->next == NULL) return efList; /* We multiply together two factors to make a score - the enrichment in our target area * times the coverage. Find the best scoring by this criteria and use it. */ double bestScore = -1; struct edwFile *bestEf = NULL, *ef; struct edwQaEnrichTarget *target = NULL; for (ef = efList; ef != NULL; ef = ef->next) { struct edwValidFile *vf = edwValidFileFromFileId(conn, ef->id); if (differentString(vf->replicate, notRep) ) { double score = 0; if (!isEmpty(vf->enrichedIn) && !sameString(vf->enrichedIn, "unknown")) { char query[256]; if (target == NULL) { char *assemblyName = edwSimpleAssemblyName(vf->ucscDb); struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, assemblyName); sqlSafef(query, sizeof(query), "select * from edwQaEnrichTarget where assemblyId=%u and name='%s'" , assembly->id, vf->enrichedIn); target = edwQaEnrichTargetLoadByQuery(conn, query); if (target == NULL) errAbort("Unexpected empty result from %s", query); edwAssemblyFree(&assembly); } sqlSafef(query, sizeof(query), "select coverage*enrichment from edwQaEnrich where fileId=%u and qaEnrichTargetId=%u" , vf->fileId, target->id); score = sqlQuickDouble(conn, query); } else { score = vf->depth; } if (score > bestScore) { bestScore = score; bestEf = ef; } } edwValidFileFree(&vf); } edwQaEnrichTargetFree(&target); return bestEf; } void replicaPeakAnalysis(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp, char *peakType) /* Run analysis on replicated peaks */ { #ifdef SOON char *outputType = vf->outputType; verbose(2, "replicaPeakAnalysis(outputType=%s)\n", outputType); if (sameString(outputType, "macs2_narrow_peaks") || sameString(outputType, "hotspot_narrow_peaks") || sameString(outputType, "hotspot_broad_peaks")) { struct edwFile *pool = listOutputTypeForExp(conn, vf->experiment, vf->outputType, vf->format, vf->ucscDb); if (slCount(pool) > 1) { /* Output is sorted, so if we are first we are youngest (highest id) and responsible. * (Since scheduler is called on all in pool, don't want all to try to do this.) */ if (pool->id == ef->id) { scheduleReplicatedPeaks(conn, peakType, exp, vf, pool); } } } #endif /* SOON */ } void scheduleReplicatedHotspot(struct sqlConnection *conn, struct edwExperiment *exp, struct edwValidFile *youngestVf, struct edwFile *pool) /* Schedule job that produces replicated peaks out of individual peaks from two replicates. */ { char *analysisStep = "replicated_hotspot"; verbose(2, "Considering %s on %s\n", analysisStep, youngestVf->licensePlate); /* Get assembly and figure out if we are already done */ struct edwAssembly *assembly = edwAssemblyForUcscDb(conn, youngestVf->ucscDb); if (alreadyTakenCareOf(conn, assembly, analysisStep, youngestVf->fileId)) return; verbose(2, "Theoretically I'd be making replicated hotspots in %s on file %u=%u\n", exp->accession, pool->id, youngestVf->fileId); /* We always take the most recent replicate. Choose best remaining rep by some measure for second */ struct edwFile *ef1 = pool; struct edwValidFile *vf1 = edwValidFileFromFileId(conn, ef1->id); struct edwFile *ef2 = bestRepOnList(conn, vf1->replicate, pool->next); if (ef2 == NULL) return; /* Make sure that the read lengths are all the same. Hotspot needs this */ int readLength1 = getReadLengthAndCheckForHotspot(conn, ef1); if (readLength1 <= 0) return; int readLength2 = getReadLengthAndCheckForHotspot(conn, ef2); if (readLength2 <= 0) return; if (readLength1 != readLength2) { warn("Replicates don't have matching read lengths in %u vs %u", ef1->id, ef2->id); return; } /* Stage inputs. */ struct cache *cache = cacheNew(); char *ef1Name = cacheMore(cache, ef1); char *ef2Name = cacheMore(cache, ef2); preloadCache(conn, cache); /* Grab temp dir */ char *tempDir = newTempDir(analysisStep); /* Make up a file in temp dir that is list of all inputs, just two... */ char inListFile[PATH_LEN]; safef(inListFile,sizeof(inListFile), "%s%s", tempDir, "in.lst"); FILE *f = mustOpen(inListFile, "w"); fprintf(f, "%s\n%s\n", ef1Name, ef2Name); carefulClose(&f); /* Make command line. */ char commandLine[4*PATH_LEN]; safef(commandLine, sizeof(commandLine), "eap_pool_hotspot %s %s %d %s%s %s%s %s%s", edwSimpleAssemblyName(assembly->ucscDb), inListFile, readLength1, tempDir, "out.narrowPeak.bigBed", tempDir, "out.broadPeak.bigBed", tempDir, "out.bigWig"); /* Make up eapRun record and schedule it*/ unsigned inFileIds[2] = {ef1->id, ef2->id}; char *inTypes[2] = {"rep1", "rep2"}; scheduleStep(conn, tempDir, analysisStep, commandLine, exp->accession, assembly, ArraySize(inFileIds), inFileIds, inTypes); } void replicaBamAnalysis(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* Run analysis on bams that have been replicated. */ { if (sameString(exp->dataType, "DNase-seq")) { struct edwFile *pool = listOutputTypeForExp(conn, vf->experiment, vf->outputType, vf->format, vf->ucscDb); if (slCount(pool) > 1) { scheduleReplicatedHotspot(conn, exp, vf, pool); } } } void runReplicateAnalysis(struct sqlConnection *conn, struct edwFile *ef, struct edwValidFile *vf, struct edwExperiment *exp) /* Run analysis we hold off on until we have replicates. */ { verbose(2, "run replicate analysis format %s, dataType %s\n", vf->format, exp->dataType); if (vf->replicateQaStatus > 0) { if (sameWord("bigWig", vf->format)) replicaWigAnalysis(conn, ef, vf, exp); else if (sameWord("broadPeak", vf->format) || sameWord("narrowPeak", vf->format)) replicaPeakAnalysis(conn, ef, vf, exp, vf->format); else if (sameWord("bam", vf->format)) replicaBamAnalysis(conn, ef, vf, exp); } } struct rbTree *intTreeFromFile(char *fileName) /* Create intVal tree with empty vals from a space separated file of integer IDs */ { struct rbTree *it = intValTreeNew(); struct lineFile *lf = lineFileOpen(fileName, TRUE); char *line; while (lineFileNextReal(lf, &line)) { char *word; while ((word = nextWord(&line)) != NULL) { int key = sqlUnsigned(word); intValTreeAdd(it, key, NULL); } } lineFileClose(&lf); return it; } void edwScheduleAnalysis(int startFileId, int endFileId) /* edwScheduleAnalysis - Schedule analysis runs.. */ { struct sqlConnection *conn = edwConnectReadWrite(); struct edwFile *ef, *efList = edwFileAllIntactBetween(conn, startFileId, endFileId); verbose(2, "Got %d intact files between %d and %d\n", slCount(efList), startFileId, endFileId); if (clTest) { doTest(conn, efList); return; } struct rbTree *idTree = NULL; if (clIdFile != NULL) { idTree = intTreeFromFile(clIdFile); verbose(1, "%d items in %s\n", idTree->n, clIdFile); } for (ef = efList; ef != NULL; ef = ef->next) { if (idTree != NULL) { struct intVal iv = {ef->id, NULL}; if (rbTreeFind(idTree, &iv) == NULL) continue; } struct edwValidFile *vf = edwValidFileFromFileId(conn, ef->id); if (vf != NULL && wildMatch(clInFormat, vf->format) && targetsCompatible(vf->ucscDb, clTarget)) { verbose(2, "aka %s\n", vf->licensePlate); struct edwExperiment *exp = edwExperimentFromAccession(conn, vf->experiment); if (exp != NULL) { verbose(2, "experiment %s, singleQaStatus %d, replicateQaStatus %d\n", exp->accession, vf->singleQaStatus, vf->replicateQaStatus); if (vf->singleQaStatus > 0 || ignoreQa) runSingleAnalysis(conn, ef, vf, exp); if (vf->replicateQaStatus > 0 || ignoreQa) runReplicateAnalysis(conn, ef, vf, exp); } } } sqlDisconnect(&conn); } int main(int argc, char *argv[]) /* Process command line. */ { optionInit(&argc, argv, options); if (argc != 3) usage(); clTest= optionExists("test"); retry = optionExists("retry"); again = optionExists("again"); noJob = optionExists("noJob"); ignoreQa = optionExists("ignoreQa"); justLink = optionExists("justLink"); dry = optionExists("dry"); clStep = optionVal("step", clStep); clInFormat = optionVal("inFormat", clInFormat); clMax = optionInt("max", clMax); clUpgrade = optionExists("upgrade"); clTarget = optionVal("target", clTarget); clIdFile = optionVal("idFile", clIdFile); if (clUpgrade) gRedoPriority = eapUpgrade; gRedoThresholdCache = hashNew(8); edwScheduleAnalysis(sqlUnsigned(argv[1]), sqlUnsigned(argv[2])); return 0; }