1f55f129858863d78076fd3818d3841429741121 galt Mon Jan 29 01:35:45 2018 -0800 Fixing bug: sqlDyStringAppend() is not safe, and it is not needed. Using sqlDyStringPrintf instead. diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c index 15b91dd..67bdc9d 100644 --- src/hg/lib/annoStreamDb.c +++ src/hg/lib/annoStreamDb.c @@ -1,1320 +1,1323 @@ /* annoStreamDb -- subclass of annoStreamer for database tables */ /* Copyright (C) 2014 The Regents of the University of California * See README in this or parent directory for licensing information. */ #include "annoStreamDb.h" #include "annoGratorQuery.h" #include "binRange.h" #include "hAnno.h" #include "joinMixer.h" #include "hdb.h" #include "obscure.h" #include "sqlNum.h" struct annoStreamDb { struct annoStreamer streamer; // Parent class members & methods // Private members char *db; // Database name (e.g. hg19 or customTrash) struct sqlConnection *conn; // Database connection struct sqlResult *sr; // SQL query result from which we grab rows char *trackTable; // Name of database table (or root name if split tables) char *table; // If split, chr..._trackTable; otherwise same as trackTable char *baselineQuery; // SQL query without position constraints boolean baselineQueryHasWhere; // True if baselineQuery contains filter or join clauses // These members enable us to extract coords from the otherwise unknown row: char *chromField; // Name of chrom-ish column in table char *startField; // Name of chromStart-ish column in table char *endField; // Name of chromEnd-ish column in table int chromIx; // Index of chrom-ish col in autoSql or bin-less table int startIx; // Index of chromStart-ish col in autoSql or bin-less table int endIx; // Index of chromEnd-ish col in autoSql or bin-less table // These members enable us to produce {chrom, start}-sorted output: char *endFieldIndexName; // SQL index on end field, if any (can mess up sorting) boolean notSorted; // TRUE if table is not sorted (e.g. genbank-updated) boolean hasBin; // 1 if SQL table's first column is bin boolean omitBin; // 1 if table hasBin and autoSql doesn't have bin boolean mergeBins; // TRUE if query results will be in bin order struct annoRow *bigItemQueue; // If mergeBins, accumulate coarse-bin items here struct annoRow *smallItemQueue; // Max 1 item for merge-sorting with bigItemQueue struct lm *qLm; // localmem for merge-sorting queues int minFinestBin; // Smallest bin number for finest bin level boolean gotFinestBin; // Flag that it's time to merge-sort with bigItemQueue // Limit (or not) number of rows processed: boolean useMaxOutRows; // TRUE if maxOutRows passed to annoStreamDbNew is > 0 int maxOutRows; // Maximum number of rows we can output. // Process large tables in manageable chunks: struct slName *chromList; // list of chromosomes/sequences in assembly struct slName *queryChrom; // most recently queried chrom for whole-genome (or NULL) boolean eof; // TRUE when we are done (maxItems or no more items) boolean needQuery; // TRUE when we haven't yet queried, or need to query again boolean doNextChunk; // TRUE if rowBuf ends before end of chrom/region uint nextChunkStart; // Start coord for next chunk of rows to query // Info for joining in related tables/fields struct joinerDtf *mainTableDtfList; // Fields from the main table to include in output struct joinerDtf *relatedDtfList; // Fields from related tables to include in output struct joiner *joiner; // Parsed all.joiner schema struct joinMixer *joinMixer; // Plan for joining related tables using sql and/or hash // (NULL if no joining is necessary) uint sqlRowSize; // Number of columns from sql query (may include related) uint bigRowSize; // Number of columns from sql + joinMixer->hashJoins boolean hasLeftJoin; // If we have to use 'left join' we'll have to 'order by'. boolean naForMissing; // If true, insert "n/a" for missing related table values // to match hgTables. struct rowBuf // Temporary storage for rows from chunked query { struct lm *lm; // storage for rows char ***buf; // array of pointers to rows int size; // number of rows int ix; // offset in buffer, [0..size] } rowBuf; char **(*nextRowRaw)(struct annoStreamDb *self); // Depending on query style, use either sqlNextRow or temporary row storage to get next row. // This may return NULL but set self->needQuery; asdNextRow watches for that. void (*doQuery)(struct annoStreamDb *self, char *minChrom, uint minEnd); // Depending on query style, perform either a single query or (series of) chunked query }; //#*** TODO: make a way to pass the filter with dtf into annoStreamDb. struct annoFilterDb // annoFilter has columnIx which works fine for all fields of main table, // but for joining filters we will need dtf. { struct annoFilter filter; // parent class struct joinerDtf *dtf; // {database, table, field} in case this is from // some table to be joined with the main table }; // For performance reasons, even if !useMaxItems (no limit), we need to limit the // number of rows that are returned from a query, so we can slurp them into memory and // close the sqlResult before mysql gets unhappy about the result being open so long. #define ASD_CHUNK_SIZE 100000 #define JOINER_FILE "all.joiner" static const boolean asdDebug = FALSE; static void resetMergeState(struct annoStreamDb *self) /* Reset or initialize members that track merging of coarse-bin items with fine-bin items. */ { self->mergeBins = FALSE; self->bigItemQueue = self->smallItemQueue = NULL; lmCleanup(&(self->qLm)); self->gotFinestBin = FALSE; } static void resetRowBuf(struct rowBuf *rowBuf) /* Reset temporary storage for chunked query rows. */ { rowBuf->buf = NULL; rowBuf->size = 0; rowBuf->ix = 0; lmCleanup(&(rowBuf->lm)); } static void resetChunkState(struct annoStreamDb *self) /* Reset members that track chunked queries. */ { self->queryChrom = NULL; self->eof = FALSE; self->doNextChunk = FALSE; self->needQuery = TRUE; resetRowBuf(&self->rowBuf); } static void startMerging(struct annoStreamDb *self) /* Set self->mergeBins flag and create self->qLm if necessary. */ { self->mergeBins = TRUE; self->gotFinestBin = FALSE; if (self->qLm == NULL) self->qLm = lmInit(0); } static void resetQueryState(struct annoStreamDb *self) /* Free sqlResult if there is one, and reset state associated with the current query. */ { sqlFreeResult(&(self->sr)); resetMergeState(self); resetChunkState(self); } // Forward declaration in order to avoid moving lots of code: static void asdUpdateBaselineQuery(struct annoStreamDb *self); /* Build a dy SQL query with no position constraints (select ... from ...) * possibly including joins and filters if specified (where ...), using the current splitTable. */ static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd) /* Set region -- and free current sqlResult if there is one. */ { annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd); struct annoStreamDb *self = (struct annoStreamDb *)vSelf; // If splitTable differs from table, use new chrom in splitTable: if (differentString(self->table, self->trackTable)) { char newSplitTable[PATH_LEN]; safef(newSplitTable, sizeof(newSplitTable), "%s_%s", chrom, self->trackTable); freeMem(self->table); self->table = cloneString(newSplitTable); } resetQueryState(self); asdUpdateBaselineQuery(self); } static char **nextRowFromSqlResult(struct annoStreamDb *self) /* Stream rows directly from self->sr. */ { return sqlNextRow(self->sr); } INLINE boolean useSplitTable(struct annoStreamDb *self, struct joinerDtf *dtf) /* Return TRUE if dtf matches self->{db,table} and table is split. */ { return (sameString(dtf->database, self->db) && sameString(dtf->table, self->trackTable) && differentString(self->table, self->trackTable)); } static void appendFieldList(struct annoStreamDb *self, struct dyString *query) /* Append SQL field list to query. */ { struct joinerDtf *fieldList = self->joinMixer ? self->joinMixer->sqlFieldList : self->mainTableDtfList; struct joinerDtf *dtf; for (dtf = fieldList; dtf != NULL; dtf = dtf->next) { if (dtf != fieldList) dyStringAppendC(query, ','); if (useSplitTable(self, dtf)) dyStringPrintf(query, "%s.%s", self->table, dtf->field); else { char dtfString[PATH_LEN]; joinerDtfToSqlFieldString(dtf, self->db, dtfString, sizeof(dtfString)); dyStringAppend(query, dtfString); } } } static void ignoreEndIndexIfNecessary(struct annoStreamDb *self, char *dbTable, struct dyString *query) /* Don't let mysql use a (chrom, chromEnd) index because that messes up sorting by chromStart. */ { if (sameString(dbTable, self->trackTable) && self->endFieldIndexName != NULL) sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName); } static void appendOneTable(struct annoStreamDb *self, struct joinerDtf *dt, struct dyString *query) /* Add the (db.)table string from dt to query; if dt is NULL or table is split then * use self->table. */ { char dbTable[PATH_LEN]; if (dt == NULL || useSplitTable(self, dt)) safecpy(dbTable, sizeof(dbTable), self->table); else joinerDtfToSqlTableString(dt, self->db, dbTable, sizeof(dbTable)); dyStringAppend(query, dbTable); ignoreEndIndexIfNecessary(self, dbTable, query); } INLINE void splitOrDtfToSqlField(struct annoStreamDb *self, struct joinerDtf *dtf, char *fieldName, size_t fieldNameSize) /* Write [db].table.field into fieldName, where table may be split. */ { if (useSplitTable(self, dtf)) safef(fieldName, fieldNameSize, "%s.%s", self->table, dtf->field); else joinerDtfToSqlFieldString(dtf, self->db, fieldName, fieldNameSize); } static boolean appendTableList(struct annoStreamDb *self, struct dyString *query) /* Append SQL table list to query, including tables used for output, filtering and joining. */ { boolean hasLeftJoin = FALSE; if (self->joinMixer == NULL || self->joinMixer->sqlRouteList == NULL) appendOneTable(self, NULL, query); else { // Use both a and b of the first pair and only b of each subsequent pair appendOneTable(self, self->joinMixer->sqlRouteList->a, query); struct joinerPair *jp; for (jp = self->joinMixer->sqlRouteList; jp != NULL; jp = jp->next) { dyStringAppend(query, " left join "); appendOneTable(self, jp->b, query); char fieldA[PATH_LEN], fieldB[PATH_LEN]; splitOrDtfToSqlField(self, jp->a, fieldA, sizeof(fieldA)); splitOrDtfToSqlField(self, jp->b, fieldB, sizeof(fieldB)); struct joinerField *jfA = joinerSetFindField(jp->identifier, jp->a); if (sameOk(jfA->separator, ",")) dyStringPrintf(query, " on find_in_set(%s, %s)", fieldB, fieldA); else dyStringPrintf(query, " on %s = %s", fieldA, fieldB); hasLeftJoin = TRUE; } } return hasLeftJoin; } // libify? static struct joinerDtf *joinerDtfCloneList(struct joinerDtf *listIn) /* Return a list with cloned items of listIn. */ { struct joinerDtf *listOut = NULL, *item; for (item = listIn; item != NULL; item = item->next) slAddHead(&listOut, joinerDtfClone(item)); slReverse(&listOut); return listOut; } static char *joinerFilePath() /* Return the location of all.joiner - default is ./all.joiner but a different file * can be substituted using environment variable ALL_JOINER_FILE */ { char *joinerFile = getenv("ALL_JOINER_FILE"); if (isEmpty(joinerFile)) joinerFile = JOINER_FILE; return joinerFile; } static void asdInitBaselineQuery(struct annoStreamDb *self) /* Build a dy SQL query with no position constraints (select ... from ...) * possibly including joins and filters if specified (where ...). */ { if (self->relatedDtfList) { struct joinerDtf *outputFieldList = slCat(joinerDtfCloneList(self->mainTableDtfList), joinerDtfCloneList(self->relatedDtfList)); if (self->joiner == NULL) self->joiner = joinerRead(joinerFilePath()); int expectedRows = sqlRowCount(self->conn, self->table); self->joinMixer = joinMixerNew(self->joiner, self->db, self->table, outputFieldList, expectedRows, self->naForMissing); self->sqlRowSize = slCount(self->joinMixer->sqlFieldList); self->bigRowSize = self->joinMixer->bigRowSize; joinerDtfFreeList(&outputFieldList); } else { self->sqlRowSize = slCount(self->mainTableDtfList); self->bigRowSize = self->sqlRowSize; } } static void asdUpdateBaselineQuery(struct annoStreamDb *self) /* Build a dy SQL query with no position constraints (select ... from ...) * possibly including joins and filters if specified (where ...), using the current splitTable. */ { struct dyString *query = sqlDyStringCreate("select "); appendFieldList(self, query); dyStringAppend(query, " from "); self->hasLeftJoin = appendTableList(self, query); boolean hasWhere = FALSE; self->baselineQuery = dyStringCannibalize(&query); self->baselineQueryHasWhere = hasWhere; // Don't free joiner; we need its storage of joinerFields. } static void addBinToQuery(struct annoStreamDb *self, uint start, uint end, struct dyString *query) /* If applicable, add bin range constraints to query with explicit table name, in case we're * joining with another table that has a bin column. */ { if (self->hasBin) { // Get the bin constraints with no table specification: struct dyString *binConstraints = dyStringNew(0); hAddBinToQuery(start, end, binConstraints); // Swap in explicit table name for bin field: char tableDotBin[PATH_LEN]; safef(tableDotBin, sizeof(tableDotBin), "%s.bin", self->table); struct dyString *explicitBinConstraints = dyStringSub(binConstraints->string, "bin", tableDotBin); dyStringAppend(query, explicitBinConstraints->string); dyStringFree(&explicitBinConstraints); dyStringFree(&binConstraints); } } static void addRangeToQuery(struct annoStreamDb *self, struct dyString *query, char *chrom, uint start, uint end, boolean hasWhere) /* Add position constraints to query. */ { -sqlDyStringAppend(query, hasWhere ? " and " : " where "); +if (hasWhere) + sqlDyStringPrintf(query, " and "); +else + sqlDyStringPrintf(query, " where "); sqlDyStringPrintf(query, "%s.%s='%s'", self->table, self->chromField, chrom); uint chromSize = annoAssemblySeqSize(self->streamer.assembly, chrom); boolean addStartConstraint = (start > 0); boolean addEndConstraint = (end < chromSize); if (addStartConstraint || addEndConstraint) { - sqlDyStringAppend(query, "and "); + sqlDyStringPrintf(query, "and "); if (self->hasBin) addBinToQuery(self, start, end, query); if (addStartConstraint) { if (self->doNextChunk) sqlDyStringPrintf(query, "%s.%s >= %u ", self->table, self->startField, start); else // Make sure to include insertions at start: sqlDyStringPrintf(query, "(%s.%s > %u or (%s.%s = %s.%s and %s.%s = %u)) ", self->table, self->endField, start, self->table, self->endField, self->table, self->startField, self->table, self->startField, start); } if (addEndConstraint) { if (addStartConstraint) - sqlDyStringAppend(query, "and "); + sqlDyStringPrintf(query, "and "); // Make sure to include insertions at end: sqlDyStringPrintf(query, "(%s.%s < %u or (%s.%s = %s.%s and %s.%s = %u)) ", self->table, self->startField, end, self->table, self->startField, self->table, self->endField, self->table, self->endField, end); } } } static void asdDoQuerySimple(struct annoStreamDb *self, char *minChrom, uint minEnd) /* Return a sqlResult for a query on table items in position range. * If doing a whole genome query. just select all rows from table. */ // NOTE: it would be possible to implement filters at this level, as in hgTables. { struct annoStreamer *streamer = &(self->streamer); boolean hasWhere = self->baselineQueryHasWhere; struct dyString *query = dyStringCreate("%s", self->baselineQuery); if (!streamer->positionIsGenome) { if (minChrom && differentString(minChrom, streamer->chrom)) errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'", streamer->name, minChrom, streamer->chrom); if (self->hasBin) { // Results will be in bin order, but we can restore chromStart order by // accumulating initial coarse-bin items and merge-sorting them with // subsequent finest-bin items which will be in chromStart order. resetMergeState(self); startMerging(self); } addRangeToQuery(self, query, streamer->chrom, streamer->regionStart, streamer->regionEnd, hasWhere); if (self->notSorted || self->hasLeftJoin) sqlDyStringPrintf(query, " order by %s.%s", self->table, self->startField); } else if (self->notSorted || self->hasLeftJoin) sqlDyStringPrintf(query, " order by %s.%s,%s.%s", self->table, self->chromField, self->table, self->startField); if (self->maxOutRows > 0) dyStringPrintf(query, " limit %d", self->maxOutRows); struct sqlResult *sr = sqlGetResult(self->conn, query->string); dyStringFree(&query); self->sr = sr; self->needQuery = FALSE; } static void rowBufInit(struct rowBuf *rowBuf, int size) /* Clean up rowBuf and give it a new lm and buffer[size]. */ { resetRowBuf(rowBuf); rowBuf->lm = lmInit(0); rowBuf->size = size; lmAllocArray(rowBuf->lm, rowBuf->buf, size); } static void updateNextChunkState(struct annoStreamDb *self, int queryMaxItems) /* If the just-fetched interval list was limited to ASD_CHUNK_SIZE, set doNextChunk * and trim the last row(s) so that when we query the next chunk, we don't get * repeat rows due to querying a start coord that was already returned. */ { struct rowBuf *rowBuf = &self->rowBuf; if (queryMaxItems == ASD_CHUNK_SIZE && rowBuf->size == ASD_CHUNK_SIZE) { self->doNextChunk = TRUE; // Starting at the last row in rowBuf, work back to find a value with a different start. int ix = rowBuf->size - 1; char **words = rowBuf->buf[ix]; int startIx = self->startIx + self->omitBin; uint lastStart = atoll(words[startIx]); for (ix = rowBuf->size - 2; ix >= 0; ix--) { words = rowBuf->buf[ix]; uint thisStart = atoll(words[startIx]); if (thisStart != lastStart) { rowBuf->size = ix+1; self->nextChunkStart = lastStart; break; } } } else self->doNextChunk = FALSE; self->needQuery = FALSE; } static boolean glomSqlDup(char **oldRow, char **newRow, int mainColCount, int sqlColCount, struct lm *lm) /* If newRow's contents are identical to oldRow's for all fields from the main table, * then comma-glom new values of joined related fields onto oldRow's values and return TRUE; * otherwise leave oldRow alone and return FALSE. */ { boolean isDup = TRUE; int i; for (i = 0; i < mainColCount; i++) if (differentStringNullOk(oldRow[i], newRow[i])) { isDup = FALSE; break; } if (isDup) { // Glom related column values produced by mysql, collapsing consecutive duplicate values // and appending comma to match hgTables for (i = mainColCount; i < sqlColCount; i++) { char *oldVal = oldRow[i]; char *newVal = newRow[i]; if (newVal != NULL) { int newValLen = strlen(newVal); char newValComma[newValLen+2]; safef(newValComma, sizeof(newValComma), "%s,", newVal); if (oldVal != NULL) { int oldValLen = strlen(oldVal); if (! (endsWithWordComma(oldVal, newVal))) { char *comma = (oldVal[oldValLen-1] == ',') ? "" : ","; int glommedSize = oldValLen + 1 + newValLen + 2; char *glommedVal = lmAlloc(lm, glommedSize); safef(glommedVal, glommedSize, "%s%s%s", oldVal, comma, newValComma); oldRow[i] = glommedVal; } } else { oldRow[i] = lmCloneString(lm, newValComma); } } } } return isDup; } static void bufferRowsFromSqlQuery(struct annoStreamDb *self, char *query, int queryMaxItems) /* Store all rows from query in rowBuf. */ { struct sqlResult *sr = sqlGetResult(self->conn, query); struct rowBuf *rowBuf = &(self->rowBuf); rowBufInit(rowBuf, ASD_CHUNK_SIZE); struct annoStreamer *sSelf = &(self->streamer); boolean didSqlJoin = (self->joinMixer && self->joinMixer->sqlRouteList); int mainColCount = slCount(self->mainTableDtfList); int sqlColCount = self->sqlRowSize; char **row = NULL; int ix = 0; while ((row = sqlNextRow(sr)) != NULL) { if (ix >= rowBuf->size) errAbort("annoStreamDb %s: rowBuf overflow, got more than %d rows", sSelf->name, rowBuf->size); // SQL join outputs separate rows for multiple matches. Accumulate multiple matches as // comma-sep lists to match hgTables and hashJoin (and prevent rowBuf overflow). boolean didGlom = FALSE; if (ix != 0 && didSqlJoin) didGlom = glomSqlDup(rowBuf->buf[ix-1], row, mainColCount, sqlColCount, rowBuf->lm); if (! didGlom) rowBuf->buf[ix++] = lmCloneRowExt(rowBuf->lm, row, self->bigRowSize, self->sqlRowSize); } // Set rowBuf->size to the number of rows we actually stored. rowBuf->size = ix; sqlFreeResult(&sr); updateNextChunkState(self, queryMaxItems); } static void updateQueryChrom(struct annoStreamDb *self, char *minChrom) /* Figure out whether we need to query the next chunk on the current chromosome * or move on to the next chromosome. */ { if (self->queryChrom == NULL) self->queryChrom = self->chromList; else if (!self->doNextChunk) { self->queryChrom = self->queryChrom->next; if (self->hasBin) { resetMergeState(self); startMerging(self); } } // -- don't resetMergeState if doNextChunk. if (minChrom != NULL) { // Skip chroms that precede minChrom while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0) { self->queryChrom = self->queryChrom->next; self->doNextChunk = FALSE; } if (self->hasBin) { resetMergeState(self); startMerging(self); } } } static void doOneChunkQuery(struct annoStreamDb *self, struct dyString *query, char *chrom, uint start, uint end, boolean hasWhere, int maxItems) /* Add range constraints to query, perform query and buffer the results. */ { addRangeToQuery(self, query, chrom, start, end, hasWhere); if (self->notSorted || self->hasLeftJoin) sqlDyStringPrintf(query, " order by %s.%s", self->table, self->startField); sqlDyStringPrintf(query, " limit %d", maxItems); bufferRowsFromSqlQuery(self, query->string, maxItems); } static void asdDoQueryChunking(struct annoStreamDb *self, char *minChrom, uint minEnd) /* Get rows from mysql with a limit on the number of rows returned at one time (ASD_CHUNK_SIZE), * to avoid long delays for very large tables. This will be called multiple times if * the number of rows in region is more than ASD_CHUNK_SIZE. If doing a genome-wide query, * break it up into chrom-by-chrom queries because the code that merges large bin items * in with small bin items assumes that all rows are on the same chrom. */ { struct annoStreamer *sSelf = &(self->streamer); boolean hasWhere = self->baselineQueryHasWhere; struct dyString *query = dyStringCreate("%s", self->baselineQuery); if (sSelf->chrom != NULL && self->rowBuf.size > 0 && !self->doNextChunk) { // We're doing a region query, we already got some rows, and don't need another chunk: resetRowBuf(&self->rowBuf); self->eof = TRUE; } if (self->useMaxOutRows) { self->maxOutRows -= self->rowBuf.size; if (self->maxOutRows <= 0) self->eof = TRUE; } if (self->eof) return; int queryMaxItems = ASD_CHUNK_SIZE; if (self->useMaxOutRows && self->maxOutRows < queryMaxItems) queryMaxItems = self->maxOutRows; if (self->hasBin) { // Results will be in bin order, but we can restore chromStart order by // accumulating initial coarse-bin items and merge-sorting them with // subsequent finest-bin items which will be in chromStart order. if (self->doNextChunk && self->mergeBins && !self->gotFinestBin) errAbort("annoStreamDb %s: can't continue merge in chunking query; " "increase ASD_CHUNK_SIZE", sSelf->name); // Don't reset merge state here in case bigItemQueue has a large-bin item // at the end of the chrom, past all smallest-bin items. startMerging(self); } if (sSelf->chrom != NULL) { // Region query (but might end up as multiple chunked queries) char *chrom = sSelf->chrom; uint start = sSelf->regionStart; uint end = sSelf->regionEnd; if (minChrom) { if (differentString(minChrom, chrom)) errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'", sSelf->name, minChrom, chrom); if (start < minEnd) start = minEnd; } if (self->doNextChunk && start < self->nextChunkStart) start = self->nextChunkStart; doOneChunkQuery(self, query, chrom, start, end, hasWhere, queryMaxItems); if (self->rowBuf.size == 0) self->eof = TRUE; } else { // Genome-wide query: break it into chrom-by-chrom queries (that might be chunked) // because the mergeBins stuff assumes that all rows are from the same chrom. updateQueryChrom(self, minChrom); if (self->queryChrom == NULL) self->eof = TRUE; else { char *chrom = self->queryChrom->name; int start = 0; if (minChrom != NULL && sameString(chrom, minChrom)) start = minEnd; if (self->doNextChunk && start < self->nextChunkStart) start = self->nextChunkStart; uint end = annoAssemblySeqSize(self->streamer.assembly, chrom); doOneChunkQuery(self, query, chrom, start, end, hasWhere, queryMaxItems); // If there happens to be no items on chrom, try again with the next chrom: if (! self->eof && self->rowBuf.size == 0) asdDoQueryChunking(self, minChrom, minEnd); } } dyStringFree(&query); } static char **nextRowFromBuffer(struct annoStreamDb *self) /* Instead of streaming directly from self->sr, we have buffered up the results * of a chunked query; return the head of that queue. */ { struct rowBuf *rowBuf = &self->rowBuf; if (rowBuf->ix > rowBuf->size) errAbort("annoStreamDb %s: rowBuf overflow (%d > %d)", self->streamer.name, rowBuf->ix, rowBuf->size); if (rowBuf->ix == rowBuf->size) { // Last row in buffer -- we'll need another query to get subsequent rows (if any). // But first, see if we need to update gotFinestBin, since getFinestBin might be // one of our callers. if (rowBuf->size > 0) { char **lastRow = rowBuf->buf[rowBuf->size-1]; int lastBin = atoi(lastRow[0]); if (lastBin >= self->minFinestBin) self->gotFinestBin = TRUE; } if (self->bigItemQueue == NULL && self->smallItemQueue == NULL) self->needQuery = TRUE; // Bounce back out -- asdNextRow or nextRowMergeBins will need to do another query. return NULL; } if (rowBuf->size == 0) return NULL; else return rowBuf->buf[rowBuf->ix++]; } static char **nextRowUnfiltered(struct annoStreamDb *self) /* Call self->nextRowRaw to get the next row from the sql query on the main table. * Then, if applicable, add columns added from hashes of related tables. */ { char **row = self->nextRowRaw(self); if (self->joinMixer && self->joinMixer->hashJoins && row) { // Add columns from hashedJoins to row struct hashJoin *hj; for (hj = self->joinMixer->hashJoins; hj != NULL; hj = hashJoinNext(hj)) hashJoinOneRow(hj, row); } return row; } static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail, char *minChrom, uint minEnd) /* Skip past any left-join failures until we get a right-join failure, a passing row, * or end of data. Return row or NULL, and return right-join fail status via retRightFail. */ { int numCols = self->streamer.numCols; char **row = nextRowUnfiltered(self); if (minChrom != NULL && row != NULL) { // Ignore rows that fall completely before (minChrom, minEnd) - save annoGrator's time int chromIx = self->omitBin+self->chromIx; int endIx = self->omitBin+self->endIx; int chromCmp; while (row && ((chromCmp = strcmp(row[chromIx], minChrom)) < 0 || // this chrom precedes minChrom (chromCmp == 0 && atoll(row[endIx]) < minEnd))) // on minChrom, but before minEnd row = nextRowUnfiltered(self); } boolean rightFail = FALSE; struct annoFilter *filterList = self->streamer.filters; while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail)) { if (rightFail) break; row = nextRowUnfiltered(self); } *retRightFail = rightFail; return row; } static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail, struct lm *lm) /* Extract coords from row and return an annoRow including right-fail status. */ { char **finalRow = row + self->omitBin; uint numCols = self->streamer.numCols; char *swizzleRow[numCols]; if (self->joinMixer) { uint i; for (i = 0; i < numCols; i++) { uint outIx = self->joinMixer->outIxs[i+self->omitBin]; if (row[outIx] == NULL) swizzleRow[i] = self->naForMissing ? "n/a" : ""; else swizzleRow[i] = row[outIx]; } finalRow = swizzleRow; } char *chrom = finalRow[self->chromIx]; uint chromStart = sqlUnsigned(finalRow[self->startIx]); uint chromEnd = sqlUnsigned(finalRow[self->endIx]); return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, finalRow, numCols, lm); } static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail, char *minChrom, uint minEnd) /* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and * add any subsequent coarse-bin items to bigItemQueue. As soon as we get an item from a * finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. */ { int bin = atoi(row[0]); while (bin < self->minFinestBin) { // big item -- store aside in queue for merging later (unless it falls off the end of // the current chunk), move on to next item struct annoRow *aRow = rowToAnnoRow(self, row, *pRightFail, self->qLm); if (! (self->doNextChunk && self->nextChunkStart <= aRow->start)) slAddHead(&(self->bigItemQueue), aRow); *pRightFail = FALSE; row = nextRowFiltered(self, pRightFail, minChrom, minEnd); if (row == NULL) break; bin = atoi(row[0]); } // First finest-bin item! Sort bigItemQueue in preparation for merging: self->gotFinestBin = TRUE; slReverse(&(self->bigItemQueue)); slSort(&(self->bigItemQueue), annoRowCmp); return row; } static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow, struct lm *callerLm) /* Compare head of bigItemQueue with (finest-bin) aRow; return the one with * lower chromStart and save the other for later. */ { struct annoRow *outRow = aRow; if (self->bigItemQueue != NULL && annoRowCmp(&(self->bigItemQueue), &aRow) < 0) { // Big item gets to go now, so save aside small item for next time. outRow = slPopHead(&(self->bigItemQueue)); slAddHead(&(self->smallItemQueue), aRow); } // Clone outRow using callerLm enum annoRowType rowType = self->streamer.rowType; int numCols = self->streamer.numCols; outRow = annoRowClone(outRow, rowType, numCols, callerLm); if (self->bigItemQueue == NULL && self->smallItemQueue == NULL) { // No coarse-bin items to merge-sort, just stream finest-bin items from here on out. // This needs to be done after cloning outRow because it was allocated in self->qLm. resetMergeState(self); } return outRow; } static struct annoRow *nextQueuedRow(struct annoStreamDb *self, struct lm *callerLm) // Return the head of either bigItemQueue or smallItemQueue, depending on which has // the lower chromStart. { struct annoRow *row = NULL; if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0) row = slPopHead(&(self->bigItemQueue)); else row = slPopHead(&(self->smallItemQueue)); if (self->bigItemQueue == NULL && self->smallItemQueue == NULL) // All done merge-sorting, just stream finest-bin items from here on out. self->mergeBins = FALSE; enum annoRowType rowType = self->streamer.rowType; int numCols = self->streamer.numCols; return annoRowClone(row, rowType, numCols, callerLm); } static struct annoRow *nextRowMergeBins(struct annoStreamDb *self, char *minChrom, uint minEnd, struct lm *callerLm) /* Fetch the next filtered row from mysql, merge-sorting coarse-bin items into finest-bin * items to maintain chromStart ordering. */ { assert(self->mergeBins && self->hasBin); if (self->smallItemQueue) // In this case we have already begun merge-sorting; don't pull a new row from mysql, // use the queues. This should keep smallItemQueue's max depth at 1. return nextQueuedRow(self, callerLm); else { // We might need to collect initial coarse-bin items, or might already be merge-sorting. boolean rightFail = FALSE; char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd); if (row && !self->gotFinestBin) { // We are just starting -- queue up coarse-bin items, if any, until we get the first // finest-bin item. row = getFinestBinItem(self, row, &rightFail, minChrom, minEnd); } // Time to merge-sort finest-bin items from mysql with coarse-bin items from queue. if (row != NULL) { struct annoRow *aRow = rowToAnnoRow(self, row, rightFail, self->qLm); return mergeRow(self, aRow, callerLm); } else { struct annoRow *qRow = slPopHead(&(self->bigItemQueue)); enum annoRowType rowType = self->streamer.rowType; int numCols = self->streamer.numCols; return annoRowClone(qRow, rowType, numCols, callerLm); } } } static struct annoRow *asdNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd, struct lm *callerLm) /* Perform sql query if we haven't already and return a single * annoRow, or NULL if there are no more items. */ { struct annoStreamDb *self = (struct annoStreamDb *)vSelf; if (self->needQuery) self->doQuery(self, minChrom, minEnd); if (self->mergeBins) { struct annoRow *aRow = nextRowMergeBins(self, minChrom, minEnd, callerLm); if (aRow == NULL && self->needQuery && !self->eof) // Recurse: query, then get next merged/filtered row: return asdNextRow(vSelf, minChrom, minEnd, callerLm); else return aRow; } boolean rightFail = FALSE; char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd); if (row == NULL) { if (self->needQuery && !self->eof) // Recurse: query, then get next merged/filtered row: return asdNextRow(vSelf, minChrom, minEnd, callerLm); else return NULL; } return rowToAnnoRow(self, row, rightFail, callerLm); } static void makeMainTableDtfList(struct annoStreamDb *self, struct asObject *mainAsObj) /* Make a list of mainTable columns. */ { struct joinerDtf mainDtf; mainDtf.database = self->db; mainDtf.table = self->trackTable; struct asColumn *col; for (col = mainAsObj->columnList; col != NULL; col = col->next) { mainDtf.field = col->name; slAddHead(&self->mainTableDtfList, joinerDtfClone(&mainDtf)); } slReverse(&self->mainTableDtfList); // If table has bin but asObj does not, add bin to head of mainTableDtfList. if (self->hasBin && differentString("bin", self->mainTableDtfList->field)) { mainDtf.field = "bin"; slAddHead(&self->mainTableDtfList, joinerDtfClone(&mainDtf)); } } static struct asObject *asObjForDtf(struct hash *hash, struct joinerDtf *dtf) /* Get asObj for dtf, either from hash if we've seen it before, or make one. */ { struct asObject *asObj = NULL; char dbTable[PATH_LEN]; joinerDtfToSqlTableString(dtf, NULL, dbTable, sizeof(dbTable)); struct hashEl *hel = hashLookup(hash, dbTable); if (hel == NULL) { asObj = hAnnoGetAutoSqlForDbTable(dtf->database, dtf->table, NULL, TRUE); if (asObj == NULL) errAbort("annoStreamDb: No autoSql for %s.%s.%s", dtf->database, dtf->table, dtf->field); hel = hashAdd(hash, dbTable, asObj); } else asObj = hel->val; return asObj; } static void makeDottedTriple(char *dtfString, size_t dtfStringSize, char *db, char *table, char *field) /* In case we don't have a struct joinerDtf for a field that we want to look up, * but we do have the db, table and field name, concat with dots into dtfString. * Unlike joinerDtfToSqlFieldString, don't bother checking whether db is the main db. */ { safef(dtfString, dtfStringSize, "%s.%s.%s", db, table, field); } char *annoStreamDbColumnNameFromDtf(char *db, char *mainTable, struct joinerDtf *dtf) /* Return a string with the autoSql column name that would be assigned according to dtf's * db, table and field. */ { char colName[PATH_LEN*2]; if (differentString(dtf->table, mainTable) || differentString(dtf->database, db)) { joinerDtfToSqlFieldString(dtf, db, colName, sizeof(colName)); // asParse rejects names that have '.' in them, which makes sense because it's for SQL, // so replace the '.'s with '_'s. subChar(colName, '.', '_'); } else safecpy(colName, sizeof(colName), dtf->field); return cloneString(colName); } static void addOneColumn(struct dyString *dy, struct joinerDtf *dtf, char *db, char *mainTable, struct asColumn *col, struct hash *dtfNames) /* Append an autoSql text line describing col to dy. * If col is an array whose size is some other column that has not yet been added, * coerce its type to string to avoid asParseText errAbort. */ { // First see if this col depends on a linked size column that hasn't been added yet. boolean sizeColIsMissing = FALSE; if (col->isArray && !col->fixedSize && isNotEmpty(col->linkedSizeName)) { // col's size comes from another column -- has that column already been added? char linkedDtfString[PATH_LEN]; makeDottedTriple(linkedDtfString, sizeof(linkedDtfString), dtf->database, dtf->table, col->linkedSizeName); if (!hashLookup(dtfNames, linkedDtfString)) sizeColIsMissing = TRUE; } if (col->isArray && sizeColIsMissing) { // The size column is missing, so this can't be a valid array in autoSql -- // ignore col->lowType and call it a (comma-separated) string. dyStringAppend(dy, " lstring"); } else { dyStringPrintf(dy, " %s", col->lowType->name); if (col->isArray) { dyStringAppendC(dy, '['); if (col->fixedSize) dyStringPrintf(dy, "%d", col->fixedSize); else dyStringAppend(dy, col->linkedSizeName); dyStringAppendC(dy, ']'); } } char *colName = annoStreamDbColumnNameFromDtf(db, mainTable, dtf); dyStringPrintf(dy, " %s; \"%s\"\n", colName, col->comment); // Store plain old dotted triple in dtfNames in case we need to look it up later. char dtfString[PATH_LEN]; makeDottedTriple(dtfString, sizeof(dtfString), dtf->database, dtf->table, dtf->field); hashAdd(dtfNames, dtfString, NULL); } static struct asObject *asdAutoSqlFromTableFields(struct annoStreamDb *self, struct asObject *mainAsObj) /* Get autoSql for each table in self->relatedDtfList and append the columns * included in self->relatedDtfList to the main table asObj columns. */ { struct dyString *newAsText = dyStringCreate("table %sCustom\n" "\"query based on %s with customized fields.\"\n" " (", self->trackTable, self->trackTable); // Use a hash of table to asObject so we fetch autoSql only once per table. struct hash *asObjCache = hashNew(0); // Use a hash of dtf strings to test whether or not one has been added already. struct hash *dtfNames = hashNew(0); // Start with all columns of main table: struct joinerDtf mainDtf; mainDtf.database = self->db; mainDtf.table = self->trackTable; struct asColumn *col; for (col = mainAsObj->columnList; col != NULL; col = col->next) { mainDtf.field = col->name; addOneColumn(newAsText, &mainDtf, self->db, self->trackTable, col, dtfNames); } // Append fields from related tables: struct joinerDtf *dtf; for (dtf = self->relatedDtfList; dtf != NULL; dtf = dtf->next) { struct asObject *asObj = asObjForDtf(asObjCache, dtf); struct asColumn *col = asColumnFind(asObj, dtf->field); if (col == NULL) errAbort("annoStreamDb: Can't find column %s in autoSql for table %s.%s", dtf->field, dtf->database, dtf->table); addOneColumn(newAsText, dtf, self->db, self->trackTable, col, dtfNames); } dyStringAppendC(newAsText, ')'); struct asObject *newAsObj = asParseText(newAsText->string); hashFreeWithVals(&asObjCache, asObjectFree); dyStringFree(&newAsText); freeHashAndVals(&dtfNames); return newAsObj; } static void asdClose(struct annoStreamer **pVSelf) /* Close db connection and free self. */ { if (pVSelf == NULL) return; struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf; lmCleanup(&(self->qLm)); freeMem(self->trackTable); freeMem(self->table); slNameFreeList(&self->chromList); joinerDtfFreeList(&self->mainTableDtfList); joinerDtfFreeList(&self->relatedDtfList); joinerFree(&self->joiner); joinMixerFree(&self->joinMixer); sqlFreeResult(&(self->sr)); hFreeConn(&(self->conn)); annoStreamerFree(pVSelf); } static boolean asdInitBed3Fields(struct annoStreamDb *self) /* Use autoSql to figure out which table fields correspond to {chrom, chromStart, chromEnd}. */ { struct annoStreamer *vSelf = &(self->streamer); return annoStreamerFindBed3Columns(vSelf, &(self->chromIx), &(self->startIx), &(self->endIx), &(self->chromField), &(self->startField), &(self->endField)); } char *sqlTableIndexOnFieldANotB(struct sqlConnection *conn, char *table, char *fieldA, char *fieldB) /* If table has an index that includes fieldA but not fieldB, return the index name, else NULL. */ { char *indexNameA = NULL, *indexNameB = NULL; char query[512]; sqlSafef(query, sizeof(query), "show index from %s", table); struct sqlResult *sr = sqlGetResult(conn, query); char **row; while ((row = sqlNextRow(sr)) != NULL) { if (sameString(row[4], fieldA)) indexNameA = cloneString(row[2]); else if (sameString(row[4], fieldB)) indexNameB = cloneString(row[2]); } if (sameOk(indexNameA, indexNameB)) indexNameA = NULL; sqlFreeResult(&sr); return indexNameA; } static boolean isIncrementallyUpdated(char *table) // Tables that have rows added to them after initial creation are not completely sorted // because of new rows at end, so we have to 'order by'. { return (sameString(table, "refGene") || sameString(table, "refFlat") || sameString(table, "xenoRefGene") || sameString(table, "xenoRefFlat") || sameString(table, "all_mrna") || sameString(table, "xenoMrna") || sameString(table, "all_est") || sameString(table, "xenoEst") || sameString(table, "intronEst") || sameString(table, "refSeqAli") || sameString(table, "xenoRefSeqAli")); } static boolean isPubsTable(char *table) // Not absolutely every pubs* table is unsorted, but most of them are. { return startsWith("pubs", table); } static struct asObject *asdParseConfig(struct annoStreamDb *self, struct jsonElement *configEl) /* Extract the autoSql for self->trackTable from the database. * If configEl is not NULL, expect it to be a description of related tables and fields like this: * config = { "relatedTables": [ { "table": "hg19.kgXref", * "fields": ["geneSymbol", "description"] }, * { "table": "hg19.knownCanonical", * "fields": ["clusterId"] } * ] } * If so, unpack the [db.]tables and fields into self->relatedDtfList and append autoSql * column descriptions for each field to the autoSql object that describes our output. * It might also have "naForMissing": true/false; if so, set self->naForMissing. */ { struct asObject *asObj = hAnnoGetAutoSqlForDbTable(self->db, self->trackTable, NULL, TRUE); makeMainTableDtfList(self, asObj); if (configEl != NULL) { struct hash *config = jsonObjectVal(configEl, "config"); struct jsonElement *relatedTablesEl = hashFindVal(config, "relatedTables"); if (relatedTablesEl) { // relatedTables is a list of objects like { table: <[db.]table name>, // fields: [ , , ...] } struct slRef *relatedTables = jsonListVal(relatedTablesEl, "relatedTables"); struct slRef *tfRef; for (tfRef = relatedTables; tfRef != NULL; tfRef = tfRef->next) { struct jsonElement *dbTableFieldEl = tfRef->val; struct hash *tfObj = jsonObjectVal(dbTableFieldEl, "{table,fields} object in relatedTables"); struct jsonElement *dbTableEl = hashMustFindVal(tfObj, "table"); char *dbTable = jsonStringVal(dbTableEl, "[db.]table in relatedTables"); char tfDb[PATH_LEN], tfTable[PATH_LEN]; hParseDbDotTable(self->db, dbTable, tfDb, sizeof(tfDb), tfTable, sizeof(tfTable)); if (isEmpty(tfDb)) safecpy(tfDb, sizeof(tfDb), self->db); if (hTableExists(tfDb, tfTable)) { struct jsonElement *fieldListEl = hashMustFindVal(tfObj, "fields"); struct slRef *fieldList = jsonListVal(fieldListEl, "fieldList"); struct slRef *fieldRef; for (fieldRef = fieldList; fieldRef != NULL; fieldRef = fieldRef->next) { struct jsonElement *fieldEl = fieldRef->val; char *tfField = jsonStringVal(fieldEl, "field"); slAddHead(&self->relatedDtfList, joinerDtfNew(tfDb, tfTable, tfField)); } } } if (self->relatedDtfList) { slReverse(&self->relatedDtfList); asObj = asdAutoSqlFromTableFields(self, asObj); } } struct jsonElement *naForMissingEl = hashFindVal(config, "naForMissing"); if (naForMissingEl != NULL) self->naForMissing = jsonBooleanVal(naForMissingEl, "naForMissing"); } return asObj; } static char *sqlExplain(struct sqlConnection *conn, char *query) /* For now, just turn the values back into a multi-line "#"-comment string. */ { char *trimmedQuery = query; if (startsWith(NOSQLINJ, trimmedQuery)) trimmedQuery = trimmedQuery + strlen(NOSQLINJ); struct dyString *dy = dyStringCreate("# Output of 'explain %s':\n", trimmedQuery); char explainQuery[PATH_LEN*8]; safef(explainQuery, sizeof(explainQuery), NOSQLINJ "explain %s", trimmedQuery); struct sqlResult *sr = sqlGetResult(conn, explainQuery); struct slName *fieldList = sqlResultFieldList(sr); int nColumns = slCount(fieldList); // Header: dyStringPrintf(dy, "# %s\n", slNameListToString(fieldList, '\t')); char **row; while ((row = sqlNextRow(sr)) != NULL) { dyStringAppend(dy, "# "); int i; for (i = 0; i < nColumns; i++) { if (i > 0) dyStringAppend(dy, "\t"); if (row[i] == NULL) dyStringAppend(dy, "NULL"); else dyStringAppend(dy, row[i]); } dyStringAppendC(dy, '\n'); } return dyStringCannibalize(&dy); } static char *asdGetHeader(struct annoStreamer *sSelf) /* Return header with debug info. */ { struct annoStreamDb *self = (struct annoStreamDb *)sSelf; // Add a fake constraint on chromField because a real one is added to baselineQuery. char queryWithChrom[PATH_LEN*4]; safef(queryWithChrom, sizeof(queryWithChrom), "%s %s %s.%s = 'someValue'", self->baselineQuery, (strstr(self->baselineQuery, "where") ? "and" : "where"), self->table, self->chromField); char *explanation = sqlExplain(self->conn, queryWithChrom); return explanation; } struct annoStreamer *annoStreamDbNew(char *db, char *table, struct annoAssembly *aa, int maxOutRows, struct jsonElement *configEl) /* Create an annoStreamer (subclass) object from a database table. * If config is NULL, then the streamer produces output from all fields * (except bin, unless table's autoSql includes bin). * Otherwise, config is a json object with a member 'relatedTables' that specifies * related tables and fields to join with table, for example: * config = { "relatedTables": [ { "table": "hg19.kgXref", * "fields": ["geneSymbol", "description"] }, * { "table": "hg19.knownCanonical", * "fields": ["clusterId"] } * ] } * -- the streamer's autoSql will be constructed by appending autoSql column * descriptions to the columns of table. * Caller may free db, and table when done with them, but must keep the * annoAssembly aa alive for the lifetime of the returned annoStreamer. */ { struct sqlConnection *conn = hAllocConn(db); char splitTable[HDB_MAX_TABLE_STRING]; if (!hFindSplitTable(db, NULL, table, splitTable, NULL)) errAbort("annoStreamDbNew: can't find table (or split table) for '%s.%s'", db, table); struct annoStreamDb *self = NULL; AllocVar(self); self->conn = conn; self->db = cloneString(db); self->trackTable = cloneString(table); self->table = cloneString(splitTable); if (sqlFieldIndex(self->conn, self->table, "bin") == 0) { self->hasBin = 1; self->minFinestBin = binFromRange(0, 1); } struct asObject *asObj = asdParseConfig(self, configEl); struct annoStreamer *streamer = &(self->streamer); int dbtLen = strlen(db) + strlen(table) + 2; char streamerName[dbtLen]; safef(streamerName, sizeof(streamerName), "%s.%s", db, table); annoStreamerInit(streamer, aa, asObj, streamerName); streamer->rowType = arWords; streamer->setRegion = asdSetRegion; streamer->nextRow = asdNextRow; streamer->close = asdClose; char *asFirstColumnName = streamer->asObj->columnList->name; if (self->hasBin && !sameString(asFirstColumnName, "bin")) self->omitBin = 1; if (!asdInitBed3Fields(self)) errAbort("annoStreamDbNew: can't figure out which fields of %s.%s to use as " "{chrom, chromStart, chromEnd}.", db, self->table); // When a table has an index on endField (not startField), sometimes the query optimizer uses it // and that ruins the sorting. Fortunately most tables don't anymore. self->endFieldIndexName = sqlTableIndexOnFieldANotB(self->conn, self->table, self->endField, self->startField); self->notSorted = FALSE; // Special case: genbank-updated tables are not sorted because new mappings are // tacked on at the end. Max didn't sort the pubs* tables but I hope he will // sort the tables for any future tracks. :) if (isIncrementallyUpdated(table) || isPubsTable(table)) self->notSorted = TRUE; self->mergeBins = FALSE; self->maxOutRows = maxOutRows; self->useMaxOutRows = (maxOutRows > 0); self->needQuery = TRUE; self->chromList = annoAssemblySeqNames(aa); if (slCount(self->chromList) > 1000) { // Assembly has many sequences (e.g. scaffold-based assembly) -- // don't break up into per-sequence queries. Take our chances // with mysql being unhappy about the sqlResult being open too long. self->doQuery = asdDoQuerySimple; self->nextRowRaw = nextRowFromSqlResult; } else { // All-chromosome assembly -- if table is large, perform a series of // chunked queries. self->doQuery = asdDoQueryChunking; self->nextRowRaw = nextRowFromBuffer; } asdInitBaselineQuery(self); asdUpdateBaselineQuery(self); struct annoStreamer *sSelf = (struct annoStreamer *)self; if (asdDebug) sSelf->getHeader = asdGetHeader; return sSelf; }