7a5ae9c2c1d5519793a4bdd99cca3a911c14b68e
angie
  Fri Nov 18 16:36:51 2016 -0800
Added support for split tables to annoStreamDb; ChrisV found it was missing, which broke some hgIntegrator queries in hg18 and older dbs.
refs #18397

diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c
index ce6b30c..a2f61da 100644
--- src/hg/lib/annoStreamDb.c
+++ src/hg/lib/annoStreamDb.c
@@ -7,31 +7,32 @@
 #include "annoGratorQuery.h"
 #include "binRange.h"
 #include "hAnno.h"
 #include "joinMixer.h"
 #include "hdb.h"
 #include "obscure.h"
 #include "sqlNum.h"
 
 struct annoStreamDb
     {
     struct annoStreamer streamer;	// Parent class members & methods
     // Private members
     char *db;                           // Database name (e.g. hg19 or customTrash)
     struct sqlConnection *conn;	        // Database connection
     struct sqlResult *sr;		// SQL query result from which we grab rows
-    char *table;			// Table name, must exist in database
+    char *trackTable;			// Name of database table (or root name if split tables)
+    char *table;			// If split, chr..._trackTable; otherwise same as trackTable
     char *baselineQuery;                // SQL query without position constraints
     boolean baselineQueryHasWhere;      // True if baselineQuery contains filter or join clauses
 
     // These members enable us to extract coords from the otherwise unknown row:
     char *chromField;			// Name of chrom-ish column in table
     char *startField;			// Name of chromStart-ish column in table
     char *endField;			// Name of chromEnd-ish column in table
     int chromIx;			// Index of chrom-ish col in autoSql or bin-less table
     int startIx;			// Index of chromStart-ish col in autoSql or bin-less table
     int endIx;				// Index of chromEnd-ish col in autoSql or bin-less table
 
     // These members enable us to produce {chrom, start}-sorted output:
     char *endFieldIndexName;		// SQL index on end field, if any (can mess up sorting)
     boolean notSorted;			// TRUE if table is not sorted (e.g. genbank-updated)
     boolean hasBin;			// 1 if SQL table's first column is bin
@@ -92,38 +93,30 @@
     {
     struct annoFilter filter;            // parent class
     struct joinerDtf *dtf;               // {database, table, field} in case this is from
                                          // some table to be joined with the main table
     };
 
 // For performance reasons, even if !useMaxItems (no limit), we need to limit the
 // number of rows that are returned from a query, so we can slurp them into memory and
 // close the sqlResult before mysql gets unhappy about the result being open so long.
 #define ASD_CHUNK_SIZE 100000
 
 #define JOINER_FILE "all.joiner"
 
 static const boolean asdDebug = FALSE;
 
-// Potential optimization: call asdMakeBaselineQuery in asdSetRegion, and scale expectedRows
-// by the proportion of the genome that the search region (if one is defined) covers.
-// Pitfall: if we are doing a list of regions, and don't know that from the start, then
-// we would underestimate overall coverage, and might waste a lot of time rebuilding
-// and reloading hashJoins unless we recycle them.
-// Since annoStreamDb does not internally handle split tables yet, hgIntegrator
-// is inefficiently making brand new streamers for each region anyway.  Fix that first.
-
 static void resetMergeState(struct annoStreamDb *self)
 /* Reset or initialize members that track merging of coarse-bin items with fine-bin items. */
 {
 self->mergeBins = FALSE;
 self->bigItemQueue = self->smallItemQueue = NULL;
 lmCleanup(&(self->qLm));
 self->gotFinestBin = FALSE;
 }
 
 static void resetRowBuf(struct rowBuf *rowBuf)
 /* Reset temporary storage for chunked query rows. */
 {
 rowBuf->buf = NULL;
 rowBuf->size = 0;
 rowBuf->ix = 0;
@@ -145,141 +138,186 @@
 {
 self->mergeBins = TRUE;
 self->gotFinestBin = FALSE;
 if (self->qLm == NULL)
     self->qLm = lmInit(0);
 }
 
 static void resetQueryState(struct annoStreamDb *self)
 /* Free sqlResult if there is one, and reset state associated with the current query. */
 {
 sqlFreeResult(&(self->sr));
 resetMergeState(self);
 resetChunkState(self);
 }
 
+// Forward declaration in order to avoid moving lots of code:
+static void asdUpdateBaselineQuery(struct annoStreamDb *self);
+/* Build a dy SQL query with no position constraints (select ... from ...)
+ * possibly including joins and filters if specified (where ...), using the current self->table. */
+
 static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
 /* Set region -- and free current sqlResult if there is one. */
 {
 annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
-resetQueryState((struct annoStreamDb *)vSelf);
+struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
+// If the table is split (table differs from trackTable), switch to the new chrom's split table:
+if (!sameString(self->table, self->trackTable))
+    {
+    char newSplitTable[PATH_LEN];
+    safef(newSplitTable, sizeof(newSplitTable), "%s_%s", chrom, self->trackTable);
+    freeMem(self->table);
+    self->table = cloneString(newSplitTable);
+    }
+resetQueryState(self);
+asdUpdateBaselineQuery(self);
 }
 
 static char **nextRowFromSqlResult(struct annoStreamDb *self)
 /* Stream rows directly from self->sr. */
 {
 return sqlNextRow(self->sr);
 }
 
+INLINE boolean useSplitTable(struct annoStreamDb *self, struct joinerDtf *dtf)
+/* Return TRUE if dtf matches self->{db,trackTable} and table is split. */
+{
+return (sameString(dtf->database, self->db) &&
+        sameString(dtf->table, self->trackTable) &&
+        !sameString(self->table, self->trackTable));
+}
+
 static void appendFieldList(struct annoStreamDb *self, struct dyString *query)
 /* Append SQL field list to query. */
 {
 struct joinerDtf *fieldList = self->joinMixer ? self->joinMixer->sqlFieldList :
                                                 self->mainTableDtfList;
 struct joinerDtf *dtf;
 for (dtf = fieldList;  dtf != NULL;  dtf = dtf->next)
     {
     if (dtf != fieldList)
         dyStringAppendC(query, ',');
+    if (useSplitTable(self, dtf))
+        dyStringPrintf(query, "%s.%s", self->table, dtf->field);
+    else
+        {
         char dtfString[PATH_LEN];
         joinerDtfToSqlFieldString(dtf, self->db, dtfString, sizeof(dtfString));
         dyStringAppend(query, dtfString);
         }
     }
+}
 
 static void ignoreEndIndexIfNecessary(struct annoStreamDb *self, char *dbTable,
                                       struct dyString *query)
 /* Don't let mysql use a (chrom, chromEnd) index because that messes up sorting by chromStart. */
 {
-if (sameString(dbTable, self->table) && self->endFieldIndexName != NULL)
+if (sameString(dbTable, self->trackTable) && self->endFieldIndexName != NULL)
     sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName);
 }
 
 static void appendOneTable(struct annoStreamDb *self, struct joinerDtf *dt, struct dyString *query)
-/* Add the (db.)table string from dt to query. */
+/* Add the (db.)table string from dt to query; if dt is NULL or table is split then
+ * use self->table. */
 {
 char dbTable[PATH_LEN];
+if (dt == NULL || useSplitTable(self, dt))
+    safecpy(dbTable, sizeof(dbTable), self->table);
+else
     joinerDtfToSqlTableString(dt, self->db, dbTable, sizeof(dbTable));
 dyStringAppend(query, dbTable);
 ignoreEndIndexIfNecessary(self, dbTable, query);
 }
 
+INLINE void splitOrDtfToSqlField(struct annoStreamDb *self, struct joinerDtf *dtf,
+                                 char *fieldName, size_t fieldNameSize)
+/* Write [db].table.field into fieldName, where table may be split. */
+{
+if (useSplitTable(self, dtf))
+    safef(fieldName, fieldNameSize, "%s.%s", self->table, dtf->field);
+else
+    joinerDtfToSqlFieldString(dtf, self->db, fieldName, fieldNameSize);
+}
+
 static boolean appendTableList(struct annoStreamDb *self, struct dyString *query)
 /* Append SQL table list to query, including tables used for output, filtering and joining. */
 {
 boolean hasLeftJoin = FALSE;
 if (self->joinMixer == NULL || self->joinMixer->sqlRouteList == NULL)
-    {
-    dyStringAppend(query, self->table);
-    ignoreEndIndexIfNecessary(self, self->table, query);
-    }
+    appendOneTable(self, NULL, query);
 else
     {
     // Use both a and b of the first pair and only b of each subsequent pair
     appendOneTable(self, self->joinMixer->sqlRouteList->a, query);
     struct joinerPair *jp;
     for (jp = self->joinMixer->sqlRouteList;  jp != NULL;  jp = jp->next)
         {
         dyStringAppend(query, " left join ");
         appendOneTable(self, jp->b, query);
         char fieldA[PATH_LEN], fieldB[PATH_LEN];
-        joinerDtfToSqlFieldString(jp->a, self->db, fieldA, sizeof(fieldA));
-        joinerDtfToSqlFieldString(jp->b, self->db, fieldB, sizeof(fieldB));
+        splitOrDtfToSqlField(self, jp->a, fieldA, sizeof(fieldA));
+        splitOrDtfToSqlField(self, jp->b, fieldB, sizeof(fieldB));
         struct joinerField *jfA = joinerSetFindField(jp->identifier, jp->a);
         if (sameOk(jfA->separator, ","))
             dyStringPrintf(query, " on find_in_set(%s, %s)", fieldB, fieldA);
         else
             dyStringPrintf(query, " on %s = %s", fieldA, fieldB);
         hasLeftJoin = TRUE;
         }
     }
 return hasLeftJoin;
 }
 
 // libify?
 static struct joinerDtf *joinerDtfCloneList(struct joinerDtf *listIn)
 /* Return a list with cloned items of listIn. */
 {
 struct joinerDtf *listOut = NULL, *item;
 for (item = listIn;  item != NULL;  item = item->next)
     slAddHead(&listOut, joinerDtfClone(item));
 slReverse(&listOut);
 return listOut;
 }
 
-static void asdMakeBaselineQuery(struct annoStreamDb *self)
+static void asdInitBaselineQuery(struct annoStreamDb *self)
 /* Build a dy SQL query with no position constraints (select ... from ...)
  * possibly including joins and filters if specified (where ...). */
 {
 if (self->relatedDtfList)
     {
     struct joinerDtf *outputFieldList = slCat(joinerDtfCloneList(self->mainTableDtfList),
                                               joinerDtfCloneList(self->relatedDtfList));
     if (self->joiner == NULL)
         self->joiner = joinerRead(JOINER_FILE);
     int expectedRows = sqlRowCount(self->conn, self->table);
     self->joinMixer = joinMixerNew(self->joiner, self->db, self->table, outputFieldList,
                                    expectedRows, self->naForMissing);
     self->sqlRowSize = slCount(self->joinMixer->sqlFieldList);
     self->bigRowSize = self->joinMixer->bigRowSize;
     joinerDtfFreeList(&outputFieldList);
     }
 else
     {
     self->sqlRowSize = slCount(self->mainTableDtfList);
     self->bigRowSize = self->sqlRowSize;
     }
+}
+
+static void asdUpdateBaselineQuery(struct annoStreamDb *self)
+/* Build a dy SQL query with no position constraints (select ... from ...)
+ * possibly including joins and filters if specified (where ...), using the current self->table. */
+{
 struct dyString *query = sqlDyStringCreate("select ");
 appendFieldList(self, query);
 dyStringAppend(query, " from ");
 self->hasLeftJoin = appendTableList(self, query);
 boolean hasWhere = FALSE;
 self->baselineQuery = dyStringCannibalize(&query);
 self->baselineQueryHasWhere = hasWhere;
 // Don't free joiner; we need its storage of joinerFields.
 }
 
 static void addBinToQuery(struct annoStreamDb *self, uint start, uint end, struct dyString *query)
 /* If applicable, add bin range constraints to query with explicit table name, in case we're
  * joining with another table that has a bin column. */
 {
 if (self->hasBin)
@@ -853,31 +891,31 @@
     if (self->needQuery && !self->eof)
 	// Recurse: query, then get next merged/filtered row:
 	return asdNextRow(vSelf, minChrom, minEnd, callerLm);
     else
 	return NULL;
     }
 return rowToAnnoRow(self, row, rightFail, callerLm);
 }
 
 
 static void makeMainTableDtfList(struct annoStreamDb *self, struct asObject *mainAsObj)
 /* Make a list of mainTable columns. */
 {
 struct joinerDtf mainDtf;
 mainDtf.database = self->db;
-mainDtf.table = self->table;
+mainDtf.table = self->trackTable;
 struct asColumn *col;
 for (col = mainAsObj->columnList;  col != NULL;  col = col->next)
     {
     mainDtf.field = col->name;
     slAddHead(&self->mainTableDtfList, joinerDtfClone(&mainDtf));
     }
 slReverse(&self->mainTableDtfList);
 // If table has bin but asObj does not, add bin to head of mainTableDtfList.
 if (self->hasBin && differentString("bin", self->mainTableDtfList->field))
     {
     mainDtf.field = "bin";
     slAddHead(&self->mainTableDtfList, joinerDtfClone(&mainDtf));
     }
 }
 
