70668f5dd4d6bbcf079a1025649e17b9885335c7
angie
  Mon May 13 10:17:53 2013 -0700
Added 2 new args to annoStreamer:nextRow: minChrom and minEnd(which could also be called regionStart depending on point of view).
Streamers may use those hints to skip over data that precedes
minChrom and minEnd, to avoid the overhead of creating annoRows
that annoGrators will then have to skip over.  When primary data
are sparse and grator data are very dense, this saves significant
memory and user-cycles.  Unfortunately mysql can still be the
bottleneck for elapsed time.  Room for improvement in annoStreamDb:
when assembly has a reasonably small number of sequences (<1000),
genome-wide queries could be internally broken down into per-seq
queries; that would let us skip over chroms that precede minChrom.
refs #6152

diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c
index 1d5e342..54752c7 100644
--- src/hg/lib/annoStreamDb.c
+++ src/hg/lib/annoStreamDb.c
@@ -28,38 +28,41 @@
     struct annoRow *smallItemQueue;	// Max 1 item for merge-sorting with bigItemQueue
     struct lm *qLm;			// localmem for merge-sorting queues
     int minFinestBin;			// Smallest bin number for finest bin level
     boolean gotFinestBin;		// Flag that it's time to merge-sort with bigItemQueue
     };
 
 static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
 /* Set region -- and free current sqlResult if there is one. */
 {
 annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 if (self->sr != NULL)
     sqlFreeResult(&(self->sr));
 }
 
-static void asdDoQuery(struct annoStreamDb *self)
+static void asdDoQuery(struct annoStreamDb *self, char *minChrom, uint minEnd)
 /* Return a sqlResult for a query on table items in position range. */
 // NOTE: it would be possible to implement filters at this level, as in hgTables.
 {
 struct annoStreamer *streamer = &(self->streamer);
 struct dyString *query = dyStringCreate("select * from %s", self->table);
 if (!streamer->positionIsGenome)
     {
+    if (minChrom && differentString(minChrom, streamer->chrom))
+	errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
+		 streamer->name, minChrom, streamer->chrom);
     if (self->hasBin)
 	{
 	// Results will be in bin order, but we can restore chromStart order by
 	// accumulating initial coarse-bin items and merge-sorting them with
 	// subsequent finest-bin items which will be in chromStart order.
 	self->mergeBins = TRUE;
 	self->bigItemQueue = self->smallItemQueue = NULL;
 	lmCleanup(&(self->qLm));
 	self->qLm = lmInit(0);
 	self->gotFinestBin = FALSE;
 	}
     if (self->endFieldIndexName != NULL)
 	// Don't let mysql use a (chrom, chromEnd) index because that messes up
 	// sorting by chromStart.
 	dyStringPrintf(query, " IGNORE INDEX (%s)", self->endFieldIndexName);
@@ -71,72 +74,85 @@
 	if (self->hasBin)
 	    hAddBinToQuery(streamer->regionStart, streamer->regionEnd, query);
 	dyStringPrintf(query, "%s < %u and %s > %u", self->startField, streamer->regionEnd,
 		       self->endField, streamer->regionStart);
 	}
     if (self->notSorted)
 	dyStringPrintf(query, " order by %s", self->startField);
     }
 else if (self->notSorted)
     dyStringPrintf(query, " order by %s,%s", self->chromField, self->startField);
 struct sqlResult *sr = sqlGetResult(self->conn, query->string);
 dyStringFree(&query);
 self->sr = sr;
 }
 
-static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail)
+static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail,
+			      char *minChrom, uint minEnd)
 /* Skip past any left-join failures until we get a right-join failure, a passing row,
  * or end of data.  Return row or NULL, and return right-join fail status via retRightFail. */
 {
 int numCols = self->streamer.numCols;
 char **row = sqlNextRow(self->sr);
+if (minChrom != NULL && row != NULL)
+    {
+    // Ignore rows that fall completely before (minChrom, minEnd) - save annoGrator's time
+    int chromIx = self->omitBin+self->chromIx;
+    int endIx = self->omitBin+self->endIx;
+    int chromCmp;
+    while (row &&
+	   ((chromCmp = strcmp(row[chromIx], minChrom)) < 0 || // this chrom precedes minChrom
+	    (chromCmp == 0 && atoll(row[endIx]) < minEnd)))    // on minChrom, but before minEnd
+	row = sqlNextRow(self->sr);
+    }
 boolean rightFail = FALSE;
 struct annoFilter *filterList = self->streamer.filters;
 while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail))
     {
     if (rightFail)
 	break;
     row = sqlNextRow(self->sr);
     }
 *retRightFail = rightFail;
 return row;
 }
 
 static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail,
 				    struct lm *lm)
 /* Extract coords from row and return an annoRow including right-fail status. */
 {
 row += self->omitBin;
 char *chrom = row[self->chromIx];
 uint chromStart = sqlUnsigned(row[self->startIx]);
 uint chromEnd = sqlUnsigned(row[self->endIx]);
 return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row,
 			      self->streamer.numCols, lm);
 }
 
