1c6cebb2113b22e71b73fb51d43bc31a06c90046
angie
  Mon Aug 31 12:47:59 2015 -0700
Discard large-bin items that start at or after nextChunkStart. They would
break the (chrom, chromStart) sorting order if they are also to the right
of small-bin items that start at or after nextChunkStart -- and they will
appear again in the next chunk of rows anyway.

Also a couple of other fixes for problems found while debugging:
- there was a spot where I forgot to restart merging after calling
resetMergeState (when moving on to the next chrom in a genome-wide query)
- in mergeRow, don't call resetMergeState until both big and small queues are empty

refs #15953

diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c
index 7318f18..3e55037 100644
--- src/hg/lib/annoStreamDb.c
+++ src/hg/lib/annoStreamDb.c
@@ -92,30 +92,38 @@
 rowBuf->size = 0;
 rowBuf->ix = 0;
 lmCleanup(&(rowBuf->lm));
 }
 
 static void resetChunkState(struct annoStreamDb *self)
 /* Reset members that track chunked queries. */
 {
 self->queryChrom = NULL;
 self->eof = FALSE;
 self->doNextChunk = FALSE;
 self->needQuery = TRUE;
 resetRowBuf(&self->rowBuf);
 }
 
+static void startMerging(struct annoStreamDb *self)
+/* Set self->mergeBins flag and create self->qLm if necessary. */
+{
+self->mergeBins = TRUE;
+if (self->qLm == NULL)
+    self->qLm = lmInit(0);
+}
+
 static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
 /* Set region -- and free current sqlResult if there is one. */
 {
 annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 sqlFreeResult(&(self->sr));
 resetMergeState(self);
 resetChunkState(self);
 }
 
 static char **nextRowFromSqlResult(struct annoStreamDb *self)
 /* Stream rows directly from self->sr. */
 {
 return sqlNextRow(self->sr);
 }
