40ed3e546ef868bffe4c338d93d6528138ecfc44
angie
  Wed Sep 30 09:25:12 2015 -0700
Add test cases (and bug fixes!) to make sure that we get the desired
behavior when an insertion is adjacent to a non-insertion, and when
an insertion falls at the start or end of the search region (if applicable):

- Include insertions that fall at the start or end of the search region

- If the primary row is an insertion, keep secondary non-insertion rows
to the left and right.

- If the primary row is a non-insertion, keep secondary insertion rows
at its start and end.

(i.e. keep insertions at boundaries -- but don't let any non-insertions
slip through)

The VCF logic is more complicated because VCF indels always include an
extra base to the left, so they appear to start before they actually do,
and can be interspersed with non-indels that start there.

diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c
index 3eb57e7..8348420 100644
--- src/hg/lib/annoStreamDb.c
+++ src/hg/lib/annoStreamDb.c
@@ -1,725 +1,749 @@
 /* annoStreamDb -- subclass of annoStreamer for database tables */
 
 /* Copyright (C) 2014 The Regents of the University of California 
  * See README in this or parent directory for licensing information. */
 
 #include "annoStreamDb.h"
 #include "annoGratorQuery.h"
 #include "binRange.h"
 #include "hdb.h"
 #include "sqlNum.h"
 
 struct annoStreamDb
     {
     struct annoStreamer streamer;	// Parent class members & methods
     // Private members
     struct sqlConnection *conn;		// Database connection (e.g. hg19 or customTrash)
     struct sqlResult *sr;		// SQL query result from which we grab rows
     char *table;			// Table name, must exist in database
 
     struct dyString *(*makeBaselineQuery)(struct annoStreamDb *self, boolean *retHasWhere);
     /* Provide baseline query, by default just 'select * from <table>'.
      * Override this to make a query with specific fields, joins etc.
      * If the returned query includes a join/where, set *retHasWhere to TRUE. */
 
     // These members enable us to extract coords from the otherwise unknown row:
     char *chromField;			// Name of chrom-ish column in table
     char *startField;			// Name of chromStart-ish column in table
     char *endField;			// Name of chromEnd-ish column in table
     int chromIx;			// Index of chrom-ish col in autoSql or bin-less table
     int startIx;			// Index of chromStart-ish col in autoSql or bin-less table
     int endIx;				// Index of chromEnd-ish col in autoSql or bin-less table
 
     // These members enable us to produce {chrom, start}-sorted output:
     char *endFieldIndexName;		// SQL index on end field, if any (can mess up sorting)
     boolean notSorted;			// TRUE if table is not sorted (e.g. genbank-updated)
     boolean hasBin;			// 1 if SQL table's first column is bin
     boolean omitBin;			// 1 if table hasBin and autoSql doesn't have bin
     boolean mergeBins;			// TRUE if query results will be in bin order
     struct annoRow *bigItemQueue;	// If mergeBins, accumulate coarse-bin items here
     struct annoRow *smallItemQueue;	// Max 1 item for merge-sorting with bigItemQueue
     struct lm *qLm;			// localmem for merge-sorting queues
     int minFinestBin;			// Smallest bin number for finest bin level
     boolean gotFinestBin;		// Flag that it's time to merge-sort with bigItemQueue
 
     // Limit (or not) number of rows processed:
     boolean useMaxOutRows;		// TRUE if maxOutRows passed to annoStreamDbNew is > 0
     int maxOutRows;			// Maximum number of rows we can output.
 
     // Process large tables in manageable chunks:
     struct slName *chromList;		// list of chromosomes/sequences in assembly
     struct slName *queryChrom;		// most recently queried chrom for whole-genome (or NULL)
     boolean eof;			// TRUE when we are done (maxItems or no more items)
     boolean needQuery;			// TRUE when we haven't yet queried, or need to query again
     boolean doNextChunk;		// TRUE if rowBuf ends before end of chrom/region
     uint nextChunkStart;		// Start coord for next chunk of rows to query
 
     struct rowBuf
     // Temporary storage for rows from chunked query
         {
 	struct lm *lm;			// storage for rows
 	char ***buf;			// array of pointers to rows
 	int size;			// number of rows
 	int ix;				// offset in buffer, [0..size]
         } rowBuf;
 
     char **(*nextRowRaw)(struct annoStreamDb *self);
     // Depending on query style, use either sqlNextRow or temporary row storage to get next row.
     // This may return NULL but set self->needQuery; asdNextRow watches for that.
 
     void (*doQuery)(struct annoStreamDb *self, char *minChrom, uint minEnd);
     // Depending on query style, perform either a single query or (series of) chunked query
     };
 
 // For performance reasons, even if !useMaxItems (no limit), we need to limit the
 // number of rows that are returned from a query, so we can slurp them into memory and
 // close the sqlResult before mysql gets unhappy about the result being open so long.
 #define ASD_CHUNK_SIZE 100000
 
 static void resetMergeState(struct annoStreamDb *self)
 /* Reset or initialize members that track merging of coarse-bin items with fine-bin items. */
 {
 self->mergeBins = FALSE;
 self->bigItemQueue = self->smallItemQueue = NULL;
 lmCleanup(&(self->qLm));
 self->gotFinestBin = FALSE;
 }
 
 static void resetRowBuf(struct rowBuf *rowBuf)
 /* Reset temporary storage for chunked query rows. */
 {
 rowBuf->buf = NULL;
 rowBuf->size = 0;
 rowBuf->ix = 0;
 lmCleanup(&(rowBuf->lm));
 }
 
 static void resetChunkState(struct annoStreamDb *self)
 /* Reset members that track chunked queries. */
 {
 self->queryChrom = NULL;
 self->eof = FALSE;
 self->doNextChunk = FALSE;
 self->needQuery = TRUE;
 resetRowBuf(&self->rowBuf);
 }
 
 static void startMerging(struct annoStreamDb *self)
 /* Set self->mergeBins flag and create self->qLm if necessary. */
 {
 self->mergeBins = TRUE;
+self->gotFinestBin = FALSE;
 if (self->qLm == NULL)
     self->qLm = lmInit(0);
 }
 
 static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
 /* Set region -- and free current sqlResult if there is one. */
 {
 annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 sqlFreeResult(&(self->sr));
 resetMergeState(self);
 resetChunkState(self);
 }
 
 static char **nextRowFromSqlResult(struct annoStreamDb *self)
 /* Stream rows directly from self->sr. */
 {
 return sqlNextRow(self->sr);
 }
 
 static struct dyString *asdMakeBaselineQuery(struct annoStreamDb *self, boolean *retHasWhere)
 /* Return a baseline query, i.e. "select * from <table>".  This is the default implementation
  * of annoStreamDb.makeBaselineQuery. */
 {
 if (retHasWhere)
     *retHasWhere = FALSE;
 return sqlDyStringCreate("select * from %s ", self->table);
 }
 
