5c3f98617dd779a136cbd685e502e08bd2e7e990
angie
  Fri Oct 11 15:00:31 2013 -0700
Calling asdDoQueryChunking from way down in nextRowFromBuffer wasn't a good idea because nextRowFromBuffer might have a caller that assumes
we're done merging... but after calling asdDoQueryChunking, we need
to start over with the merging.  So, return NULL, but set self->needQuery;
then, way up in asdNextRow, if we get a NULL, then check self->needQuery,
and try again if necessary.

diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c
index 48939de..cfe4a12 100644
--- src/hg/lib/annoStreamDb.c
+++ src/hg/lib/annoStreamDb.c
@@ -43,32 +43,33 @@
     struct slName *queryChrom;		// most recently queried chrom for whole-genome (or NULL)
     boolean eof;			// TRUE when we are done (maxItems or no more items)
     boolean needQuery;			// TRUE when we haven't yet queried, or need to query again
     boolean doNextChunk;		// TRUE if rowBuf ends before end of chrom/region
     uint nextChunkStart;		// Start coord for next chunk of rows to query
 
     struct rowBuf
     // Temporary storage for rows from chunked query
         {
 	struct lm *lm;			// storage for rows
 	char ***buf;			// array of pointers to rows
 	int size;			// number of rows
 	int ix;				// offset in buffer, [0..size]
         } rowBuf;
 
-    char **(*nextRowRaw)(struct annoStreamDb *self, char *minChrom, uint minEnd);
+    char **(*nextRowRaw)(struct annoStreamDb *self);
     // Depending on query style, use either sqlNextRow or temporary row storage to get next row.
+    // This may return NULL but set self->needQuery; asdNextRow watches for that.
 
     void (*doQuery)(struct annoStreamDb *self, char *minChrom, uint minEnd);
     // Depending on query style, perform either a single query or (series of) chunked query
     };
 
 // For performance reasons, even if !useMaxItems (no limit), we need to limit the
 // number of rows that are returned from a query, so we can slurp them into memory and
 // close the sqlResult before mysql gets unhappy about the result being open so long.
 #define ASD_CHUNK_SIZE 100000
 
 static void resetMergeState(struct annoStreamDb *self)
 /* Reset or initialize members that track merging of coarse-bin items with fine-bin items. */
 {
 self->mergeBins = FALSE;
 self->bigItemQueue = self->smallItemQueue = NULL;
@@ -93,31 +94,31 @@
 self->doNextChunk = FALSE;
 self->needQuery = TRUE;
 resetRowBuf(&self->rowBuf);
 }
 
 static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
 /* Set region -- and free current sqlResult if there is one. */
 {
 annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 sqlFreeResult(&(self->sr));
 resetMergeState(self);
 resetChunkState(self);
 }
 
-static char **nextRowFromSqlResult(struct annoStreamDb *self, char *minChrom, uint minEnd)
+static char **nextRowFromSqlResult(struct annoStreamDb *self)
 /* Stream rows directly from self->sr. */
 {
 return sqlNextRow(self->sr);
 }
 
 static void asdDoQuerySimple(struct annoStreamDb *self, char *minChrom, uint minEnd)
 /* Return a sqlResult for a query on table items in position range.
  * If doing a whole genome query. just 'select * from' table. */
 // NOTE: it would be possible to implement filters at this level, as in hgTables.
 {
 struct annoStreamer *streamer = &(self->streamer);
 struct dyString *query = sqlDyStringCreate("select * from %s", self->table);
 if (!streamer->positionIsGenome)
     {
     if (minChrom && differentString(minChrom, streamer->chrom))
@@ -354,83 +355,85 @@
 	    // region end is chromSize, so no need to constrain startField here:
 	    sqlDyStringPrintf(query, "%s > %u ", self->endField, start);
 	    }
 	if (self->notSorted)
 	    sqlDyStringPrintf(query, "order by %s ", self->startField);
 	dyStringPrintf(query, "limit %d", queryMaxItems);
 	bufferRowsFromSqlQuery(self, query->string, queryMaxItems);
 	// If there happens to be no items on chrom, try again with the next chrom:
 	if (! self->eof && self->rowBuf.size == 0)
 	    asdDoQueryChunking(self, minChrom, minEnd);
 	}
     }
 dyStringFree(&query);
 }
 
