7a5ae9c2c1d5519793a4bdd99cca3a911c14b68e angie Fri Nov 18 16:36:51 2016 -0800 Added support for split tables to annoStreamDb since ChrisV found it was missing which broke some hgIntegrator queries in hg18 and older dbs. refs #18397 diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c index ce6b30c..a2f61da 100644 --- src/hg/lib/annoStreamDb.c +++ src/hg/lib/annoStreamDb.c @@ -7,31 +7,32 @@ #include "annoGratorQuery.h" #include "binRange.h" #include "hAnno.h" #include "joinMixer.h" #include "hdb.h" #include "obscure.h" #include "sqlNum.h" struct annoStreamDb { struct annoStreamer streamer; // Parent class members & methods // Private members char *db; // Database name (e.g. hg19 or customTrash) struct sqlConnection *conn; // Database connection struct sqlResult *sr; // SQL query result from which we grab rows - char *table; // Table name, must exist in database + char *trackTable; // Name of database table (or root name if split tables) + char *table; // If split, chr..._trackTable; otherwise same as trackTable char *baselineQuery; // SQL query without position constraints boolean baselineQueryHasWhere; // True if baselineQuery contains filter or join clauses // These members enable us to extract coords from the otherwise unknown row: char *chromField; // Name of chrom-ish column in table char *startField; // Name of chromStart-ish column in table char *endField; // Name of chromEnd-ish column in table int chromIx; // Index of chrom-ish col in autoSql or bin-less table int startIx; // Index of chromStart-ish col in autoSql or bin-less table int endIx; // Index of chromEnd-ish col in autoSql or bin-less table // These members enable us to produce {chrom, start}-sorted output: char *endFieldIndexName; // SQL index on end field, if any (can mess up sorting) boolean notSorted; // TRUE if table is not sorted (e.g. genbank-updated) boolean hasBin; // 1 if SQL table's first column is bin @@ -92,38 +93,30 @@ { struct annoFilter filter; // parent class struct joinerDtf *dtf; // {database, table, field} in case this is from // some table to be joined with the main table }; // For performance reasons, even if !useMaxItems (no limit), we need to limit the // number of rows that are returned from a query, so we can slurp them into memory and // close the sqlResult before mysql gets unhappy about the result being open so long. #define ASD_CHUNK_SIZE 100000 #define JOINER_FILE "all.joiner" static const boolean asdDebug = FALSE; -// Potential optimization: call asdMakeBaselineQuery in asdSetRegion, and scale expectedRows -// by the proportion of the genome that the search region (if one is defined) covers. -// Pitfall: if we are doing a list of regions, and don't know that from the start, then -// we would underestimate overall coverage, and might waste a lot of time rebuilding -// and reloading hashJoins unless we recycle them. -// Since annoStreamDb does not internally handle split tables yet, hgIntegrator -// is inefficiently making brand new streamers for each region anyway. Fix that first. - static void resetMergeState(struct annoStreamDb *self) /* Reset or initialize members that track merging of coarse-bin items with fine-bin items. */ { self->mergeBins = FALSE; self->bigItemQueue = self->smallItemQueue = NULL; lmCleanup(&(self->qLm)); self->gotFinestBin = FALSE; } static void resetRowBuf(struct rowBuf *rowBuf) /* Reset temporary storage for chunked query rows. */ { rowBuf->buf = NULL; rowBuf->size = 0; rowBuf->ix = 0; @@ -145,141 +138,186 @@ { self->mergeBins = TRUE; self->gotFinestBin = FALSE; if (self->qLm == NULL) self->qLm = lmInit(0); } static void resetQueryState(struct annoStreamDb *self) /* Free sqlResult if there is one, and reset state associated with the current query. */ { sqlFreeResult(&(self->sr)); resetMergeState(self); resetChunkState(self); } +// Forward declaration in order to avoid moving lots of code: +static void asdUpdateBaselineQuery(struct annoStreamDb *self); +/* Build a dy SQL query with no position constraints (select ... from ...) + * possibly including joins and filters if specified (where ...), using the current splitTable. */ + static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd) /* Set region -- and free current sqlResult if there is one. */ { annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd); -resetQueryState((struct annoStreamDb *)vSelf); +struct annoStreamDb *self = (struct annoStreamDb *)vSelf; +// If splitTable differs from table, use new chrom in splitTable: +if (!sameString(self->table, self->trackTable)) + { + char newSplitTable[PATH_LEN]; + safef(newSplitTable, sizeof(newSplitTable), "%s_%s", chrom, self->trackTable); + freeMem(self->table); + self->table = cloneString(newSplitTable); + } +resetQueryState(self); +asdUpdateBaselineQuery(self); } static char **nextRowFromSqlResult(struct annoStreamDb *self) /* Stream rows directly from self->sr. */ { return sqlNextRow(self->sr); } +INLINE boolean useSplitTable(struct annoStreamDb *self, struct joinerDtf *dtf) +/* Return TRUE if dtf matches self->{db,table} and table is split. */ +{ +return (sameString(dtf->database, self->db) && + sameString(dtf->table, self->trackTable) && + !sameString(self->table, self->trackTable)); +} + static void appendFieldList(struct annoStreamDb *self, struct dyString *query) /* Append SQL field list to query. */ { struct joinerDtf *fieldList = self->joinMixer ? self->joinMixer->sqlFieldList : self->mainTableDtfList; struct joinerDtf *dtf; for (dtf = fieldList; dtf != NULL; dtf = dtf->next) { if (dtf != fieldList) dyStringAppendC(query, ','); + if (useSplitTable(self, dtf)) + dyStringPrintf(query, "%s.%s", self->table, dtf->field); + else + { char dtfString[PATH_LEN]; joinerDtfToSqlFieldString(dtf, self->db, dtfString, sizeof(dtfString)); dyStringAppend(query, dtfString); } } +} static void ignoreEndIndexIfNecessary(struct annoStreamDb *self, char *dbTable, struct dyString *query) /* Don't let mysql use a (chrom, chromEnd) index because that messes up sorting by chromStart. */ { -if (sameString(dbTable, self->table) && self->endFieldIndexName != NULL) +if (sameString(dbTable, self->trackTable) && self->endFieldIndexName != NULL) sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName); } static void appendOneTable(struct annoStreamDb *self, struct joinerDtf *dt, struct dyString *query) -/* Add the (db.)table string from dt to query. */ +/* Add the (db.)table string from dt to query; if dt is NULL or table is split then + * use self->table. */ { char dbTable[PATH_LEN]; +if (dt == NULL || useSplitTable(self, dt)) + safecpy(dbTable, sizeof(dbTable), self->table); +else joinerDtfToSqlTableString(dt, self->db, dbTable, sizeof(dbTable)); dyStringAppend(query, dbTable); ignoreEndIndexIfNecessary(self, dbTable, query); } +INLINE void splitOrDtfToSqlField(struct annoStreamDb *self, struct joinerDtf *dtf, + char *fieldName, size_t fieldNameSize) +/* Write [db].table.field into fieldName, where table may be split. */ +{ +if (useSplitTable(self, dtf)) + safef(fieldName, fieldNameSize, "%s.%s", self->table, dtf->field); +else + joinerDtfToSqlFieldString(dtf, self->db, fieldName, fieldNameSize); +} + static boolean appendTableList(struct annoStreamDb *self, struct dyString *query) /* Append SQL table list to query, including tables used for output, filtering and joining. */ { boolean hasLeftJoin = FALSE; if (self->joinMixer == NULL || self->joinMixer->sqlRouteList == NULL) - { - dyStringAppend(query, self->table); - ignoreEndIndexIfNecessary(self, self->table, query); - } + appendOneTable(self, NULL, query); else { // Use both a and b of the first pair and only b of each subsequent pair appendOneTable(self, self->joinMixer->sqlRouteList->a, query); struct joinerPair *jp; for (jp = self->joinMixer->sqlRouteList; jp != NULL; jp = jp->next) { dyStringAppend(query, " left join "); appendOneTable(self, jp->b, query); char fieldA[PATH_LEN], fieldB[PATH_LEN]; - joinerDtfToSqlFieldString(jp->a, self->db, fieldA, sizeof(fieldA)); - joinerDtfToSqlFieldString(jp->b, self->db, fieldB, sizeof(fieldB)); + splitOrDtfToSqlField(self, jp->a, fieldA, sizeof(fieldA)); + splitOrDtfToSqlField(self, jp->b, fieldB, sizeof(fieldB)); struct joinerField *jfA = joinerSetFindField(jp->identifier, jp->a); if (sameOk(jfA->separator, ",")) dyStringPrintf(query, " on find_in_set(%s, %s)", fieldB, fieldA); else dyStringPrintf(query, " on %s = %s", fieldA, fieldB); hasLeftJoin = TRUE; } } return hasLeftJoin; } // libify? static struct joinerDtf *joinerDtfCloneList(struct joinerDtf *listIn) /* Return a list with cloned items of listIn. */ { struct joinerDtf *listOut = NULL, *item; for (item = listIn; item != NULL; item = item->next) slAddHead(&listOut, joinerDtfClone(item)); slReverse(&listOut); return listOut; } -static void asdMakeBaselineQuery(struct annoStreamDb *self) +static void asdInitBaselineQuery(struct annoStreamDb *self) /* Build a dy SQL query with no position constraints (select ... from ...) * possibly including joins and filters if specified (where ...). */ { if (self->relatedDtfList) { struct joinerDtf *outputFieldList = slCat(joinerDtfCloneList(self->mainTableDtfList), joinerDtfCloneList(self->relatedDtfList)); if (self->joiner == NULL) self->joiner = joinerRead(JOINER_FILE); int expectedRows = sqlRowCount(self->conn, self->table); self->joinMixer = joinMixerNew(self->joiner, self->db, self->table, outputFieldList, expectedRows, self->naForMissing); self->sqlRowSize = slCount(self->joinMixer->sqlFieldList); self->bigRowSize = self->joinMixer->bigRowSize; joinerDtfFreeList(&outputFieldList); } else { self->sqlRowSize = slCount(self->mainTableDtfList); self->bigRowSize = self->sqlRowSize; } +} + +static void asdUpdateBaselineQuery(struct annoStreamDb *self) +/* Build a dy SQL query with no position constraints (select ... from ...) + * possibly including joins and filters if specified (where ...), using the current splitTable. */ +{ struct dyString *query = sqlDyStringCreate("select "); appendFieldList(self, query); dyStringAppend(query, " from "); self->hasLeftJoin = appendTableList(self, query); boolean hasWhere = FALSE; self->baselineQuery = dyStringCannibalize(&query); self->baselineQueryHasWhere = hasWhere; // Don't free joiner; we need its storage of joinerFields. } static void addBinToQuery(struct annoStreamDb *self, uint start, uint end, struct dyString *query) /* If applicable, add bin range constraints to query with explicit table name, in case we're * joining with another table that has a bin column. */ { if (self->hasBin) @@ -853,31 +891,31 @@ if (self->needQuery && !self->eof) // Recurse: query, then get next merged/filtered row: return asdNextRow(vSelf, minChrom, minEnd, callerLm); else return NULL; } return rowToAnnoRow(self, row, rightFail, callerLm); } static void makeMainTableDtfList(struct annoStreamDb *self, struct asObject *mainAsObj) /* Make a list of mainTable columns. */ { struct joinerDtf mainDtf; mainDtf.database = self->db; -mainDtf.table = self->table; +mainDtf.table = self->trackTable; struct asColumn *col; for (col = mainAsObj->columnList; col != NULL; col = col->next) { mainDtf.field = col->name; slAddHead(&self->mainTableDtfList, joinerDtfClone(&mainDtf)); } slReverse(&self->mainTableDtfList); // If table has bin but asObj does not, add bin to head of mainTableDtfList. if (self->hasBin && differentString("bin", self->mainTableDtfList->field)) { mainDtf.field = "bin"; slAddHead(&self->mainTableDtfList, joinerDtfClone(&mainDtf)); } } @@ -967,71 +1005,72 @@ dyStringPrintf(dy, " %s; \"%s\"\n", colName, col->comment); // Store plain old dotted triple in dtfNames in case we need to look it up later. char dtfString[PATH_LEN]; makeDottedTriple(dtfString, sizeof(dtfString), dtf->database, dtf->table, dtf->field); hashAdd(dtfNames, dtfString, NULL); } static struct asObject *asdAutoSqlFromTableFields(struct annoStreamDb *self, struct asObject *mainAsObj) /* Get autoSql for each table in self->relatedDtfList and append the columns * included in self->relatedDtfList to the main table asObj columns. */ { struct dyString *newAsText = dyStringCreate("table %sCustom\n" "\"query based on %s with customized fields.\"\n" " (", - self->table, self->table); + self->trackTable, self->trackTable); // Use a hash of table to asObject so we fetch autoSql only once per table. struct hash *asObjCache = hashNew(0); // Use a hash of dtf strings to test whether or not one has been added already. struct hash *dtfNames = hashNew(0); // Start with all columns of main table: struct joinerDtf mainDtf; mainDtf.database = self->db; -mainDtf.table = self->table; +mainDtf.table = self->trackTable; struct asColumn *col; for (col = mainAsObj->columnList; col != NULL; col = col->next) { mainDtf.field = col->name; - addOneColumn(newAsText, &mainDtf, self->db, self->table, col, dtfNames); + addOneColumn(newAsText, &mainDtf, self->db, self->trackTable, col, dtfNames); } // Append fields from related tables: struct joinerDtf *dtf; for (dtf = self->relatedDtfList; dtf != NULL; dtf = dtf->next) { struct asObject *asObj = asObjForDtf(asObjCache, dtf); struct asColumn *col = asColumnFind(asObj, dtf->field); if (col == NULL) errAbort("annoStreamDb: Can't find column %s in autoSql for table %s.%s", dtf->field, dtf->database, dtf->table); - addOneColumn(newAsText, dtf, self->db, self->table, col, dtfNames); + addOneColumn(newAsText, dtf, self->db, self->trackTable, col, dtfNames); } dyStringAppendC(newAsText, ')'); struct asObject *newAsObj = asParseText(newAsText->string); hashFreeWithVals(&asObjCache, asObjectFree); dyStringFree(&newAsText); freeHashAndVals(&dtfNames); return newAsObj; } static void asdClose(struct annoStreamer **pVSelf) /* Close db connection and free self. */ { if (pVSelf == NULL) return; struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf; lmCleanup(&(self->qLm)); +freeMem(self->trackTable); freeMem(self->table); slNameFreeList(&self->chromList); joinerDtfFreeList(&self->mainTableDtfList); joinerDtfFreeList(&self->relatedDtfList); joinerFree(&self->joiner); joinMixerFree(&self->joinMixer); sqlFreeResult(&(self->sr)); hFreeConn(&(self->conn)); annoStreamerFree(pVSelf); } static boolean asdInitBed3Fields(struct annoStreamDb *self) /* Use autoSql to figure out which table fields correspond to {chrom, chromStart, chromEnd}. */ { struct annoStreamer *vSelf = &(self->streamer); @@ -1056,57 +1095,53 @@ } if (sameOk(indexNameA, indexNameB)) indexNameA = NULL; sqlFreeResult(&sr); return indexNameA; } static boolean isIncrementallyUpdated(char *table) // Tables that have rows added to them after initial creation are not completely sorted // because of new rows at end, so we have to 'order by'. { return (sameString(table, "refGene") || sameString(table, "refFlat") || sameString(table, "xenoRefGene") || sameString(table, "xenoRefFlat") || sameString(table, "all_mrna") || sameString(table, "xenoMrna") || sameString(table, "all_est") || sameString(table, "xenoEst") || + sameString(table, "intronEst") || sameString(table, "refSeqAli") || sameString(table, "xenoRefSeqAli")); } static boolean isPubsTable(char *table) // Not absolutely every pubs* table is unsorted, but most of them are. { return startsWith("pubs", table); } static struct asObject *asdParseConfig(struct annoStreamDb *self, struct jsonElement *configEl) -/* Extract the autoSql for self->table from the database. +/* Extract the autoSql for self->trackTable from the database. * If configEl is not NULL, expect it to be a description of related tables and fields like this: * config = { "relatedTables": [ { "table": "hg19.kgXref", * "fields": ["geneSymbol", "description"] }, * { "table": "hg19.knownCanonical", * "fields": ["clusterId"] } * ] } * If so, unpack the [db.]tables and fields into self->relatedDtfList and append autoSql * column descriptions for each field to the autoSql object that describes our output. * It might also have "naForMissing": true/false; if so, set self->naForMissing. */ { -//#*** TODO: hAnnoGetAutoSqlForDbTable should do its own split-table checking -char maybeSplitTable[HDB_MAX_TABLE_STRING]; -if (!hFindSplitTable(self->db, NULL, self->table, maybeSplitTable, NULL)) - errAbort("annoStreamDbNew: can't find table (or split table) for '%s.%s'", - self->db, self->table); -struct asObject *asObj = hAnnoGetAutoSqlForDbTable(self->db, maybeSplitTable, NULL, TRUE); +struct asObject *asObj = hAnnoGetAutoSqlForDbTable(self->db, self->trackTable, NULL, TRUE); makeMainTableDtfList(self, asObj); if (configEl != NULL) { struct hash *config = jsonObjectVal(configEl, "config"); struct jsonElement *relatedTablesEl = hashFindVal(config, "relatedTables"); if (relatedTablesEl) { // relatedTables is a list of objects like { table: <[db.]table name>, // fields: [ , , ...] } struct slRef *relatedTables = jsonListVal(relatedTablesEl, "relatedTables"); struct slRef *tfRef; for (tfRef = relatedTables; tfRef != NULL; tfRef = tfRef->next) { struct jsonElement *dbTableFieldEl = tfRef->val; struct hash *tfObj = jsonObjectVal(dbTableFieldEl, @@ -1194,79 +1229,82 @@ * If config is NULL, then the streamer produces output from all fields * (except bin, unless table's autoSql includes bin). * Otherwise, config is a json object with a member 'relatedTables' that specifies * related tables and fields to join with table, for example: * config = { "relatedTables": [ { "table": "hg19.kgXref", * "fields": ["geneSymbol", "description"] }, * { "table": "hg19.knownCanonical", * "fields": ["clusterId"] } * ] } * -- the streamer's autoSql will be constructed by appending autoSql column * descriptions to the columns of table. * Caller may free db, and table when done with them, but must keep the * annoAssembly aa alive for the lifetime of the returned annoStreamer. */ { struct sqlConnection *conn = hAllocConn(db); -if (!sqlTableExists(conn, table)) - errAbort("annoStreamDbNew: table '%s' doesn't exist in database '%s'", table, db); +char splitTable[HDB_MAX_TABLE_STRING]; +if (!hFindSplitTable(db, NULL, table, splitTable, NULL)) + errAbort("annoStreamDbNew: can't find table (or split table) for '%s.%s'", db, table); struct annoStreamDb *self = NULL; AllocVar(self); self->conn = conn; self->db = cloneString(db); -self->table = cloneString(table); +self->trackTable = cloneString(table); +self->table = cloneString(splitTable); if (sqlFieldIndex(self->conn, self->table, "bin") == 0) { self->hasBin = 1; self->minFinestBin = binFromRange(0, 1); } struct asObject *asObj = asdParseConfig(self, configEl); struct annoStreamer *streamer = &(self->streamer); int dbtLen = strlen(db) + strlen(table) + 2; char streamerName[dbtLen]; safef(streamerName, sizeof(streamerName), "%s.%s", db, table); annoStreamerInit(streamer, aa, asObj, streamerName); streamer->rowType = arWords; streamer->setRegion = asdSetRegion; streamer->nextRow = asdNextRow; streamer->close = asdClose; char *asFirstColumnName = streamer->asObj->columnList->name; if (self->hasBin && !sameString(asFirstColumnName, "bin")) self->omitBin = 1; if (!asdInitBed3Fields(self)) errAbort("annoStreamDbNew: can't figure out which fields of %s.%s to use as " - "{chrom, chromStart, chromEnd}.", db, table); + "{chrom, chromStart, chromEnd}.", db, self->table); // When a table has an index on endField (not startField), sometimes the query optimizer uses it // and that ruins the sorting. Fortunately most tables don't anymore. self->endFieldIndexName = sqlTableIndexOnFieldANotB(self->conn, self->table, self->endField, self->startField); self->notSorted = FALSE; // Special case: genbank-updated tables are not sorted because new mappings are // tacked on at the end. Max didn't sort the pubs* tables but I hope he will // sort the tables for any future tracks. :) if (isIncrementallyUpdated(table) || isPubsTable(table)) self->notSorted = TRUE; self->mergeBins = FALSE; self->maxOutRows = maxOutRows; self->useMaxOutRows = (maxOutRows > 0); self->needQuery = TRUE; self->chromList = annoAssemblySeqNames(aa); if (slCount(self->chromList) > 1000) { // Assembly has many sequences (e.g. scaffold-based assembly) -- // don't break up into per-sequence queries. Take our chances // with mysql being unhappy about the sqlResult being open too long. self->doQuery = asdDoQuerySimple; self->nextRowRaw = nextRowFromSqlResult; } else { // All-chromosome assembly -- if table is large, perform a series of // chunked queries. self->doQuery = asdDoQueryChunking; self->nextRowRaw = nextRowFromBuffer; } -asdMakeBaselineQuery(self); +asdInitBaselineQuery(self); +asdUpdateBaselineQuery(self); struct annoStreamer *sSelf = (struct annoStreamer *)self; if (asdDebug) sSelf->getHeader = asdGetHeader; return sSelf; }