+static void addRangeToQuery(struct annoStreamDb *self, struct dyString *query,
+                            char *chrom, uint start, uint end, boolean hasWhere)
+/* Add position constraints to query. */
+{
+sqlDyStringAppend(query, hasWhere ? " and " : " where ");
+sqlDyStringPrintf(query, "%s = '%s' ", self->chromField, chrom);
+uint chromSize = annoAssemblySeqSize(self->streamer.assembly, chrom);
+boolean addStartConstraint = (start > 0);
+boolean addEndConstraint = (end < chromSize);
+if (addStartConstraint || addEndConstraint)
+    {
+    sqlDyStringAppend(query, "and ");
+    if (self->hasBin)
+        hAddBinToQuery(start, end, query);
+    if (addStartConstraint)
+        {
+        if (self->doNextChunk)
+            sqlDyStringPrintf(query, "%s >= %u ", self->startField, start);
+        else
+            // Make sure to include insertions at start:
+            sqlDyStringPrintf(query, "(%s > %u or (%s = %s and %s = %u)) ",
+                              self->endField, start,
+                              self->endField, self->startField, self->startField, start);
+        }
+    if (addEndConstraint)
+        {
+        if (addStartConstraint)
+            sqlDyStringAppend(query, "and ");
+        // Make sure to include insertions at end:
+        sqlDyStringPrintf(query, "(%s < %u or (%s = %s and %s = %u)) ",
+                          self->startField, end,
+                          self->startField, self->endField, self->endField, end);
+        }
+    }
+}
 
 static void asdDoQuerySimple(struct annoStreamDb *self, char *minChrom, uint minEnd)
 /* Return a sqlResult for a query on table items in position range.
  * If doing a whole genome query. just select all rows from table. */
 // NOTE: it would be possible to implement filters at this level, as in hgTables.
 {
 struct annoStreamer *streamer = &(self->streamer);
 boolean hasWhere = FALSE;
 struct dyString *query = self->makeBaselineQuery(self, &hasWhere);
 if (!streamer->positionIsGenome)
     {
     if (minChrom && differentString(minChrom, streamer->chrom))
 	errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
 		 streamer->name, minChrom, streamer->chrom);
     if (self->hasBin)
 	{
 	// Results will be in bin order, but we can restore chromStart order by
 	// accumulating initial coarse-bin items and merge-sorting them with
 	// subsequent finest-bin items which will be in chromStart order.
 	resetMergeState(self);
         startMerging(self);
 	}
     if (self->endFieldIndexName != NULL)
 	// Don't let mysql use a (chrom, chromEnd) index because that messes up
 	// sorting by chromStart.
 	sqlDyStringPrintf(query, " IGNORE INDEX (%s)", self->endFieldIndexName);
-    sqlDyStringAppend(query, hasWhere ? " and " : " where ");
-    sqlDyStringPrintf(query, "%s='%s'", self->chromField, streamer->chrom);
-    int chromSize = annoAssemblySeqSize(streamer->assembly, streamer->chrom);
-    if (streamer->regionStart != 0 || streamer->regionEnd != chromSize)
-	{
-	dyStringAppend(query, " and ");
-	if (self->hasBin)
-	    hAddBinToQuery(streamer->regionStart, streamer->regionEnd, query);
-	sqlDyStringPrintf(query, "%s < %u and %s > %u", self->startField, streamer->regionEnd,
-		       self->endField, streamer->regionStart);
+    addRangeToQuery(self, query, streamer->chrom, streamer->regionStart, streamer->regionEnd,
+                    hasWhere);
     }
 if (self->notSorted)
-	sqlDyStringPrintf(query, " order by %s", self->startField);
-    }
-else if (self->notSorted)
     sqlDyStringPrintf(query, " order by %s,%s", self->chromField, self->startField);
 if (self->maxOutRows > 0)
     dyStringPrintf(query, " limit %d", self->maxOutRows);
 struct sqlResult *sr = sqlGetResult(self->conn, query->string);
 dyStringFree(&query);
 self->sr = sr;
 self->needQuery = FALSE;
 }
 
 static void rowBufInit(struct rowBuf *rowBuf, int size)
 /* Clean up rowBuf and give it a new lm and buffer[size]. */
 {
 resetRowBuf(rowBuf);
 rowBuf->lm = lmInit(0);
 rowBuf->size = size;
 lmAllocArray(rowBuf->lm, rowBuf->buf, size);
 }
 
 static char **lmCloneRow(struct lm *lm, char **row, int colCount)
 /* Use lm to allocate an array of strings and its contents copied from row. */
 {
 char **cloneRow = NULL;
 lmAllocArray(lm, cloneRow, colCount);
 int i;
 for (i = 0;  i < colCount;  i++)
     cloneRow[i] = lmCloneString(lm, row[i]);
 return cloneRow;
 }
 
 static void updateNextChunkState(struct annoStreamDb *self, int queryMaxItems)
 /* If the just-fetched interval list was limited to ASD_CHUNK_SIZE, set doNextChunk
  * and trim the last row(s) so that when we query the next chunk, we don't get
  * repeat rows due to querying a start coord that was already returned. */
 {
 struct rowBuf *rowBuf = &self->rowBuf;
 if (queryMaxItems == ASD_CHUNK_SIZE && rowBuf->size == ASD_CHUNK_SIZE)
     {
     self->doNextChunk = TRUE;
     // Starting at the last row in rowBuf, work back to find a value with a different start.
     int ix = rowBuf->size - 1;
     char **words = rowBuf->buf[ix];
     int startIx = self->startIx + self->omitBin;
     uint lastStart = atoll(words[startIx]);
     for (ix = rowBuf->size - 2;  ix >= 0;  ix--)
 	{
 	words = rowBuf->buf[ix];
 	uint thisStart = atoll(words[startIx]);
 	if (thisStart != lastStart)
 	    {
 	    rowBuf->size = ix+1;
 	    self->nextChunkStart = lastStart;
 	    break;
 	    }
 	}
     }
 else
     self->doNextChunk = FALSE;
 self->needQuery = FALSE;
 }
 
 static void bufferRowsFromSqlQuery(struct annoStreamDb *self, char *query, int queryMaxItems)
 /* Store all rows from query in rowBuf. */
 {
 struct sqlResult *sr = sqlGetResult(self->conn, query);
 struct rowBuf *rowBuf = &(self->rowBuf);
 rowBufInit(rowBuf, ASD_CHUNK_SIZE);
 struct annoStreamer *sSelf = &(self->streamer);
 char **row = NULL;
 int ix = 0;
 while ((row = sqlNextRow(sr)) != NULL)
     {
     if (ix >= rowBuf->size)
 	errAbort("annoStreamDb %s: rowBuf overflow, got more than %d rows",
 		 sSelf->name, rowBuf->size);
     rowBuf->buf[ix++] = lmCloneRow(rowBuf->lm, row, sSelf->numCols+self->omitBin);
     }
 // Set rowBuf->size to the number of rows we actually stored.
 rowBuf->size = ix;
 sqlFreeResult(&sr);
 updateNextChunkState(self, queryMaxItems);
 }
 