@@ -967,71 +1005,72 @@
 dyStringPrintf(dy, "  %s; \"%s\"\n", colName, col->comment);
 // Store plain old dotted triple in dtfNames in case we need to look it up later.
 char dtfString[PATH_LEN];
 makeDottedTriple(dtfString, sizeof(dtfString), dtf->database, dtf->table, dtf->field);
 hashAdd(dtfNames, dtfString, NULL);
 }
 
 static struct asObject *asdAutoSqlFromTableFields(struct annoStreamDb *self,
                                                   struct asObject *mainAsObj)
 /* Get autoSql for each table in self->relatedDtfList and append the columns
  * included in self->relatedDtfList to the main table asObj columns. */
 {
 struct dyString *newAsText = dyStringCreate("table %sCustom\n"
                                             "\"query based on %s with customized fields.\"\n"
                                             "    (",
-                                            self->table, self->table);
+                                            self->trackTable, self->trackTable);
 // Use a hash of table to asObject so we fetch autoSql only once per table.
 struct hash *asObjCache = hashNew(0);
 // Use a hash of dtf strings to test whether or not one has been added already.
 struct hash *dtfNames = hashNew(0);
 // Start with all columns of main table:
 struct joinerDtf mainDtf;
 mainDtf.database = self->db;
-mainDtf.table = self->table;
+mainDtf.table = self->trackTable;
 struct asColumn *col;
 for (col = mainAsObj->columnList;  col != NULL;  col = col->next)
     {
     mainDtf.field = col->name;
-    addOneColumn(newAsText, &mainDtf, self->db, self->table, col, dtfNames);
+    addOneColumn(newAsText, &mainDtf, self->db, self->trackTable, col, dtfNames);
     }
 // Append fields from related tables:
 struct joinerDtf *dtf;
 for (dtf = self->relatedDtfList;  dtf != NULL;  dtf = dtf->next)
     {
     struct asObject *asObj = asObjForDtf(asObjCache, dtf);
     struct asColumn *col = asColumnFind(asObj, dtf->field);
     if (col == NULL)
         errAbort("annoStreamDb: Can't find column %s in autoSql for table %s.%s",
                  dtf->field, dtf->database, dtf->table);
-    addOneColumn(newAsText, dtf, self->db, self->table, col, dtfNames);
+    addOneColumn(newAsText, dtf, self->db, self->trackTable, col, dtfNames);
     }
 dyStringAppendC(newAsText, ')');
 struct asObject *newAsObj = asParseText(newAsText->string);
 hashFreeWithVals(&asObjCache, asObjectFree);
 dyStringFree(&newAsText);
 freeHashAndVals(&dtfNames);
 return newAsObj;
 }
 
 static void asdClose(struct annoStreamer **pVSelf)
 /* Close db connection and free self. */
 {
 if (pVSelf == NULL)
     return;
 struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf;
 lmCleanup(&(self->qLm));
+freeMem(self->trackTable);
 freeMem(self->table);
 slNameFreeList(&self->chromList);
 joinerDtfFreeList(&self->mainTableDtfList);
 joinerDtfFreeList(&self->relatedDtfList);
 joinerFree(&self->joiner);
 joinMixerFree(&self->joinMixer);
 sqlFreeResult(&(self->sr));
 hFreeConn(&(self->conn));
 annoStreamerFree(pVSelf);
 }
 
 static boolean asdInitBed3Fields(struct annoStreamDb *self)
 /* Use autoSql to figure out which table fields correspond to {chrom, chromStart, chromEnd}. */
 {
 struct annoStreamer *vSelf = &(self->streamer);
@@ -1056,57 +1095,53 @@
     }
 if (sameOk(indexNameA, indexNameB))
     indexNameA = NULL;
 sqlFreeResult(&sr);
 return indexNameA;
 }
 
 static boolean isIncrementallyUpdated(char *table)
 // Tables that have rows added to them after initial creation are not completely sorted
 // because of new rows at end, so we have to 'order by'.
 {
 return (sameString(table, "refGene") || sameString(table, "refFlat") ||
 	sameString(table, "xenoRefGene") || sameString(table, "xenoRefFlat") ||
 	sameString(table, "all_mrna") || sameString(table, "xenoMrna") ||
 	sameString(table, "all_est") || sameString(table, "xenoEst") ||
+        sameString(table, "intronEst") ||
 	sameString(table, "refSeqAli") || sameString(table, "xenoRefSeqAli"));
 }
 
 static boolean isPubsTable(char *table)
 // Not absolutely every pubs* table is unsorted, but most of them are.
 {
 return startsWith("pubs", table);
 }
 
 static struct asObject *asdParseConfig(struct annoStreamDb *self, struct jsonElement *configEl)
