8fc3690db1631b1585c4ee971b3b99accea2d607
angie
  Fri Apr 5 15:51:51 2013 -0700
Performance improvement for annoStreamDb, based on hg/oneShot/timeMysqlHandler.c: instead of using 'SELECT ...ORDER BY' queries to ensure that output is sorted by (chrom, chromStart),
for tables with (chrom, bin) indices, use the fact that coarser bin numbers
are lower numerically than finer bin numbers (for example, with the standard
binRange scheme, the coarsest bin is 0 and the finest bins begin at 585).
For tables like snp137 with > 50M rows of tiny items, the results from a
query that uses the (chrom,bin) index typically include a very few items
from coarser bins, followed by small items from the finest bin levels that
do come out in chromStart order.  So it works out well to save aside the
initial coarse-bin annoRows in a list, and then as soon as we see a row
from the finest bin level, sort the list of coarse-bin items and merge
them into the stream of finest-bin items.  This way we produce coordinates
sorted by chromStart as required by annoGrator, but can start returning
rows very soon after starting the query.
For tables that don't have a (chrom,bin) index, if they have a (chrom, end)
index, tell mysql to ignore that index so it will use the (chrom, start) index
and produce correctly sorted output.
refs #6152

diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c
index 64538a7..4e4e27d 100644
--- src/hg/lib/annoStreamDb.c
+++ src/hg/lib/annoStreamDb.c
@@ -1,116 +1,234 @@
 /* annoStreamDb -- subclass of annoStreamer for database tables */
 
 #include "annoStreamDb.h"
 #include "annoGratorQuery.h"
 #include "hdb.h"
 #include "sqlNum.h"
 
 struct annoStreamDb
     {
     struct annoStreamer streamer;	// Parent class members & methods
     // Private members
     struct sqlConnection *conn;		// Database connection (e.g. hg19 or customTrash)
     char *table;			// Table name, must exist in database
     struct sqlResult *sr;		// SQL query result from which we grab rows
-    boolean hasBin;			// 1 if SQL table's first column is bin
-    boolean omitBin;			// 1 if table hasBin and autoSql doesn't have bin
     char *chromField;			// Name of chrom-ish column in table
     char *startField;			// Name of chromStart-ish column in table
     char *endField;			// Name of chromEnd-ish column in table
+    char *endFieldIndexName;		// SQL index on end field, if any (can mess up sorting)
     int chromIx;			// Index of chrom-ish col in autoSql or bin-less table
     int startIx;			// Index of chromStart-ish col in autoSql or bin-less table
     int endIx;				// Index of chromEnd-ish col in autoSql or bin-less table
-    boolean hasEndFieldIndex;		// TRUE if index on end field (can mess up sorting)
     boolean notSorted;			// TRUE if table is not sorted (e.g. genbank-updated)
+    boolean hasBin;			// 1 if SQL table's first column is bin
+    boolean omitBin;			// 1 if table hasBin and autoSql doesn't have bin
+    boolean mergeBins;			// TRUE if query results will be in bin order
+    struct annoRow *bigItemQueue;	// If mergeBins, accumulate coarse-bin items here
+    struct annoRow *smallItemQueue;	// Max 1 item for merge-sorting with bigItemQueue
+    int minFinestBin;			// Smallest bin number for finest bin level
+    boolean gotFinestBin;		// Flag that it's time to merge-sort with bigItemQueue
     };
 
 static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
 /* Set region -- and free current sqlResult if there is one. */
 {
 annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 if (self->sr != NULL)
     sqlFreeResult(&(self->sr));
 }
 
 static void asdDoQuery(struct annoStreamDb *self)
 /* Return a sqlResult for a query on table items in position range. */
 // NOTE: it would be possible to implement filters at this level, as in hgTables.
 {
 struct annoStreamer *streamer = &(self->streamer);
 struct dyString *query = dyStringCreate("select * from %s", self->table);
 if (!streamer->positionIsGenome)
     {
+    if (self->hasBin)
+	{
+	// Results will be in bin order, but we can restore chromStart order by
+	// accumulating initial coarse-bin items and merge-sorting them with
+	// subsequent finest-bin items which will be in chromStart order.
+	self->mergeBins = TRUE;
+	self->bigItemQueue = self->smallItemQueue = NULL;
+	self->gotFinestBin = FALSE;
+	}
+    if (self->endFieldIndexName != NULL)
+	// Don't let mysql use a (chrom, chromEnd) index because that messes up
+	// sorting by chromStart.
+	dyStringPrintf(query, " IGNORE INDEX (%s)", self->endFieldIndexName);
     dyStringPrintf(query, " where %s='%s'", self->chromField, streamer->chrom);
     int chromSize = hashIntVal(streamer->query->chromSizes, streamer->chrom);
     if (streamer->regionStart != 0 || streamer->regionEnd != chromSize)
 	{
 	dyStringAppend(query, " and ");
 	if (self->hasBin)
 	    hAddBinToQuery(streamer->regionStart, streamer->regionEnd, query);
 	dyStringPrintf(query, "%s < %u and %s > %u", self->startField, streamer->regionEnd,
 		       self->endField, streamer->regionStart);
 	}
-    if (self->hasEndFieldIndex || self->notSorted)
+    if (self->notSorted)
 	dyStringPrintf(query, " order by %s", self->startField);
     }
 else if (self->notSorted)
     dyStringPrintf(query, " order by %s,%s", self->chromField, self->startField);
 struct sqlResult *sr = sqlGetResult(self->conn, query->string);
 dyStringFree(&query);
 self->sr = sr;
 }
 