+static void updateQueryChrom(struct annoStreamDb *self, char *minChrom)
+/* Figure out whether we need to query the next chunk on the current chromosome
+ * or move on to the next chromosome. */
+{
+if (self->queryChrom == NULL)
+    self->queryChrom = self->chromList;
+else if (!self->doNextChunk)
+    {
+    self->queryChrom = self->queryChrom->next;
+    if (self->hasBin)
+        {
+        resetMergeState(self);
+        startMerging(self);
+        }
+    }
+// -- don't resetMergeState if doNextChunk.
+if (minChrom != NULL)
+    {
+    // Skip chroms that precede minChrom
+    while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0)
+        {
+        self->queryChrom = self->queryChrom->next;
+        self->doNextChunk = FALSE;
+        }
+    if (self->hasBin)
+        {
+        resetMergeState(self);
+        startMerging(self);
+        }
+    }
+}
+
+static void doOneChunkQuery(struct annoStreamDb *self, struct dyString *query,
+                         char *chrom, uint start, uint end,
+                         boolean hasWhere, int maxItems)
+/* Add range constraints to query, perform query and buffer the results. */
+{
+addRangeToQuery(self, query, chrom, start, end, hasWhere);
+if (self->notSorted)
+    sqlDyStringPrintf(query, "order by %s ", self->startField);
+sqlDyStringPrintf(query, "limit %d", maxItems);
+bufferRowsFromSqlQuery(self, query->string, maxItems);
+}
+
 static void asdDoQueryChunking(struct annoStreamDb *self, char *minChrom, uint minEnd)