@@ -137,32 +145,31 @@
 {
 struct annoStreamer *streamer = &(self->streamer);
 boolean hasWhere = FALSE;
 struct dyString *query = self->makeBaselineQuery(self, &hasWhere);
 if (!streamer->positionIsGenome)
     {
     if (minChrom && differentString(minChrom, streamer->chrom))
 	errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
 		 streamer->name, minChrom, streamer->chrom);
     if (self->hasBin)
 	{
 	// Results will be in bin order, but we can restore chromStart order by
 	// accumulating initial coarse-bin items and merge-sorting them with
 	// subsequent finest-bin items which will be in chromStart order.
 	resetMergeState(self);
-	self->mergeBins = TRUE;
-	self->qLm = lmInit(0);
+        startMerging(self);
 	}
     if (self->endFieldIndexName != NULL)
 	// Don't let mysql use a (chrom, chromEnd) index because that messes up
 	// sorting by chromStart.
 	sqlDyStringPrintf(query, " IGNORE INDEX (%s)", self->endFieldIndexName);
     sqlDyStringAppend(query, hasWhere ? " and " : " where ");
     sqlDyStringPrintf(query, "%s='%s'", self->chromField, streamer->chrom);
     int chromSize = annoAssemblySeqSize(streamer->assembly, streamer->chrom);
     if (streamer->regionStart != 0 || streamer->regionEnd != chromSize)
 	{
 	dyStringAppend(query, " and ");
 	if (self->hasBin)
 	    hAddBinToQuery(streamer->regionStart, streamer->regionEnd, query);
 	sqlDyStringPrintf(query, "%s < %u and %s > %u", self->startField, streamer->regionEnd,
 		       self->endField, streamer->regionStart);
@@ -273,33 +280,31 @@
 	self->eof = TRUE;
     }
 if (self->eof)
     return;
 int queryMaxItems = ASD_CHUNK_SIZE;
 if (self->useMaxOutRows && self->maxOutRows < queryMaxItems)
     queryMaxItems = self->maxOutRows;
 if (self->hasBin)
     {
     // Results will be in bin order, but we can restore chromStart order by
     // accumulating initial coarse-bin items and merge-sorting them with
     // subsequent finest-bin items which will be in chromStart order.
     if (self->doNextChunk && self->mergeBins && !self->gotFinestBin)
 	errAbort("annoStreamDb %s: can't continue merge in chunking query; "
 		 "increase ASD_CHUNK_SIZE", sSelf->name);
-    self->mergeBins = TRUE;
-    if (self->qLm == NULL)
-	self->qLm = lmInit(0);
+    startMerging(self);
     }
 if (self->endFieldIndexName != NULL)
     // Don't let mysql use a (chrom, chromEnd) index because that messes up
     // sorting by chromStart.
     sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName);
 if (sSelf->chrom != NULL)
     {
     uint start = sSelf->regionStart;
     if (minChrom)
 	{
 	if (differentString(minChrom, sSelf->chrom))
 	    errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
 		     sSelf->name, minChrom, sSelf->chrom);
 	if (start < minEnd)
 	    start = minEnd;
@@ -322,46 +327,48 @@
     if (self->notSorted)
 	sqlDyStringPrintf(query, "order by %s ", self->startField);
     sqlDyStringPrintf(query, "limit %d", queryMaxItems);
     bufferRowsFromSqlQuery(self, query->string, queryMaxItems);
     if (self->rowBuf.size == 0)
 	self->eof = TRUE;
     }
 else
     {
     // Genome-wide query: break it into chrom-by-chrom queries.
     if (self->queryChrom == NULL)
 	self->queryChrom = self->chromList;
     else if (!self->doNextChunk)
 	{
 	self->queryChrom = self->queryChrom->next;
+        if (self->hasBin)
+            {
             resetMergeState(self);
+            startMerging(self);
+            }
 	}
     if (minChrom != NULL)
 	{
 	// Skip chroms that precede minChrom
 	while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0)
 	    {
 	    self->queryChrom = self->queryChrom->next;
 	    self->doNextChunk = FALSE;
-	    resetMergeState(self);
 	    }
 	if (self->hasBin)
             {
-	    self->mergeBins = TRUE;
-	    if (self->qLm == NULL)
-		self->qLm = lmInit(0);
+	    resetMergeState(self);
+            startMerging(self);
             }
 	}
     if (self->queryChrom == NULL)
 	self->eof = TRUE;
     else
 	{
 	char *chrom = self->queryChrom->name;
 	int start = 0;
 	if (minChrom != NULL && sameString(chrom, minChrom))
 	    start = minEnd;
 	if (self->doNextChunk && start < self->nextChunkStart)
 	    start = self->nextChunkStart;
 	uint end = annoAssemblySeqSize(self->streamer.assembly, self->queryChrom->name);
 	sqlDyStringAppend(query, hasWhere ? " and " : " where ");
 	sqlDyStringPrintf(query, "%s = '%s' ", self->chromField, chrom);
@@ -462,65 +469,71 @@
 uint chromStart = sqlUnsigned(row[self->startIx]);
 uint chromEnd = sqlUnsigned(row[self->endIx]);
 return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row,
 			      self->streamer.numCols, lm);
 }
 
 static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail,
 			       char *minChrom, uint minEnd)
 /* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and
  * add any subsequent coarse-bin items to bigItemQueue.  As soon as we get an item from a
  * finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. */
 {
 int bin = atoi(row[0]);
 while (bin < self->minFinestBin)
     {
-    // big item -- store aside in queue for merging later, move on to next item
-    slAddHead(&(self->bigItemQueue), rowToAnnoRow(self, row, *pRightFail, self->qLm));
+    // big item -- store aside in queue for merging later (unless it falls off the end of
+    // the current chunk), move on to next item
+    struct annoRow *aRow = rowToAnnoRow(self, row, *pRightFail, self->qLm);
+    if (! (self->doNextChunk && self->nextChunkStart <= aRow->start))
+        slAddHead(&(self->bigItemQueue), aRow);
     *pRightFail = FALSE;
     row = nextRowFiltered(self, pRightFail, minChrom, minEnd);
     if (row == NULL)
 	break;
     bin = atoi(row[0]);
     }
 // First finest-bin item!  Sort bigItemQueue in preparation for merging:
 self->gotFinestBin = TRUE;
 slReverse(&(self->bigItemQueue));
 slSort(&(self->bigItemQueue), annoRowCmp);
 return row;
 }
 
 static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow,
 				struct lm *callerLm)
 /* Compare head of bigItemQueue with (finest-bin) aRow; return the one with
  * lower chromStart and save the other for later.  */
 {
 struct annoRow *outRow = aRow;
-if (self->bigItemQueue == NULL)
-    {
-    // No coarse-bin items to merge-sort, just stream finest-bin items from here on out.
-    resetMergeState(self);
-    }
-else if (annoRowCmp(&(self->bigItemQueue), &aRow) < 0)
+if (self->bigItemQueue != NULL && annoRowCmp(&(self->bigItemQueue), &aRow) < 0)
     {
     // Big item gets to go now, so save aside small item for next time.
     outRow = slPopHead(&(self->bigItemQueue));
     slAddHead(&(self->smallItemQueue), aRow);
     }
+// Clone outRow using callerLm
 enum annoRowType rowType = self->streamer.rowType;
 int numCols = self->streamer.numCols;
-return annoRowClone(outRow, rowType, numCols, callerLm);
+outRow = annoRowClone(outRow, rowType, numCols, callerLm);
+if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
+    {
+    // No coarse-bin items to merge-sort, just stream finest-bin items from here on out.
+    // This needs to be done after cloning outRow because it was allocated in self->qLm.
+    resetMergeState(self);
+    }
+return outRow;
 }
 
 static struct annoRow *nextQueuedRow(struct annoStreamDb *self, struct lm *callerLm)
 // Return the head of either bigItemQueue or smallItemQueue, depending on which has
 // the lower chromStart.
 {
 struct annoRow *row = NULL;
 if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0)
     row = slPopHead(&(self->bigItemQueue));
 else
     row = slPopHead(&(self->smallItemQueue));
 if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
     // All done merge-sorting, just stream finest-bin items from here on out.
     self->mergeBins = FALSE;
 enum annoRowType rowType = self->streamer.rowType;