-static char **nextRowUnfiltered(struct annoStreamDb *self)
-/* Fetch the next row from the sqlResult, and skip past table's bin field if necessary. */
+static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail)
+/* Skip past any left-join failures until we get a right-join failure, a passing row,
+ * or end of data.  Return row or NULL, and return right-join fail status via retRightFail. */
 {
+int numCols = self->streamer.numCols;
 char **row = sqlNextRow(self->sr);
-if (row == NULL)
-    return NULL;
-// Skip table's bin field if not in autoSql:
+boolean rightFail = FALSE;
+struct annoFilter *filterList = self->streamer.filters;
+while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail))
+    {
+    if (rightFail)
+	break;
+    row = sqlNextRow(self->sr);
+    }
+*retRightFail = rightFail;
+return row;
+}
+
+static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail)
+/* Extract coords from row and return an annoRow including right-fail status. */
+{
 row += self->omitBin;
+char *chrom = row[self->chromIx];
+uint chromStart = sqlUnsigned(row[self->startIx]);
+uint chromEnd = sqlUnsigned(row[self->endIx]);
+return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row,
+			      self->streamer.numCols);
+}
+
+static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail)
+/* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and
+ * add any subsequent coarse-bin items to bigItemQueue.  As soon as we get an item from a
+ * finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. */
+{
+int bin = atoi(row[0]);
+while (bin < self->minFinestBin)
+    {
+    // big item -- store aside in queue for merging later, move on to next item
+    slAddHead(&(self->bigItemQueue), rowToAnnoRow(self, row, *pRightFail));
+    *pRightFail = FALSE;
+    row = nextRowFiltered(self, pRightFail);
+    if (row == NULL)
+	break;
+    bin = atoi(row[0]);
+    }
+// First finest-bin item!  Sort bigItemQueue in preparation for merging:
+self->gotFinestBin = TRUE;
+slReverse(&(self->bigItemQueue));
+slSort(&(self->bigItemQueue), annoRowCmp);
+return row;
+}
+
+static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow)
+/* Compare head of bigItemQueue with (finest-bin) aRow; return the one with
+ * lower chromStart and save the other for later.  */
+{
+if (self->bigItemQueue == NULL)
+    {
+    // No coarse-bin items to merge-sort, just stream finest-bin items from here on out.
+    self->mergeBins = FALSE;
+    return aRow;
+    }
+else
+    {
+    if (annoRowCmp(&(self->bigItemQueue), &aRow) < 0)
+	{
+	// Big item gets to go now, so save aside small item for next time.
+	slAddHead(&(self->smallItemQueue), aRow);
+	return slPopHead(&(self->bigItemQueue));
+	}
+    else
+	return aRow;
+    }
+}
+
+static struct annoRow *nextQueuedRow(struct annoStreamDb *self)
+// Return the head of either bigItemQueue or smallItemQueue, depending on which has
+// the lower chromStart.
+{
+struct annoRow *row = NULL;
+if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0)
+    row = slPopHead(&(self->bigItemQueue));
+else
+    row = slPopHead(&(self->smallItemQueue));
+if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
+    // All done merge-sorting, just stream finest-bin items from here on out.
+    self->mergeBins = FALSE;
 return row;
 }
 