-/* Return a sqlResult for a query on table items in position range.
- * If doing a whole genome query, just select all rows from table. */
+/* Get rows from mysql with a limit on the number of rows returned at one time (ASD_CHUNK_SIZE),
+ * to avoid long delays for very large tables.  This will be called multiple times if
+ * the number of rows in region is more than ASD_CHUNK_SIZE.  If doing a genome-wide query,
+ * break it up into chrom-by-chrom queries because the code that merges large bin items
+ * in with small bin items assumes that all rows are on the same chrom. */
 {
 struct annoStreamer *sSelf = &(self->streamer);
 boolean hasWhere = FALSE;
 struct dyString *query = self->makeBaselineQuery(self, &hasWhere);
 if (sSelf->chrom != NULL && self->rowBuf.size > 0 && !self->doNextChunk)
     {
     // We're doing a region query, we already got some rows, and don't need another chunk:
     resetRowBuf(&self->rowBuf);
     self->eof = TRUE;
     }
 if (self->useMaxOutRows)
     {
     self->maxOutRows -= self->rowBuf.size;
     if (self->maxOutRows <= 0)
 	self->eof = TRUE;
     }
 if (self->eof)
     return;
 int queryMaxItems = ASD_CHUNK_SIZE;
 if (self->useMaxOutRows && self->maxOutRows < queryMaxItems)
     queryMaxItems = self->maxOutRows;
 if (self->hasBin)
     {
     // Results will be in bin order, but we can restore chromStart order by
     // accumulating initial coarse-bin items and merge-sorting them with
     // subsequent finest-bin items which will be in chromStart order.
     if (self->doNextChunk && self->mergeBins && !self->gotFinestBin)
 	errAbort("annoStreamDb %s: can't continue merge in chunking query; "
 		 "increase ASD_CHUNK_SIZE", sSelf->name);
+    // Don't reset merge state here in case bigItemQueue has a large-bin item
+    // at the end of the chrom, past all smallest-bin items.
     startMerging(self);
     }
 if (self->endFieldIndexName != NULL)
     // Don't let mysql use a (chrom, chromEnd) index because that messes up
     // sorting by chromStart.
     sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName);
 if (sSelf->chrom != NULL)
     {
+    // Region query (but might end up as multiple chunked queries)
+    char *chrom = sSelf->chrom;
     uint start = sSelf->regionStart;
+    uint end = sSelf->regionEnd;
     if (minChrom)
 	{
-	if (differentString(minChrom, sSelf->chrom))
+	if (differentString(minChrom, chrom))
 	    errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
-		     sSelf->name, minChrom, sSelf->chrom);
+		     sSelf->name, minChrom, chrom);
 	if (start < minEnd)
 	    start = minEnd;
 	}
     if (self->doNextChunk && start < self->nextChunkStart)
 	start = self->nextChunkStart;
