70668f5dd4d6bbcf079a1025649e17b9885335c7 angie Mon May 13 10:17:53 2013 -0700 Added 2 new args to annoStreamer:nextRow: minChrom and minEnd(which could also be called regionStart depending on point of view). Streamers may use those hints to skip over data that precedes minChrom and minEnd, to avoid the overhead of creating annoRows that annoGrators will then have to skip over. When primary data are sparse and grator data are very dense, this saves significant memory and user-cycles. Unfortunately mysql can still be the bottleneck for elapsed time. Room for improvement in annoStreamDb: when assembly has a reasonably small number of sequences (<1000), genome-wide queries could be internally broken down into per-seq queries; that would let us skip over chroms that precede minChrom. refs #6152 diff --git src/lib/annoStreamVcf.c src/lib/annoStreamVcf.c index b7194ec..4e4b6d5 100644 --- src/lib/annoStreamVcf.c +++ src/lib/annoStreamVcf.c @@ -1,142 +1,183 @@ /* annoStreamVcf -- subclass of annoStreamer for VCF files */ #include "annoStreamVcf.h" #include "vcf.h" struct annoStreamVcf { struct annoStreamer streamer; // Parent class members & methods // Private members struct vcfFile *vcff; // VCF parsed header and file object struct vcfRecord *record; // Current parsed row of VCF (need this for chromEnd) char *asWords[VCF_NUM_COLS]; // Current row of VCF with genotypes squashed for autoSql struct dyString *dyGt; // Scratch space for squashing genotype columns int numCols; // Number of columns in autoSql def of VCF. int numFileCols; // Number of columns in VCF file. boolean isTabix; // True if we are accessing compressed VCF via tabix index }; static void asvSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd) /* Set region -- and free current sqlResult if there is one. */ { annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd); struct annoStreamVcf *self = (struct annoStreamVcf *)vSelf; if (self->isTabix) lineFileSetTabixRegion(self->vcff->lf, chrom, regionStart, regionEnd); -else if (chrom != NULL) - errAbort("annoStreamVcf %s: setRegion not yet implemented for non-tabix VCF.", vSelf->name); } static char *asvGetHeader(struct annoStreamer *vSelf) /* Return VCF header (e.g. for use by formatter) */ { struct annoStreamVcf *self = (struct annoStreamVcf *)vSelf; return cloneString(self->vcff->headerString); } -static char **nextRowUnfiltered(struct annoStreamVcf *self) +static char **nextRowRaw(struct annoStreamVcf *self) /* Get the next VCF record and put the row text into autoSql words. * Return pointer to self->asWords if we get a row, otherwise NULL. */ { char *words[self->numFileCols]; int wordCount; if ((wordCount = lineFileChop(self->vcff->lf, words)) <= 0) return NULL; lineFileExpectWords(self->vcff->lf, self->numFileCols, wordCount); int i; // First 8 columns are always in the VCF file: for (i = 0; i < 8; i++) { freeMem(self->asWords[i]); self->asWords[i] = cloneString(words[i]); } // Depending on whether VCF contains genotypes, number of file columns may be // smaller or larger than number of autoSql columns: if (self->vcff->genotypeCount > 0) { self->asWords[8] = words[8]; dyStringClear(self->dyGt); for (i = 0; i < self->vcff->genotypeCount; i++) { if (i > 0) dyStringAppendC(self->dyGt, '\t'); dyStringAppend(self->dyGt, words[9+i]); } self->asWords[9] = self->dyGt->string; } else { self->asWords[8] = ""; self->asWords[9] = ""; } self->record = vcfRecordFromRow(self->vcff, words); return self->asWords; } -static struct annoRow *asvNextRow(struct annoStreamer *vSelf, struct lm *callerLm) +static char **nextRowUnfiltered(struct annoStreamVcf *self, char *minChrom, uint minEnd) +/* Get the next VCF record and put the row text into autoSql words. + * Return pointer to self->asWords if we get a row, otherwise NULL. */ +{ +struct annoStreamer *sSelf = (struct annoStreamer *)self; +char *regionChrom = sSelf->chrom; +uint regionStart = sSelf->regionStart; +uint regionEnd = sSelf->regionEnd; +if (minChrom != NULL) + { + if (regionChrom == NULL) + { + regionChrom = minChrom; + regionStart = minEnd; + regionEnd = annoAssemblySeqSize(sSelf->assembly, minChrom); + } + else + { + if (differentString(minChrom, regionChrom)) + errAbort("annoStreamVcf %s: nextRow minChrom='%s' but region chrom='%s'", + sSelf->name, minChrom, regionChrom); + regionStart = max(regionStart, minEnd); + } + } +char **words = nextRowRaw(self); +if (minChrom != NULL && words != NULL) + { + if (self->isTabix && strcmp(words[0], minChrom) < 0) + { + uint regionEnd = sSelf->regionEnd; + if (sSelf->chrom == NULL) + regionEnd = annoAssemblySeqSize(sSelf->assembly, minChrom); + lineFileSetTabixRegion(self->vcff->lf, minChrom, minEnd, regionEnd); + } + while (words != NULL && + (strcmp(words[0], minChrom) < 0 || + (sameString(words[0], minChrom) && self->record->chromEnd < minEnd))) + words = nextRowRaw(self); + } +return words; +} + +static struct annoRow *asvNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd, + struct lm *callerLm) /* Return an annoRow encoding the next VCF record, or NULL if there are no more items. */ { struct annoStreamVcf *self = (struct annoStreamVcf *)vSelf; -char **words = nextRowUnfiltered(self); +char **words = nextRowUnfiltered(self, minChrom, minEnd); if (words == NULL) return NULL; // Skip past any left-join failures until we get a right-join failure, a passing row, or EOF. boolean rightFail = FALSE; while (annoFilterRowFails(vSelf->filters, words, self->numCols, &rightFail)) { if (rightFail) break; - words = nextRowUnfiltered(self); + words = nextRowUnfiltered(self, minChrom, minEnd); if (words == NULL) return NULL; } struct vcfRecord *rec = self->record; return annoRowFromStringArray(rec->chrom, rec->chromStart, rec->chromEnd, rightFail, words, self->numCols, callerLm); } static void asvClose(struct annoStreamer **pVSelf) /* Close VCF file and free self. */ { if (pVSelf == NULL) return; struct annoStreamVcf *self = *(struct annoStreamVcf **)pVSelf; vcfFileFree(&(self->vcff)); // Don't free self->record -- currently it belongs to vcff's localMem dyStringFree(&(self->dyGt)); annoStreamerFree(pVSelf); } struct annoStreamer *annoStreamVcfNew(char *fileOrUrl, boolean isTabix, struct annoAssembly *aa, int maxRecords) /* Create an annoStreamer (subclass) object from a VCF file, which may * or may not have been compressed and indexed by tabix. */ { int maxErr = -1; // don't errAbort on VCF format warnings/errs struct vcfFile *vcff; if (isTabix) vcff = vcfTabixFileMayOpen(fileOrUrl, NULL, 0, 0, maxErr, maxRecords); else vcff = vcfFileMayOpen(fileOrUrl, maxErr, maxRecords, FALSE); if (vcff == NULL) errAbort("annoStreamVcfNew: unable to open VCF: '%s'", fileOrUrl); struct annoStreamVcf *self; AllocVar(self); struct annoStreamer *streamer = &(self->streamer); struct asObject *asObj = vcfAsObj(); annoStreamerInit(streamer, aa, asObj, fileOrUrl); streamer->rowType = arWords; streamer->setRegion = asvSetRegion; streamer->getHeader = asvGetHeader; streamer->nextRow = asvNextRow; streamer->close = asvClose; self->vcff = vcff; self->dyGt = dyStringNew(1024); self->isTabix = isTabix; self->numCols = slCount(asObj->columnList); self->numFileCols = 8; if (vcff->genotypeCount > 0) self->numFileCols = 9 + vcff->genotypeCount; return (struct annoStreamer *)self; }