-/* Extract the autoSql for self->table from the database.
+/* Extract the autoSql for self->trackTable from the database.
  * If configEl is not NULL, expect it to be a description of related tables and fields like this:
  * config = { "relatedTables": [ { "table": "hg19.kgXref",
  *                                 "fields": ["geneSymbol", "description"] },
  *                               { "table": "hg19.knownCanonical",
  *                                 "fields": ["clusterId"] }
  *                             ] }
  * If so, unpack the [db.]tables and fields into self->relatedDtfList and append autoSql
  * column descriptions for each field to the autoSql object that describes our output.
  * It might also have "naForMissing": true/false; if so, set self->naForMissing. */
 {
-//#*** TODO: hAnnoGetAutoSqlForDbTable should do its own split-table checking
-char maybeSplitTable[HDB_MAX_TABLE_STRING];
-if (!hFindSplitTable(self->db, NULL, self->table, maybeSplitTable, NULL))
-    errAbort("annoStreamDbNew: can't find table (or split table) for '%s.%s'",
-             self->db, self->table);
-struct asObject *asObj = hAnnoGetAutoSqlForDbTable(self->db, maybeSplitTable, NULL, TRUE);
+struct asObject *asObj = hAnnoGetAutoSqlForDbTable(self->db, self->trackTable, NULL, TRUE);
 makeMainTableDtfList(self, asObj);
 if (configEl != NULL)
     {
     struct hash *config = jsonObjectVal(configEl, "config");
     struct jsonElement *relatedTablesEl = hashFindVal(config, "relatedTables");
     if (relatedTablesEl)
         {
         // relatedTables is a list of objects like { table: <[db.]table name>,
         //                                           fields: [ <field1>, <field2>, ...] }
         struct slRef *relatedTables = jsonListVal(relatedTablesEl, "relatedTables");
         struct slRef *tfRef;
         for (tfRef = relatedTables;  tfRef != NULL;  tfRef = tfRef->next)
             {
             struct jsonElement *dbTableFieldEl = tfRef->val;
             struct hash *tfObj = jsonObjectVal(dbTableFieldEl,
@@ -1194,79 +1229,82 @@
  * If config is NULL, then the streamer produces output from all fields
  * (except bin, unless table's autoSql includes bin).
  * Otherwise, config is a json object with a member 'relatedTables' that specifies
  * related tables and fields to join with table, for example:
  * config = { "relatedTables": [ { "table": "hg19.kgXref",
  *                                 "fields": ["geneSymbol", "description"] },
  *                               { "table": "hg19.knownCanonical",
  *                                 "fields": ["clusterId"] }
  *                             ] }
  * -- the streamer's autoSql will be constructed by appending autoSql column
  * descriptions to the columns of table.
  * Caller may free db, and table when done with them, but must keep the
  * annoAssembly aa alive for the lifetime of the returned annoStreamer. */
 {
 struct sqlConnection *conn = hAllocConn(db);
-if (!sqlTableExists(conn, table))
-    errAbort("annoStreamDbNew: table '%s' doesn't exist in database '%s'", table, db);
+char splitTable[HDB_MAX_TABLE_STRING];
+if (!hFindSplitTable(db, NULL, table, splitTable, NULL))
+    errAbort("annoStreamDbNew: can't find table (or split table) for '%s.%s'", db, table);
 struct annoStreamDb *self = NULL;
 AllocVar(self);
 self->conn = conn;
 self->db = cloneString(db);
-self->table = cloneString(table);
+self->trackTable = cloneString(table);
+self->table = cloneString(splitTable);
 if (sqlFieldIndex(self->conn, self->table, "bin") == 0)
     {
     self->hasBin = 1;
     self->minFinestBin = binFromRange(0, 1);
     }
 struct asObject *asObj = asdParseConfig(self, configEl);
 struct annoStreamer *streamer = &(self->streamer);
 int dbtLen = strlen(db) + strlen(table) + 2;
 char streamerName[dbtLen];
 safef(streamerName, sizeof(streamerName), "%s.%s", db, table);
 annoStreamerInit(streamer, aa, asObj, streamerName);
 streamer->rowType = arWords;
 streamer->setRegion = asdSetRegion;
 streamer->nextRow = asdNextRow;
 streamer->close = asdClose;
 char *asFirstColumnName = streamer->asObj->columnList->name;
 if (self->hasBin && !sameString(asFirstColumnName, "bin"))
     self->omitBin = 1;
 if (!asdInitBed3Fields(self))
     errAbort("annoStreamDbNew: can't figure out which fields of %s.%s to use as "
-	     "{chrom, chromStart, chromEnd}.", db, table);
+	     "{chrom, chromStart, chromEnd}.", db, self->table);
 // When a table has an index on endField (not startField), sometimes the query optimizer uses it
 // and that ruins the sorting.  Fortunately most tables don't anymore.
 self->endFieldIndexName = sqlTableIndexOnFieldANotB(self->conn, self->table, self->endField,
                                                     self->startField);
 self->notSorted = FALSE;
 // Special case: genbank-updated tables are not sorted because new mappings are
 // tacked on at the end.  Max didn't sort the pubs* tables but I hope he will
 // sort the tables for any future tracks.  :)
 if (isIncrementallyUpdated(table) || isPubsTable(table))
     self->notSorted = TRUE;
 self->mergeBins = FALSE;
 self->maxOutRows = maxOutRows;
 self->useMaxOutRows = (maxOutRows > 0);
 self->needQuery = TRUE;
 self->chromList = annoAssemblySeqNames(aa);
 if (slCount(self->chromList) > 1000)
     {
     // Assembly has many sequences (e.g. scaffold-based assembly) --
     // don't break up into per-sequence queries.  Take our chances
     // with mysql being unhappy about the sqlResult being open too long.
     self->doQuery = asdDoQuerySimple;
     self->nextRowRaw = nextRowFromSqlResult;
     }
 else
     {
     // All-chromosome assembly -- if table is large, perform a series of
     // chunked queries.
     self->doQuery = asdDoQueryChunking;
     self->nextRowRaw = nextRowFromBuffer;
     }
-asdMakeBaselineQuery(self);
+asdInitBaselineQuery(self);
+asdUpdateBaselineQuery(self);
 struct annoStreamer *sSelf = (struct annoStreamer *)self;
 if (asdDebug)
     sSelf->getHeader = asdGetHeader;
 return sSelf;
 }