70668f5dd4d6bbcf079a1025649e17b9885335c7
angie
  Mon May 13 10:17:53 2013 -0700
Added 2 new args to annoStreamer->nextRow: minChrom and minEnd (which could also be called regionStart, depending on point of view).
Streamers may use those hints to skip over data that precedes
minChrom:minEnd, avoiding the overhead of creating annoRows
that annoGrators would then have to skip over.  When primary data
are sparse and grator data are very dense, this saves significant
memory and user CPU time.  Unfortunately mysql can still be the
bottleneck for elapsed time.  Room for improvement in annoStreamDb:
when assembly has a reasonably small number of sequences (<1000),
genome-wide queries could be internally broken down into per-seq
queries; that would let us skip over chroms that precede minChrom.
refs #6152

diff --git src/lib/annoStreamVcf.c src/lib/annoStreamVcf.c
index b7194ec..4e4b6d5 100644
--- src/lib/annoStreamVcf.c
+++ src/lib/annoStreamVcf.c
@@ -1,142 +1,183 @@
 /* annoStreamVcf -- subclass of annoStreamer for VCF files */
 
 #include "annoStreamVcf.h"
 #include "vcf.h"
 
 struct annoStreamVcf
 {
     struct annoStreamer streamer;	// Parent class members & methods
     // Private members
     struct vcfFile *vcff;		// VCF parsed header and file object
     struct vcfRecord *record;		// Current parsed row of VCF (need this for chromEnd)
     char *asWords[VCF_NUM_COLS];	// Current row of VCF with genotypes squashed for autoSql
     struct dyString *dyGt;		// Scratch space for squashing genotype columns
     int numCols;			// Number of columns in autoSql def of VCF.
-    int numFileCols;			// Number of columns in VCF file.
+    int numFileCols;			// Number of columns in VCF file (8, or 9 + genotypeCount).
     boolean isTabix;			// True if we are accessing compressed VCF via tabix index
 };
 
 
 static void asvSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
-/* Set region -- and free current sqlResult if there is one. */
+/* Set region via annoStreamerSetRegion; for tabix, also reposition the tabix iterator. */
 {
 annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
 struct annoStreamVcf *self = (struct annoStreamVcf *)vSelf;
 if (self->isTabix)
     lineFileSetTabixRegion(self->vcff->lf, chrom, regionStart, regionEnd);
-else if (chrom != NULL)
-    errAbort("annoStreamVcf %s: setRegion not yet implemented for non-tabix VCF.", vSelf->name);
 }
 
 static char *asvGetHeader(struct annoStreamer *vSelf)
-/* Return VCF header (e.g. for use by formatter) */
+/* Return a newly allocated copy of the VCF header (e.g. for use by formatter); caller frees. */
 {
 struct annoStreamVcf *self = (struct annoStreamVcf *)vSelf;
 return cloneString(self->vcff->headerString);
 }
 
-static char **nextRowUnfiltered(struct annoStreamVcf *self)
+static char **nextRowRaw(struct annoStreamVcf *self)
 /* Get the next VCF record and put the row text into autoSql words.
- * Return pointer to self->asWords if we get a row, otherwise NULL. */
+ * Also set self->record.  Return pointer to self->asWords if we get a row, otherwise NULL. */
 {
 char *words[self->numFileCols];
 int wordCount;
 if ((wordCount = lineFileChop(self->vcff->lf, words)) <= 0)
     return NULL;
 lineFileExpectWords(self->vcff->lf, self->numFileCols, wordCount);
 int i;
 // First 8 columns are always in the VCF file:
 for (i = 0;  i < 8;  i++)
     {
     freeMem(self->asWords[i]);
     self->asWords[i] = cloneString(words[i]);
     }
 // Depending on whether VCF contains genotypes, number of file columns may be
 // smaller or larger than number of autoSql columns:
 if (self->vcff->genotypeCount > 0)
     {
     self->asWords[8] = words[8];
     dyStringClear(self->dyGt);
     for (i = 0;  i < self->vcff->genotypeCount;  i++)
 	{
 	if (i > 0)
 	    dyStringAppendC(self->dyGt, '\t');
 	dyStringAppend(self->dyGt, words[9+i]);
 	}
     self->asWords[9] = self->dyGt->string;
     }
 else
     {
     self->asWords[8] = "";
     self->asWords[9] = "";
     }
 self->record = vcfRecordFromRow(self->vcff, words);
 return self->asWords;
 }
 
