46da73e7e18e0ee3f4299ae70d4359bfa1600f23 angie Fri Sep 27 17:27:02 2013 -0700 Code review feedback, thanks Tim.refs #11800 diff --git src/lib/annoStreamVcf.c src/lib/annoStreamVcf.c index 33a8e2c..7496f8e 100644 --- src/lib/annoStreamVcf.c +++ src/lib/annoStreamVcf.c @@ -1,337 +1,340 @@ /* annoStreamVcf -- subclass of annoStreamer for VCF files */ #include "annoStreamVcf.h" #include "twoBit.h" #include "vcf.h" struct annoStreamVcf { struct annoStreamer streamer; // Parent class members & methods // Private members struct vcfFile *vcff; // VCF parsed header and file object struct vcfRecord *record; // Current parsed row of VCF (need this for chromEnd) char *asWords[VCF_NUM_COLS]; // Current row of VCF with genotypes squashed for autoSql struct dyString *dyGt; // Scratch space for squashing genotype columns struct hash *chromNameHash; // Translate "chr"-less seq names if necessary. struct annoRow *indelQ; // FIFO for coord-tweaked indels, to maintain sorting struct annoRow *nextPosQ; // FIFO (max len=1) for stashing row while draining indelQ struct lm *qLm; // Local mem for saving rows in Q's (callerLm disappears) int numFileCols; // Number of columns in VCF file. int maxRecords; // Maximum number of annoRows to return. int recordCount; // Number of annoRows we have returned so far. boolean isTabix; // True if we are accessing compressed VCF via tabix index boolean eof; // True when we have hit end of file or maxRecords }; static void asvSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd) /* Set region -- and free current sqlResult if there is one. */ { annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd); struct annoStreamVcf *self = (struct annoStreamVcf *)vSelf; if (self->isTabix) lineFileSetTabixRegion(self->vcff->lf, chrom, regionStart, regionEnd); self->indelQ = self->nextPosQ = NULL; self->eof = FALSE; } static char *asvGetHeader(struct annoStreamer *vSelf) /* Return VCF header (e.g. for use by formatter) */ { struct annoStreamVcf *self = (struct annoStreamVcf *)vSelf; return cloneString(self->vcff->headerString); } static char **nextRowRaw(struct annoStreamVcf *self) /* Get the next VCF record and put the row text into autoSql words. * Return pointer to self->asWords if we get a row, otherwise NULL. */ { char *words[self->numFileCols]; int wordCount; if ((wordCount = lineFileChop(self->vcff->lf, words)) <= 0) return NULL; lineFileExpectWords(self->vcff->lf, self->numFileCols, wordCount); int i; // First 8 columns are always in the VCF file: for (i = 0; i < 8; i++) { freeMem(self->asWords[i]); self->asWords[i] = cloneString(words[i]); } // Depending on whether VCF contains genotypes, number of file columns may be // smaller or larger than number of autoSql columns: if (self->vcff->genotypeCount > 0) { self->asWords[8] = words[8]; dyStringClear(self->dyGt); for (i = 0; i < self->vcff->genotypeCount; i++) { if (i > 0) dyStringAppendC(self->dyGt, '\t'); dyStringAppend(self->dyGt, words[9+i]); } self->asWords[9] = self->dyGt->string; } else { self->asWords[8] = ""; self->asWords[9] = ""; } self->record = vcfRecordFromRow(self->vcff, words); return self->asWords; } static char *getProperChromName(struct annoStreamVcf *self, char *vcfChrom) /* We tolerate chr-less chrom names in VCF and BAM ("1" for "chr1" etc); to avoid * confusing the rest of the system, return the chr-ful version if it exists. */ { char *name = hashFindVal(self->chromNameHash, vcfChrom); if (name == NULL) { name = vcfChrom; struct twoBitFile *tbf = self->streamer.assembly->tbf; char buf[256]; if (! twoBitIsSequence(tbf, vcfChrom)) { safef(buf, sizeof(buf), "chr%s", vcfChrom); if (twoBitIsSequence(tbf, buf)) name = buf; } name = lmCloneString(self->chromNameHash->lm, name); hashAdd(self->chromNameHash, vcfChrom, name); } return name; } static char **nextRowUnfiltered(struct annoStreamVcf *self, char *minChrom, uint minEnd) /* Get the next VCF record and put the row text into autoSql words. * Return pointer to self->asWords if we get a row, otherwise NULL. */ { struct annoStreamer *sSelf = (struct annoStreamer *)self; char *regionChrom = sSelf->chrom; uint regionStart = sSelf->regionStart; uint regionEnd = sSelf->regionEnd; if (minChrom != NULL) { if (regionChrom == NULL) { regionChrom = minChrom; regionStart = minEnd; regionEnd = annoAssemblySeqSize(sSelf->assembly, minChrom); } else { regionStart = max(regionStart, minEnd); } } char **words = nextRowRaw(self); if (regionChrom != NULL && words != NULL) { if (self->isTabix && strcmp(getProperChromName(self, words[0]), regionChrom) < 0) lineFileSetTabixRegion(self->vcff->lf, regionChrom, regionStart, regionEnd); while (words != NULL && (strcmp(getProperChromName(self, words[0]), regionChrom) < 0 || (sameString(words[0], regionChrom) && self->record->chromEnd < regionStart))) words = nextRowRaw(self); } self->recordCount++; if (words == NULL || (self->maxRecords > 0 && self->recordCount >= self->maxRecords)) self->eof = TRUE; return words; } static struct annoRow *nextRowFiltered(struct annoStreamVcf *self, char *minChrom, uint minEnd, struct lm *callerLm) /* Get the next record that passes our filters. */ { char **words = nextRowUnfiltered(self, minChrom, minEnd); if (words == NULL) return NULL; // Skip past any left-join failures until we get a right-join failure, a passing row, or EOF. struct annoStreamer *sSelf = (struct annoStreamer *)self; boolean rightFail = FALSE; while (annoFilterRowFails(sSelf->filters, words, sSelf->numCols, &rightFail)) { if (rightFail) break; words = nextRowUnfiltered(self, minChrom, minEnd); if (words == NULL) return NULL; } struct vcfRecord *rec = self->record; vcfRecordTrimIndelLeftBase(rec); char *chrom = getProperChromName(self, rec->chrom); return annoRowFromStringArray(chrom, rec->chromStart, rec->chromEnd, rightFail, words, sSelf->numCols, callerLm); } INLINE boolean vcfAnnoRowIsIndel(struct annoRow *row) /* If vcf row's start is one base to the left of row->start, it's an indel whose start * coord and alleles required translation into our representation. */ { char **words = (char **)(row->data); uint vcfStart = atoll(words[1]); return (row->start == vcfStart); // 0-based row->start, 1-based vcfStart } INLINE void nextPosQShouldBeEmpty(struct annoStreamVcf *self) /* errAbort if nextPosQ is not empty when expected. */ { if (self->nextPosQ != NULL) errAbort("annoStreamVcf %s: nextPosQ should be empty!", ((struct annoStreamer *)self)->name); } #define MAX_QLM_MEM (1024 * 1024) INLINE struct annoRow *asvCloneForQ(struct annoStreamVcf *self, struct annoRow *row) /* Rows that we store in our queues have to be clone with our own qLm for persistence * across calls to asvNextRow. */ { if (self->indelQ == NULL && self->nextPosQ == NULL && lmSize(self->qLm) > MAX_QLM_MEM) { lmCleanup(&self->qLm); self->qLm = lmInit(0); } struct annoStreamer *sSelf = (struct annoStreamer *)self; return annoRowClone(row, sSelf->rowType, sSelf->numCols, self->qLm); } INLINE struct annoRow *asvPopQ(struct annoRow **pQ, struct annoStreamer *sSelf, struct lm *callerLm) /* Return the row at the head of the indel queue, cloned with caller's localmem. */ { return annoRowClone(slPopHead(pQ), sSelf->rowType, sSelf->numCols, callerLm); } static struct annoRow *asvNextRow(struct annoStreamer *sSelf, char *minChrom, uint minEnd, struct lm *callerLm) /* Return an annoRow encoding the next VCF record, or NULL if there are no more items. * Use queues to save indels aside until we get to the following base, because VCF's * indel encoding starts one base to the left of the actual indel. Thus, sorted VCF might * not be sorted in our internal coords, but it won't be off by more than one base. */ { struct annoStreamVcf *self = (struct annoStreamVcf *)sSelf; if (minChrom != NULL && sSelf->chrom != NULL && differentString(minChrom, sSelf->chrom)) errAbort("annoStreamVcf %s: nextRow minChrom='%s' but region chrom='%s'", sSelf->name, minChrom, sSelf->chrom); if (self->nextPosQ != NULL || self->eof) { // We have found EOF or a variant at a position after the indels that we have been // saving aside. Let the indels drain first, then the stashed variant (if there is // one, and it's not an indel). if (self->indelQ != NULL) return asvPopQ(&self->indelQ, sSelf, callerLm); else if (self->nextPosQ != NULL) { if (vcfAnnoRowIsIndel(self->nextPosQ)) { // Another indel at the next position -- move it from nextPosQ to indelQ // and keep searching below. self->indelQ = slPopHead(&self->nextPosQ); nextPosQShouldBeEmpty(self); } else return asvPopQ(&self->nextPosQ, sSelf, callerLm); } else // eof return NULL; } struct annoRow *nextRow = NULL; while ((nextRow = nextRowFiltered(self, minChrom, minEnd, callerLm)) != NULL) { // nextRow has been allocated with callerLm; if we save it for later, we need a clone in qLm. // If we're popping a row from qLm, we need to clone back to the (probably new) callerLm. if (vcfAnnoRowIsIndel(nextRow)) { if (self->indelQ != NULL) { // Coords are apples-to-apples (both indels), look for strict ordering: if (self->indelQ->start < nextRow->start) { // Another indel, but at some subsequent base -- store in nextPosQ & pop indelQ nextPosQShouldBeEmpty(self); self->nextPosQ = asvCloneForQ(self, nextRow); return asvPopQ(&self->indelQ, sSelf, callerLm); } else // Another indel at same position -- queue it up and keep looking. // I expect very few of these, so addTail OK slAddTail(&self->indelQ, asvCloneForQ(self, nextRow)); } else // nextRow is first indel at this position -- queue it up and keep looking. self->indelQ = asvCloneForQ(self, nextRow); } else // i.e. nextRow is non-indel { - // Coords are not apples-to-apples, so having the same start means let indels go first: + // Coords are not apples-to-apples: having the same annoRow->start means + // that the indel VCF starts are one less than the non-indel VCF starts, + // so let indels go first: if (self->indelQ != NULL && self->indelQ->start <= nextRow->start) { // nextRow is a non-indel at a subsequent *VCF* base; store in nextPosQ & pop indelQ nextPosQShouldBeEmpty(self); self->nextPosQ = asvCloneForQ(self, nextRow); return asvPopQ(&self->indelQ, sSelf, callerLm); } else // No indelQ, or nextRow is a non-indel at the same *VCF* base (but prior UCSC base); // use it now (BTW I expect this to be the common case): return nextRow; } } +nextPosQShouldBeEmpty(self); if (nextRow == NULL) { if (self->indelQ != NULL) return asvPopQ(&self->indelQ, sSelf, callerLm); else return NULL; } errAbort("annoStreamVcf %s: how did we get here?", sSelf->name); return NULL; // avoid compiler warning } static void asvClose(struct annoStreamer **pVSelf) /* Close VCF file and free self. */ { if (pVSelf == NULL) return; struct annoStreamVcf *self = *(struct annoStreamVcf **)pVSelf; vcfFileFree(&(self->vcff)); // Don't free self->record -- currently it belongs to vcff's localMem dyStringFree(&(self->dyGt)); lmCleanup(&self->qLm); annoStreamerFree(pVSelf); } struct annoStreamer *annoStreamVcfNew(char *fileOrUrl, boolean isTabix, struct annoAssembly *aa, int maxRecords) /* Create an annoStreamer (subclass) object from a VCF file, which may * or may not have been compressed and indexed by tabix. */ { int maxErr = -1; // don't errAbort on VCF format warnings/errs struct vcfFile *vcff; if (isTabix) vcff = vcfTabixFileMayOpen(fileOrUrl, NULL, 0, 0, maxErr, 0); else vcff = vcfFileMayOpen(fileOrUrl, maxErr, 0, FALSE); if (vcff == NULL) errAbort("annoStreamVcfNew: unable to open VCF: '%s'", fileOrUrl); struct annoStreamVcf *self; AllocVar(self); struct annoStreamer *streamer = &(self->streamer); struct asObject *asObj = vcfAsObj(); annoStreamerInit(streamer, aa, asObj, fileOrUrl); streamer->rowType = arWords; streamer->setRegion = asvSetRegion; streamer->getHeader = asvGetHeader; streamer->nextRow = asvNextRow; streamer->close = asvClose; self->vcff = vcff; self->dyGt = dyStringNew(1024); self->chromNameHash = hashNew(0); self->isTabix = isTabix; self->numFileCols = 8; if (vcff->genotypeCount > 0) self->numFileCols = 9 + vcff->genotypeCount; self->maxRecords = maxRecords; self->qLm = lmInit(0); return (struct annoStreamer *)self; }