-static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail)
+static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail,
+			       char *minChrom, uint minEnd)
 /* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and
  * add any subsequent coarse-bin items to bigItemQueue.  As soon as we get an item from a
  * finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. */
 {
 int bin = atoi(row[0]);
 while (bin < self->minFinestBin)
     {
     // big item -- store aside in queue for merging later, move on to next item
     slAddHead(&(self->bigItemQueue), rowToAnnoRow(self, row, *pRightFail, self->qLm));
     *pRightFail = FALSE;
-    row = nextRowFiltered(self, pRightFail);
+    row = nextRowFiltered(self, pRightFail, minChrom, minEnd);
     if (row == NULL)
 	break;
     bin = atoi(row[0]);
     }
 // First finest-bin item!  Sort bigItemQueue in preparation for merging:
 self->gotFinestBin = TRUE;
 slReverse(&(self->bigItemQueue));
 slSort(&(self->bigItemQueue), annoRowCmp);
 return row;
 }
 
 static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow,
 				struct lm *callerLm)
 /* Compare head of bigItemQueue with (finest-bin) aRow; return the one with
  * lower chromStart and save the other for later.  */
@@ -163,80 +179,82 @@
 // the lower chromStart.
 {
 struct annoRow *row = NULL;
 if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0)
     row = slPopHead(&(self->bigItemQueue));
 else
     row = slPopHead(&(self->smallItemQueue));
 if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
     // All done merge-sorting, just stream finest-bin items from here on out.
     self->mergeBins = FALSE;
 enum annoRowType rowType = self->streamer.rowType;
 int numCols = self->streamer.numCols;
 return annoRowClone(row, rowType, numCols, callerLm);
 }
 
-static struct annoRow *nextRowMergeBins(struct annoStreamDb *self, struct lm *callerLm)
+static struct annoRow *nextRowMergeBins(struct annoStreamDb *self, char *minChrom, uint minEnd,
+					struct lm *callerLm)
 /* Fetch the next filtered row from mysql, merge-sorting coarse-bin items into finest-bin
  * items to maintain chromStart ordering. */
 {
 assert(self->mergeBins && self->hasBin);
 if (self->smallItemQueue)
     // In this case we have already begun merge-sorting; don't pull a new row from mysql,
     // use the queues.  This should keep smallItemQueue's max depth at 1.
     return nextQueuedRow(self, callerLm);
 else
     {
     // We might need to collect initial coarse-bin items, or might already be merge-sorting.
     boolean rightFail = FALSE;
-    char **row = nextRowFiltered(self, &rightFail);
+    char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
     if (row && !self->gotFinestBin)
 	{
 	// We are just starting -- queue up coarse-bin items, if any, until we get the first
 	// finest-bin item.
-	row = getFinestBinItem(self, row, &rightFail);
+	row = getFinestBinItem(self, row, &rightFail, minChrom, minEnd);
 	}
     // Time to merge-sort finest-bin items from mysql with coarse-bin items from queue.
     if (row != NULL)
 	{
 	struct annoRow *aRow = rowToAnnoRow(self, row, rightFail, self->qLm);
 	return mergeRow(self, aRow, callerLm);
 	}
     else
 	{
 	struct annoRow *qRow = slPopHead(&(self->bigItemQueue));
 	enum annoRowType rowType = self->streamer.rowType;
 	int numCols = self->streamer.numCols;
 	return annoRowClone(qRow, rowType, numCols, callerLm);
 	}
     }
 }
 
-static struct annoRow *asdNextRow(struct annoStreamer *vSelf, struct lm *callerLm)
+static struct annoRow *asdNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd,
+				  struct lm *callerLm)
 /* Perform sql query if we haven't already and return a single
  * annoRow, or NULL if there are no more items. */
 {
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 if (self->sr == NULL)
-    asdDoQuery(self);
+    asdDoQuery(self, minChrom, minEnd);
 if (self->sr == NULL)
     // This is necessary only if we're using sqlStoreResult in asdDoQuery, harmless otherwise:
     return NULL;
 if (self->mergeBins)
-    return nextRowMergeBins(self, callerLm);
+    return nextRowMergeBins(self, minChrom, minEnd, callerLm);
 boolean rightFail = FALSE;
-char **row = nextRowFiltered(self, &rightFail);
+char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
 if (row == NULL)
     return NULL;
 return rowToAnnoRow(self, row, rightFail, callerLm);
 }
 
 static void asdClose(struct annoStreamer **pVSelf)
 /* Close db connection and free self. */
 {
 if (pVSelf == NULL)
     return;
 struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf;
 lmCleanup(&(self->qLm));
 freeMem(self->table);
 sqlFreeResult(&(self->sr));
 hFreeConn(&(self->conn));