-static char **nextRowFromBuffer(struct annoStreamDb *self, char *minChrom, uint minEnd)
+static char **nextRowFromBuffer(struct annoStreamDb *self)
 /* Instead of streaming directly from self->sr, we have buffered up the results
  * of a chunked query; return the head of that queue. */
 {
 struct rowBuf *rowBuf = &self->rowBuf;
 if (rowBuf->ix > rowBuf->size)
     errAbort("annoStreamDb %s: rowBuf overflow (%d > %d)", self->streamer.name,
 	     rowBuf->ix, rowBuf->size);
 if (rowBuf->ix == rowBuf->size)
     {
     // Last row in buffer -- we'll need another query to get subsequent rows (if any).
     // But first, see if we need to update gotFinestBin, since getFinestBin might be
     // one of our callers.
     if (rowBuf->size > 0)
 	{
 	char **lastRow = rowBuf->buf[rowBuf->size-1];
 	int lastBin = atoi(lastRow[0]);
 	if (lastBin >= self->minFinestBin)
 	    self->gotFinestBin = TRUE;
 	}
-    asdDoQueryChunking(self, minChrom, minEnd);
+    self->needQuery = TRUE;
+    // Bounce back out -- asdNextRow will need to do another query.
+    return NULL;
     }
 if (rowBuf->size == 0)
     return NULL;
 else
     return rowBuf->buf[rowBuf->ix++];
 }
 
 static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail,
 			      char *minChrom, uint minEnd)
 /* Skip past any left-join failures until we get a right-join failure, a passing row,
  * or end of data.  Return row or NULL, and return right-join fail status via retRightFail. */
 {
 int numCols = self->streamer.numCols;
-char **row = self->nextRowRaw(self, minChrom, minEnd);
+char **row = self->nextRowRaw(self);
 if (minChrom != NULL && row != NULL)
     {
     // Ignore rows that fall completely before (minChrom, minEnd) - save annoGrator's time
     int chromIx = self->omitBin+self->chromIx;
     int endIx = self->omitBin+self->endIx;
     int chromCmp;
     while (row &&
 	   ((chromCmp = strcmp(row[chromIx], minChrom)) < 0 || // this chrom precedes minChrom
 	    (chromCmp == 0 && atoll(row[endIx]) < minEnd)))    // on minChrom, but before minEnd
-	row = self->nextRowRaw(self, minChrom, minEnd);
+	row = self->nextRowRaw(self);
     }
 boolean rightFail = FALSE;
 struct annoFilter *filterList = self->streamer.filters;
 while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail))
     {
     if (rightFail)
 	break;
-    row = self->nextRowRaw(self, minChrom, minEnd);
+    row = self->nextRowRaw(self);
     }
 *retRightFail = rightFail;
 return row;
 }
 
 static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail,
 				    struct lm *lm)
 /* Extract coords from row and return an annoRow including right-fail status. */
 {
 row += self->omitBin;
 char *chrom = row[self->chromIx];
 uint chromStart = sqlUnsigned(row[self->startIx]);
 uint chromEnd = sqlUnsigned(row[self->endIx]);
 return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row,
 			      self->streamer.numCols, lm);
@@ -533,35 +536,48 @@
 	int numCols = self->streamer.numCols;
 	return annoRowClone(qRow, rowType, numCols, callerLm);
 	}
     }
 }
 
 static struct annoRow *asdNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd,
 				  struct lm *callerLm)
 /* Perform sql query if we haven't already and return a single
  * annoRow, or NULL if there are no more items. */
 {
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 if (self->needQuery)
     self->doQuery(self, minChrom, minEnd);
 if (self->mergeBins)
-    return nextRowMergeBins(self, minChrom, minEnd, callerLm);
+    {
+    struct annoRow *aRow = nextRowMergeBins(self, minChrom, minEnd, callerLm);
+    if (aRow == NULL && self->needQuery && !self->eof)
+	// Recurse: query, then get next merged/filtered row:
+	return asdNextRow(vSelf, minChrom, minEnd, callerLm);
+    else
+	return aRow;
+    }
 boolean rightFail = FALSE;
 char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
 if (row == NULL)
+    {
+    if (self->needQuery && !self->eof)
+	// Recurse: query, then get next merged/filtered row:
+	return asdNextRow(vSelf, minChrom, minEnd, callerLm);
+    else
 	return NULL;
+    }
 return rowToAnnoRow(self, row, rightFail, callerLm);
 }
 
 static void asdClose(struct annoStreamer **pVSelf)
 /* Close db connection and free self. */
 {
 if (pVSelf == NULL)
     return;
 struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf;
 lmCleanup(&(self->qLm));
 freeMem(self->table);
 sqlFreeResult(&(self->sr));
 hFreeConn(&(self->conn));
 annoStreamerFree(pVSelf);
 }