-    sqlDyStringAppend(query, hasWhere ? " and " : " where ");
-    sqlDyStringPrintf(query, "%s = '%s' and ", self->chromField, sSelf->chrom);
-    if (self->hasBin)
-	{
-	hAddBinToQuery(start, sSelf->regionEnd, query);
-	}
-    if (self->doNextChunk)
-	sqlDyStringPrintf(query, "%s >= %u and ", self->startField, self->nextChunkStart);
-    sqlDyStringPrintf(query, "%s < %u and %s > %u ", self->startField, sSelf->regionEnd,
-		      self->endField, start);
-    if (self->notSorted)
-	sqlDyStringPrintf(query, "order by %s ", self->startField);
-    sqlDyStringPrintf(query, "limit %d", queryMaxItems);
-    bufferRowsFromSqlQuery(self, query->string, queryMaxItems);
+    doOneChunkQuery(self, query, chrom, start, end, hasWhere, queryMaxItems);
     if (self->rowBuf.size == 0)
 	self->eof = TRUE;
     }
 else
     {
-    // Genome-wide query: break it into chrom-by-chrom queries.
-    if (self->queryChrom == NULL)
-	self->queryChrom = self->chromList;
-    else if (!self->doNextChunk)
-	{
-	self->queryChrom = self->queryChrom->next;
-        if (self->hasBin)
-            {
-            resetMergeState(self);
-            startMerging(self);
-            }
-	}
-    if (minChrom != NULL)
-	{
-	// Skip chroms that precede minChrom
-	while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0)
-	    {
-	    self->queryChrom = self->queryChrom->next;
-	    self->doNextChunk = FALSE;
-	    }
-	if (self->hasBin)
-            {
-	    resetMergeState(self);
-            startMerging(self);
-            }
-	}
+    // Genome-wide query: break it into chrom-by-chrom queries (that might be chunked)
+    // because the mergeBins stuff assumes that all rows are from the same chrom.
+    updateQueryChrom(self, minChrom);
     if (self->queryChrom == NULL)
 	self->eof = TRUE;
     else
 	{
 	char *chrom = self->queryChrom->name;
 	int start = 0;
 	if (minChrom != NULL && sameString(chrom, minChrom))
 	    start = minEnd;
 	if (self->doNextChunk && start < self->nextChunkStart)
 	    start = self->nextChunkStart;
-	uint end = annoAssemblySeqSize(self->streamer.assembly, self->queryChrom->name);
-	sqlDyStringAppend(query, hasWhere ? " and " : " where ");
-	sqlDyStringPrintf(query, "%s = '%s' ", self->chromField, chrom);
-	if (start > 0 || self->doNextChunk)
-	    {
-	    dyStringAppend(query, "and ");
-	    if (self->hasBin)
-		{
-		hAddBinToQuery(start, end, query);
-		}
-	    if (self->doNextChunk)
-		sqlDyStringPrintf(query, "%s >= %u and ", self->startField, self->nextChunkStart);
-	    // region end is chromSize, so no need to constrain startField here:
-	    sqlDyStringPrintf(query, "%s > %u ", self->endField, start);
-	    }
-	if (self->notSorted)
-	    sqlDyStringPrintf(query, "order by %s ", self->startField);
-	dyStringPrintf(query, "limit %d", queryMaxItems);
-	bufferRowsFromSqlQuery(self, query->string, queryMaxItems);
+        uint end = annoAssemblySeqSize(self->streamer.assembly, chrom);
+        doOneChunkQuery(self, query, chrom, start, end, hasWhere, queryMaxItems);
 	// If there happens to be no items on chrom, try again with the next chrom:
 	if (! self->eof && self->rowBuf.size == 0)
 	    asdDoQueryChunking(self, minChrom, minEnd);
 	}
     }
 dyStringFree(&query);
 }
 
 static char **nextRowFromBuffer(struct annoStreamDb *self)
 /* Instead of streaming directly from self->sr, we have buffered up the results
  * of a chunked query; return the head of that queue. */
 {
 struct rowBuf *rowBuf = &self->rowBuf;
 if (rowBuf->ix > rowBuf->size)
     errAbort("annoStreamDb %s: rowBuf overflow (%d > %d)", self->streamer.name,
 	     rowBuf->ix, rowBuf->size);
 if (rowBuf->ix == rowBuf->size)
     {
     // Last row in buffer -- we'll need another query to get subsequent rows (if any).
     // But first, see if we need to update gotFinestBin, since getFinestBin might be
     // one of our callers.
     if (rowBuf->size > 0)
 	{
 	char **lastRow = rowBuf->buf[rowBuf->size-1];
 	int lastBin = atoi(lastRow[0]);
 	if (lastBin >= self->minFinestBin)
 	    self->gotFinestBin = TRUE;
 	}
     if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
         self->needQuery = TRUE;
     // Bounce back out -- asdNextRow or nextRowMergeBins will need to do another query.
     return NULL;
     }
 if (rowBuf->size == 0)
     return NULL;
 else
     return rowBuf->buf[rowBuf->ix++];
 }
 
 static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail,
 			      char *minChrom, uint minEnd)
 /* Skip past any left-join failures until we get a right-join failure, a passing row,
  * or end of data.  Return row or NULL, and return right-join fail status via retRightFail. */
 {
 int numCols = self->streamer.numCols;
 char **row = self->nextRowRaw(self);
 if (minChrom != NULL && row != NULL)
     {
     // Ignore rows that fall completely before (minChrom, minEnd) - save annoGrator's time
     int chromIx = self->omitBin+self->chromIx;
     int endIx = self->omitBin+self->endIx;
     int chromCmp;
     while (row &&
 	   ((chromCmp = strcmp(row[chromIx], minChrom)) < 0 || // this chrom precedes minChrom
 	    (chromCmp == 0 && atoll(row[endIx]) < minEnd)))    // on minChrom, but before minEnd
 	row = self->nextRowRaw(self);
     }
 boolean rightFail = FALSE;
 struct annoFilter *filterList = self->streamer.filters;
 while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail))
     {
     if (rightFail)
 	break;
     row = self->nextRowRaw(self);
     }
 *retRightFail = rightFail;
 return row;
 }
 
 static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail,
 				    struct lm *lm)
 /* Extract coords from row and return an annoRow including right-fail status. */
 {
 row += self->omitBin;
 char *chrom = row[self->chromIx];
 uint chromStart = sqlUnsigned(row[self->startIx]);
 uint chromEnd = sqlUnsigned(row[self->endIx]);
 return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row,
 			      self->streamer.numCols, lm);
 }
 
 static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail,
 			       char *minChrom, uint minEnd)
 /* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and
  * add any subsequent coarse-bin items to bigItemQueue.  As soon as we get an item from a
  * finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. */
 {
 int bin = atoi(row[0]);
 while (bin < self->minFinestBin)
     {
     // big item -- store aside in queue for merging later (unless it falls off the end of
     // the current chunk), move on to next item
     struct annoRow *aRow = rowToAnnoRow(self, row, *pRightFail, self->qLm);
     if (! (self->doNextChunk && self->nextChunkStart <= aRow->start))
         slAddHead(&(self->bigItemQueue), aRow);
     *pRightFail = FALSE;
     row = nextRowFiltered(self, pRightFail, minChrom, minEnd);
     if (row == NULL)
 	break;
     bin = atoi(row[0]);
     }
 // First finest-bin item!  Sort bigItemQueue in preparation for merging:
 self->gotFinestBin = TRUE;
 slReverse(&(self->bigItemQueue));
 slSort(&(self->bigItemQueue), annoRowCmp);
 return row;
 }
 
 static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow,
 				struct lm *callerLm)
 /* Compare head of bigItemQueue with (finest-bin) aRow; return the one with
  * lower chromStart and save the other for later.  */
 {
 struct annoRow *outRow = aRow;
 if (self->bigItemQueue != NULL && annoRowCmp(&(self->bigItemQueue), &aRow) < 0)
     {
     // Big item gets to go now, so save aside small item for next time.
     outRow = slPopHead(&(self->bigItemQueue));
     slAddHead(&(self->smallItemQueue), aRow);
     }
 // Clone outRow using callerLm
 enum annoRowType rowType = self->streamer.rowType;
 int numCols = self->streamer.numCols;
 outRow = annoRowClone(outRow, rowType, numCols, callerLm);
 if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
     {
     // No coarse-bin items to merge-sort, just stream finest-bin items from here on out.
     // This needs to be done after cloning outRow because it was allocated in self->qLm.
     resetMergeState(self);
     }
 return outRow;
 }
 
 static struct annoRow *nextQueuedRow(struct annoStreamDb *self, struct lm *callerLm)
 // Return the head of either bigItemQueue or smallItemQueue, depending on which has
 // the lower chromStart.
 {
 struct annoRow *row = NULL;
 if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0)
     row = slPopHead(&(self->bigItemQueue));
 else
     row = slPopHead(&(self->smallItemQueue));
 if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
     // All done merge-sorting, just stream finest-bin items from here on out.
     self->mergeBins = FALSE;
 enum annoRowType rowType = self->streamer.rowType;
 int numCols = self->streamer.numCols;
 return annoRowClone(row, rowType, numCols, callerLm);
 }
 
 static struct annoRow *nextRowMergeBins(struct annoStreamDb *self, char *minChrom, uint minEnd,
 					struct lm *callerLm)
 /* Fetch the next filtered row from mysql, merge-sorting coarse-bin items into finest-bin
  * items to maintain chromStart ordering. */
 {
 assert(self->mergeBins && self->hasBin);
 if (self->smallItemQueue)
     // In this case we have already begun merge-sorting; don't pull a new row from mysql,
     // use the queues.  This should keep smallItemQueue's max depth at 1.
     return nextQueuedRow(self, callerLm);
 else
     {
     // We might need to collect initial coarse-bin items, or might already be merge-sorting.
     boolean rightFail = FALSE;
     char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
     if (row && !self->gotFinestBin)
 	{
 	// We are just starting -- queue up coarse-bin items, if any, until we get the first
 	// finest-bin item.
 	row = getFinestBinItem(self, row, &rightFail, minChrom, minEnd);
 	}
     // Time to merge-sort finest-bin items from mysql with coarse-bin items from queue.
     if (row != NULL)
 	{
 	struct annoRow *aRow = rowToAnnoRow(self, row, rightFail, self->qLm);
 	return mergeRow(self, aRow, callerLm);
 	}
     else
 	{
 	struct annoRow *qRow = slPopHead(&(self->bigItemQueue));
 	enum annoRowType rowType = self->streamer.rowType;
 	int numCols = self->streamer.numCols;
 	return annoRowClone(qRow, rowType, numCols, callerLm);
 	}
     }
 }
 
 static struct annoRow *asdNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd,
 				  struct lm *callerLm)
 /* Perform sql query if we haven't already and return a single
  * annoRow, or NULL if there are no more items. */
 {
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 if (self->needQuery)
     self->doQuery(self, minChrom, minEnd);
 if (self->mergeBins)
     {
     struct annoRow *aRow = nextRowMergeBins(self, minChrom, minEnd, callerLm);
     if (aRow == NULL && self->needQuery && !self->eof)
 	// Recurse: query, then get next merged/filtered row:
 	return asdNextRow(vSelf, minChrom, minEnd, callerLm);
     else
 	return aRow;
     }
 boolean rightFail = FALSE;
 char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
 if (row == NULL)
     {
     if (self->needQuery && !self->eof)
 	// Recurse: query, then get next merged/filtered row:
 	return asdNextRow(vSelf, minChrom, minEnd, callerLm);
     else
 	return NULL;
     }
 return rowToAnnoRow(self, row, rightFail, callerLm);
 }
 
 static void asdClose(struct annoStreamer **pVSelf)
 /* Close db connection and free self. */
 {
 if (pVSelf == NULL)
     return;
 struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf;
 lmCleanup(&(self->qLm));
 freeMem(self->table);
 sqlFreeResult(&(self->sr));
 hFreeConn(&(self->conn));
 annoStreamerFree(pVSelf);
 }
 
 static boolean asdInitBed3Fields(struct annoStreamDb *self)
 /* Use autoSql to figure out which table fields correspond to {chrom, chromStart, chromEnd}. */
 {
 struct annoStreamer *vSelf = &(self->streamer);
 return annoStreamerFindBed3Columns(vSelf, &(self->chromIx), &(self->startIx), &(self->endIx),
 				   &(self->chromField), &(self->startField), &(self->endField));
 }
 
 char *sqlTableIndexOnField(struct sqlConnection *conn, char *table, char *field)
 /* If table has an index that includes field, return the index name, else NULL. */
 {
 char *indexName = NULL;
 char query[512];
 sqlSafef(query, sizeof(query), "show index from %s", table);
 struct sqlResult *sr = sqlGetResult(conn, query);
 char **row;
 while ((row = sqlNextRow(sr)) != NULL)
     {
     if (sameString(row[4], field))
 	{
 	indexName = cloneString(row[2]);
 	break;
 	}
     }
 sqlFreeResult(&sr);
 return indexName;
 }
 
 static boolean isIncrementallyUpdated(char *table)
 // Tables that have rows added to them after initial creation are not completely sorted
 // because of new rows at end, so we have to 'order by'.
 {
 return (sameString(table, "refGene") || sameString(table, "refFlat") ||
 	sameString(table, "xenoRefGene") || sameString(table, "xenoRefFlat") ||
 	sameString(table, "all_mrna") || sameString(table, "xenoMrna") ||
 	sameString(table, "all_est") || sameString(table, "xenoEst") ||
 	sameString(table, "refSeqAli") || sameString(table, "xenoRefSeqAli"));
 }
 
 static boolean isPubsTable(char *table)
 // Not absolutely every pubs* table is unsorted, but most of them are.
 {
 return startsWith("pubs", table);
 }
 
 struct annoStreamer *annoStreamDbNew(char *db, char *table, struct annoAssembly *aa,
 				     struct asObject *asObj, int maxOutRows)
 /* Create an annoStreamer (subclass) object from a database table described by asObj. */
 {
 struct sqlConnection *conn = hAllocConn(db);
 if (!sqlTableExists(conn, table))
     errAbort("annoStreamDbNew: table '%s' doesn't exist in database '%s'", table, db);
 struct annoStreamDb *self = NULL;
 AllocVar(self);
 struct annoStreamer *streamer = &(self->streamer);
 int dbtLen = strlen(db) + strlen(table) + 2;
 char dbTable[dbtLen];
 safef(dbTable, dbtLen, "%s.%s", db, table);
 annoStreamerInit(streamer, aa, asObj, dbTable);
 streamer->rowType = arWords;
 streamer->setRegion = asdSetRegion;
 streamer->nextRow = asdNextRow;
 streamer->close = asdClose;
 self->conn = conn;
 self->table = cloneString(table);
 char *asFirstColumnName = streamer->asObj->columnList->name;
 if (sqlFieldIndex(self->conn, self->table, "bin") == 0)
     {
     self->hasBin = 1;
     self->minFinestBin = binFromRange(0, 1);
     }
 if (self->hasBin && !sameString(asFirstColumnName, "bin"))
     self->omitBin = 1;
 if (!asdInitBed3Fields(self))
     errAbort("annoStreamDbNew: can't figure out which fields of %s.%s to use as "
 	     "{chrom, chromStart, chromEnd}.", db, table);
 self->makeBaselineQuery = asdMakeBaselineQuery;
 // When a table has an index on endField, sometimes the query optimizer uses it
 // and that ruins the sorting.  Fortunately most tables don't anymore.
 self->endFieldIndexName = sqlTableIndexOnField(self->conn, self->table, self->endField);
 self->notSorted = FALSE;
 // Special case: genbank-updated tables are not sorted because new mappings are
 // tacked on at the end.  Max didn't sort the pubs* tables but I hope he will
 // sort the tables for any future tracks.  :)
 if (isIncrementallyUpdated(table) || isPubsTable(table))
     self->notSorted = TRUE;
 self->mergeBins = FALSE;
 self->maxOutRows = maxOutRows;
 self->useMaxOutRows = (maxOutRows > 0);
 self->needQuery = TRUE;
 self->chromList = annoAssemblySeqNames(aa);
 if (slCount(self->chromList) > 1000)
     {
     // Assembly has many sequences (e.g. scaffold-based assembly) --
     // don't break up into per-sequence queries.  Take our chances
     // with mysql being unhappy about the sqlResult being open too long.
     self->doQuery = asdDoQuerySimple;
     self->nextRowRaw = nextRowFromSqlResult;
     }
 else
     {
     // All-chromosome assembly -- if table is large, perform a series of
     // chunked queries.
     self->doQuery = asdDoQueryChunking;
     self->nextRowRaw = nextRowFromBuffer;
     }
 return (struct annoStreamer *)self;
 }