+static struct annoRow *nextRowMergeBins(struct annoStreamDb *self)
+/* Fetch the next filtered row from mysql, merge-sorting coarse-bin items into finest-bin
+ * items to maintain chromStart ordering. */
+{
+assert(self->mergeBins && self->hasBin);
+if (self->smallItemQueue)
+    // In this case we have already begun merge-sorting; don't pull a new row from mysql,
+    // use the queues.  This should keep smallItemQueue's max depth at 1.
+    return nextQueuedRow(self);
+else
+    {
+    // We might need to collect initial coarse-bin items, or might already be merge-sorting.
+    boolean rightFail = FALSE;
+    char **row = nextRowFiltered(self, &rightFail);
+    if (row && !self->gotFinestBin)
+	{
+	// We are just starting -- queue up coarse-bin items, if any, until we get the first
+	// finest-bin item.
+	row = getFinestBinItem(self, row, &rightFail);
+	}
+    // Time to merge-sort finest-bin items from mysql with coarse-bin items from queue.
+    if (row != NULL)
+	{
+	struct annoRow *aRow = rowToAnnoRow(self, row, rightFail);
+	return mergeRow(self, aRow);
+	}
+    else
+	return slPopHead(&(self->bigItemQueue));
+    }
+}
+
 static struct annoRow *asdNextRow(struct annoStreamer *vSelf)
 /* Perform sql query if we haven't already and return a single
  * annoRow, or NULL if there are no more items. */
 {
 struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
 if (self->sr == NULL)
     asdDoQuery(self);
 if (self->sr == NULL)
     // This is necessary only if we're using sqlStoreResult in asdDoQuery, harmless otherwise:
     return NULL;
-char **row = nextRowUnfiltered(self);
-if (row == NULL)
-    return NULL;
-// Skip past any left-join failures until we get a right-join failure, a passing row, or EOF.
+if (self->mergeBins)
+    return nextRowMergeBins(self);
 boolean rightFail = FALSE;
-while (annoFilterRowFails(vSelf->filters, row, vSelf->numCols, &rightFail))
-    {
-    if (rightFail)
-	break;
-    row = nextRowUnfiltered(self);
+char **row = nextRowFiltered(self, &rightFail);
     if (row == NULL)
 	return NULL;
-    }
-char *chrom = row[self->chromIx];
-uint chromStart = sqlUnsigned(row[self->startIx]);
-uint chromEnd = sqlUnsigned(row[self->endIx]);
-return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row, vSelf->numCols);
+return rowToAnnoRow(self, row, rightFail);
 }
 
 static void asdClose(struct annoStreamer **pVSelf)
 /* Close db connection and free self. */
 {
 if (pVSelf == NULL)
     return;
 struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf;
 // Let the caller close conn; it might be from a cache.
 freeMem(self->table);
 sqlFreeResult(&(self->sr));
 hFreeConn(&(self->conn));
 annoStreamerFree(pVSelf);
 }
 
@@ -138,65 +256,66 @@
 
 static boolean asdInitBed3Fields(struct annoStreamDb *self)
 /* Use autoSql to figure out which table fields correspond to {chrom, chromStart, chromEnd}. */
 {
 if (asHasFields(self, "chrom", "chromStart", "chromEnd"))
     return TRUE;
 if (asHasFields(self, "chrom", "txStart", "txEnd"))
     return TRUE;
 if (asHasFields(self, "tName", "tStart", "tEnd"))
     return TRUE;
 if (asHasFields(self, "genoName", "genoStart", "genoEnd"))
     return TRUE;
 return FALSE;
 }
 
-boolean sqlTableHasIndexOn(struct sqlConnection *conn, char *table, char *field)
-/* Return TRUE if table has an index that includes field. */
+char *sqlTableIndexOnField(struct sqlConnection *conn, char *table, char *field)
+/* If table has an index that includes field, return the index name, else NULL. */
 {
-boolean foundIndex = FALSE;
+char *indexName = NULL;
 char query[512];
 safef(query, sizeof(query), "show index from %s", table);
 struct sqlResult *sr = sqlGetResult(conn, query);
 char **row;
 while ((row = sqlNextRow(sr)) != NULL)
     {
     if (sameString(row[4], field))
 	{
-	foundIndex = TRUE;
+	indexName = cloneString(row[2]);
 	break;
 	}
     }
 sqlFreeResult(&sr);
-return foundIndex;
+return indexName;
 }
 
 struct annoStreamer *annoStreamDbNew(char *db, char *table, struct asObject *asObj)
 /* Create an annoStreamer (subclass) object from a database table described by asObj. */
 {
 struct sqlConnection *conn = hAllocConn(db);
 if (!sqlTableExists(conn, table))
     errAbort("annoStreamDbNew: table '%s' doesn't exist in database '%s'", table, db);
 struct annoStreamDb *self = NULL;
 AllocVar(self);
 struct annoStreamer *streamer = &(self->streamer);
 annoStreamerInit(streamer, asObj);
 streamer->rowType = arWords;
 streamer->setRegion = asdSetRegion;
 streamer->nextRow = asdNextRow;
 streamer->close = asdClose;
 self->conn = conn;
 self->table = cloneString(table);
 char *asFirstColumnName = streamer->asObj->columnList->name;
 if (sqlFieldIndex(self->conn, self->table, "bin") == 0)
     self->hasBin = 1;
 if (self->hasBin && !sameString(asFirstColumnName, "bin"))
     self->omitBin = 1;
 if (!asdInitBed3Fields(self))
     errAbort("annoStreamDbNew: can't figure out which fields of %s to use as "
 	     "{chrom, chromStart, chromEnd}.", table);
 // When a table has an index on endField, sometimes the query optimizer uses it
 // and that ruins the sorting.  Fortunately most tables don't anymore.
-self->hasEndFieldIndex = sqlTableHasIndexOn(self->conn, self->table, self->endField);
-self->notSorted = TRUE; // True for more tables than I counted on, e.g. snp135 (bc it's packed??)
+self->endFieldIndexName = sqlTableIndexOnField(self->conn, self->table, self->endField);
+self->notSorted = FALSE;
+self->mergeBins = FALSE;
 return (struct annoStreamer *)self;
 }