-static struct annoRow *asvNextRow(struct annoStreamer *vSelf, struct lm *callerLm)
+static char **nextRowUnfiltered(struct annoStreamVcf *self, char *minChrom, uint minEnd)
+/* Like nextRowRaw, but skip records that end before minChrom:minEnd (when minChrom is
+ * non-NULL) and, for non-tabix files, records outside the region set by setRegion.
+ * Chrom names are compared with strcmp, so records are assumed to be sorted by chrom
+ * in strcmp order, then by position.
+ * Return pointer to self->asWords if we get a row, otherwise NULL. */
+{
+struct annoStreamer *sSelf = (struct annoStreamer *)self;
+// A region chrom, if set, must agree with the caller's minChrom.
+if (minChrom != NULL && sSelf->chrom != NULL && differentString(minChrom, sSelf->chrom))
+    errAbort("annoStreamVcf %s: nextRow minChrom='%s' but region chrom='%s'",
+	     sSelf->name, minChrom, sSelf->chrom);
+char **words = nextRowRaw(self);
+if (minChrom != NULL && words != NULL && self->isTabix && strcmp(words[0], minChrom) < 0)
+    {
+    // Let tabix jump ahead instead of reading every record that precedes minChrom:minEnd.
+    // NOTE(review): with no region chrom (genome-wide), this narrows the iterator to
+    // minChrom alone, so records on later chroms are not returned unless setRegion is
+    // called again -- confirm callers do that, or restore genome-wide iteration here.
+    uint regionEnd = sSelf->regionEnd;
+    if (sSelf->chrom == NULL)
+	regionEnd = annoAssemblySeqSize(sSelf->assembly, minChrom);
+    lineFileSetTabixRegion(self->vcff->lf, minChrom, minEnd, regionEnd);
+    }
+while (words != NULL)
+    {
+    struct vcfRecord *rec = self->record;
+    boolean beforeMin = (minChrom != NULL &&
+			 (strcmp(words[0], minChrom) < 0 ||
+			  (sameString(words[0], minChrom) && rec->chromEnd < minEnd)));
+    // setRegion can't reposition a non-tabix file, so enforce its chrom and range here.
+    boolean outsideRegion = (!self->isTabix && sSelf->chrom != NULL &&
+			     (differentString(words[0], sSelf->chrom) ||
+			      rec->chromEnd < sSelf->regionStart ||
+			      rec->chromStart >= sSelf->regionEnd));
+    if (!beforeMin && !outsideRegion)
+	break;
+    words = nextRowRaw(self);
+    }
+return words;
+}
+
+static struct annoRow *asvNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd,
+				  struct lm *callerLm)
-/* Return an annoRow encoding the next VCF record, or NULL if there are no more items. */
+/* Return annoRow for next VCF record not ending before minChrom:minEnd (if given), or NULL. */
 {
 struct annoStreamVcf *self = (struct annoStreamVcf *)vSelf;
-char **words = nextRowUnfiltered(self);
+char **words = nextRowUnfiltered(self, minChrom, minEnd);
 if (words == NULL)
     return NULL;
 // Skip past any left-join failures until we get a right-join failure, a passing row, or EOF.
 boolean rightFail = FALSE;
 while (annoFilterRowFails(vSelf->filters, words, self->numCols, &rightFail))
     {
     if (rightFail)
 	break;
-    words = nextRowUnfiltered(self);
+    words = nextRowUnfiltered(self, minChrom, minEnd);
     if (words == NULL)
 	return NULL;
     }
 struct vcfRecord *rec = self->record;
 return annoRowFromStringArray(rec->chrom, rec->chromStart, rec->chromEnd,
 			      rightFail, words, self->numCols, callerLm);
 }
 
 static void asvClose(struct annoStreamer **pVSelf)
-/* Close VCF file and free self. */
+/* Close VCF file and free self; a NULL pVSelf or *pVSelf is a no-op. */
 {
-if (pVSelf == NULL)
+if (pVSelf == NULL || *pVSelf == NULL)
     return;
 struct annoStreamVcf *self = *(struct annoStreamVcf **)pVSelf;
 vcfFileFree(&(self->vcff));
 // Don't free self->record -- currently it belongs to vcff's localMem
 dyStringFree(&(self->dyGt));
 annoStreamerFree(pVSelf);
 }
 
 struct annoStreamer *annoStreamVcfNew(char *fileOrUrl, boolean isTabix, struct annoAssembly *aa,
 				      int maxRecords)
 /* Create an annoStreamer (subclass) object from a VCF file, which may
- * or may not have been compressed and indexed by tabix. */
+ * or may not have been compressed and indexed by tabix.  errAbort if it can't be opened. */
 {
 int maxErr = -1; // don't errAbort on VCF format warnings/errs
 struct vcfFile *vcff;
 if (isTabix)
     vcff = vcfTabixFileMayOpen(fileOrUrl, NULL, 0, 0, maxErr, maxRecords);
 else
     vcff = vcfFileMayOpen(fileOrUrl, maxErr, maxRecords, FALSE);
 if (vcff == NULL)
     errAbort("annoStreamVcfNew: unable to open VCF: '%s'", fileOrUrl);
 struct annoStreamVcf *self;
 AllocVar(self);
 struct annoStreamer *streamer = &(self->streamer);
 struct asObject *asObj = vcfAsObj();
 annoStreamerInit(streamer, aa, asObj, fileOrUrl);
 streamer->rowType = arWords;
 streamer->setRegion = asvSetRegion;
 streamer->getHeader = asvGetHeader;
 streamer->nextRow = asvNextRow;
 streamer->close = asvClose;
 self->vcff = vcff;
 self->dyGt = dyStringNew(1024);
 self->isTabix = isTabix;
 self->numCols = slCount(asObj->columnList);
 self->numFileCols = 8;
 if (vcff->genotypeCount > 0)
     self->numFileCols = 9 + vcff->genotypeCount;
 return (struct annoStreamer *)self;
 }