533112afe2a2005e80cdb1f82904ea65032d4302 braney Sat Oct 2 11:37:34 2021 -0700 split hg/lib into two separate libraries, one only used by the cgis
diff --git src/hg/cgilib/annoStreamDb.c src/hg/cgilib/annoStreamDb.c
new file mode 100644
index 0000000..f524cad
--- /dev/null
+++ src/hg/cgilib/annoStreamDb.c
@@ -0,0 +1,1342 @@
+/* annoStreamDb -- subclass of annoStreamer for database tables */
+
+/* Copyright (C) 2014 The Regents of the University of California
+ * See README in this or parent directory for licensing information. */
+
+#include "annoStreamDb.h"
+#include "annoGratorQuery.h"
+#include "binRange.h"
+#include "hAnno.h"
+#include "joinMixer.h"
+#include "hdb.h"
+#include "obscure.h"
+#include "sqlNum.h"
+
+struct annoStreamDb
+    {
+    struct annoStreamer streamer;       // Parent class members & methods
+    // Private members
+    char *db;                           // Database name (e.g. hg19 or customTrash)
+    struct sqlConnection *conn;         // Database connection
+    struct sqlResult *sr;               // SQL query result from which we grab rows
+    char *trackTable;                   // Name of database table (or root name if split tables)
+    char *table;                        // If split, chr..._trackTable; otherwise same as trackTable
+    char *baselineQuery;                // SQL query without position constraints
+    boolean baselineQueryHasWhere;      // True if baselineQuery contains filter or join clauses
+
+    // These members enable us to extract coords from the otherwise unknown row:
+    char *chromField;                   // Name of chrom-ish column in table
+    char *startField;                   // Name of chromStart-ish column in table
+    char *endField;                     // Name of chromEnd-ish column in table
+    int chromIx;                        // Index of chrom-ish col in autoSql or bin-less table
+    int startIx;                        // Index of chromStart-ish col in autoSql or bin-less table
+    int endIx;                          // Index of chromEnd-ish col in autoSql or bin-less table
+
+    // These members enable us to produce {chrom, start}-sorted output:
+    char *endFieldIndexName;            // SQL index on end field, if any (can mess up sorting)
+    boolean notSorted;                  // TRUE if table is not sorted (e.g. genbank-updated)
+    boolean hasBin;                     // 1 if SQL table's first column is bin
+    boolean omitBin;                    // 1 if table hasBin and autoSql doesn't have bin
+    boolean mergeBins;                  // TRUE if query results will be in bin order
+    struct annoRow *bigItemQueue;       // If mergeBins, accumulate coarse-bin items here
+    struct annoRow *smallItemQueue;     // Max 1 item for merge-sorting with bigItemQueue
+    struct lm *qLm;                     // localmem for merge-sorting queues
+    int minFinestBin;                   // Smallest bin number for finest bin level
+    boolean gotFinestBin;               // Flag that it's time to merge-sort with bigItemQueue
+
+    // Limit (or not) number of rows processed:
+    boolean useMaxOutRows;              // TRUE if maxOutRows passed to annoStreamDbNew is > 0
+    int maxOutRows;                     // Maximum number of rows we can output.
+
+    // Process large tables in manageable chunks:
+    struct slName *chromList;           // list of chromosomes/sequences in assembly
+    struct slName *queryChrom;          // most recently queried chrom for whole-genome (or NULL)
+    boolean eof;                        // TRUE when we are done (maxItems or no more items)
+    boolean needQuery;                  // TRUE when we haven't yet queried, or need to query again
+    boolean doNextChunk;                // TRUE if rowBuf ends before end of chrom/region
+    uint nextChunkStart;                // Start coord for next chunk of rows to query
+
+    // Info for joining in related tables/fields
+    struct joinerDtf *mainTableDtfList; // Fields from the main table to include in output
+    struct joinerDtf *relatedDtfList;   // Fields from related tables to include in output
+    struct joiner *joiner;              // Parsed all.joiner schema
+    struct joinMixer *joinMixer;        // Plan for joining related tables using sql and/or hash
+                                        // (NULL if no joining is necessary)
+    uint sqlRowSize;                    // Number of columns from sql query (may include related)
+    uint bigRowSize;                    // Number of columns from sql + joinMixer->hashJoins
+    boolean hasLeftJoin;                // If we have to use 'left join' we'll have to 'order by'.
+    boolean naForMissing;               // If true, insert "n/a" for missing related table values
+                                        // to match hgTables.
+
+    struct rowBuf
+    // Temporary storage for rows from chunked query
+        {
+        struct lm *lm;                  // storage for rows
+        char ***buf;                    // array of pointers to rows
+        int size;                       // number of rows
+        int ix;                         // offset in buffer, [0..size]
+        } rowBuf;
+
+    char **(*nextRowRaw)(struct annoStreamDb *self);
+    // Depending on query style, use either sqlNextRow or temporary row storage to get next row.
+    // This may return NULL but set self->needQuery; asdNextRow watches for that.
+
+    void (*doQuery)(struct annoStreamDb *self, char *minChrom, uint minEnd);
+    // Depending on query style, perform either a single query or (series of) chunked query
+    };
+
+//#*** TODO: make a way to pass the filter with dtf into annoStreamDb.
+
+struct annoFilterDb
+    // annoFilter has columnIx which works fine for all fields of main table,
+    // but for joining filters we will need dtf.
+    {
+    struct annoFilter filter;           // parent class
+    struct joinerDtf *dtf;              // {database, table, field} in case this is from
+                                        // some table to be joined with the main table
+    };
+
+// For performance reasons, even if !useMaxItems (no limit), we need to limit the
+// number of rows that are returned from a query, so we can slurp them into memory and
+// close the sqlResult before mysql gets unhappy about the result being open so long.
+#define ASD_CHUNK_SIZE 100000
+
+#define JOINER_FILE "all.joiner"
+
+static const boolean asdDebug = FALSE;
+
+static void resetMergeState(struct annoStreamDb *self)
+/* Reset or initialize members that track merging of coarse-bin items with fine-bin items. */
+{
+self->mergeBins = FALSE;
+self->bigItemQueue = self->smallItemQueue = NULL;
+lmCleanup(&(self->qLm));
+self->gotFinestBin = FALSE;
+}
+
+static void resetRowBuf(struct rowBuf *rowBuf)
+/* Reset temporary storage for chunked query rows. */
+{
+rowBuf->buf = NULL;
+rowBuf->size = 0;
+rowBuf->ix = 0;
+lmCleanup(&(rowBuf->lm));
+}
+
+static void resetChunkState(struct annoStreamDb *self)
+/* Reset members that track chunked queries. */
+{
+self->queryChrom = NULL;
+self->eof = FALSE;
+self->doNextChunk = FALSE;
+self->needQuery = TRUE;
+resetRowBuf(&self->rowBuf);
+}
+
+static void startMerging(struct annoStreamDb *self)
+/* Set self->mergeBins flag and create self->qLm if necessary. */
+{
+self->mergeBins = TRUE;
+self->gotFinestBin = FALSE;
+if (self->qLm == NULL)
+    self->qLm = lmInit(0);
+}
+
+static void resetQueryState(struct annoStreamDb *self)
/* Free sqlResult if there is one, and reset state associated with the current query. */
+{
+sqlFreeResult(&(self->sr));
+resetMergeState(self);
+resetChunkState(self);
+}
+
+// Forward declaration in order to avoid moving lots of code:
+static void asdUpdateBaselineQuery(struct annoStreamDb *self);
+/* Build a dy SQL query with no position constraints (select ... from ...)
+ * possibly including joins and filters if specified (where ...), using the current splitTable. */
+
+static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
+/* Set region -- and free current sqlResult if there is one. */
+{
+annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
+struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
+// If splitTable differs from table, use new chrom in splitTable:
+if (differentString(self->table, self->trackTable))
+    {
+    char newSplitTable[PATH_LEN];
+    safef(newSplitTable, sizeof(newSplitTable), "%s_%s", chrom, self->trackTable);
+    freeMem(self->table);
+    self->table = cloneString(newSplitTable);
+    }
+resetQueryState(self);
+asdUpdateBaselineQuery(self);
+}
+
+static char **nextRowFromSqlResult(struct annoStreamDb *self)
+/* Stream rows directly from self->sr. */
+{
+return sqlNextRow(self->sr);
+}
+
+INLINE boolean useSplitTable(struct annoStreamDb *self, struct joinerDtf *dtf)
+/* Return TRUE if dtf matches self->{db,table} and table is split. */
+{
+return (sameString(dtf->database, self->db) &&
+        sameString(dtf->table, self->trackTable) &&
+        differentString(self->table, self->trackTable));
+}
+
+static void appendFieldList(struct annoStreamDb *self, struct dyString *query)
+/* Append SQL field list to query. */
+{
+struct joinerDtf *fieldList = self->joinMixer ? self->joinMixer->sqlFieldList :
+                                                self->mainTableDtfList;
+struct joinerDtf *dtf;
+for (dtf = fieldList;  dtf != NULL;  dtf = dtf->next)
+    {
+    if (dtf != fieldList)
+        sqlDyStringPrintf(query, ",");
+    if (useSplitTable(self, dtf))
+        sqlDyStringPrintf(query, "%s.%s", self->table, dtf->field);
+    else
+        {
+        char dtfString[PATH_LEN];
+        joinerDtfToSqlFieldString(dtf, self->db, dtfString, sizeof(dtfString));
+        sqlDyStringPrintf(query, "%s", dtfString);
+        }
+    }
+}
+
+static void ignoreEndIndexIfNecessary(struct annoStreamDb *self, char *dbTable,
+                                      struct dyString *query)
+/* Don't let mysql use a (chrom, chromEnd) index because that messes up sorting by chromStart. */
+{
+if (sameString(dbTable, self->trackTable) && self->endFieldIndexName != NULL)
+    sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName);
+}
+
+static void appendOneTable(struct annoStreamDb *self, struct joinerDtf *dt, struct dyString *query)
+/* Add the (db.)table string from dt to query; if dt is NULL or table is split then
+ * use self->table. */
+{
+char dbTable[PATH_LEN];
+if (dt == NULL || useSplitTable(self, dt))
+    safecpy(dbTable, sizeof(dbTable), self->table);
+else
+    joinerDtfToSqlTableString(dt, self->db, dbTable, sizeof(dbTable));
+dyStringAppend(query, dbTable);
+ignoreEndIndexIfNecessary(self, dbTable, query);
+}
+
+INLINE void splitOrDtfToSqlField(struct annoStreamDb *self, struct joinerDtf *dtf,
+                                 char *fieldName, size_t fieldNameSize)
+/* Write [db].table.field into fieldName, where table may be split. */
+{
+if (useSplitTable(self, dtf))
+    safef(fieldName, fieldNameSize, "%s.%s", self->table, dtf->field);
+else
+    joinerDtfToSqlFieldString(dtf, self->db, fieldName, fieldNameSize);
+}
+
+static void appendJoin(struct annoStreamDb *self, struct joinerPair *routeList,
+                       struct dyString *query)
+/* Append join statement(s) for a possibly tree-structured routeList. */
+{
+struct joinerPair *jp;
+for (jp = routeList;  jp != NULL;  jp = jp->next)
+    {
+    struct joinerField *jfB = joinerSetFindField(jp->identifier, jp->b);
+    if (! jfB->full)
+        dyStringAppend(query, " left");
+    dyStringAppend(query, " join ");
+    if (jp->child)
+        {
+        dyStringAppendC(query, '(');
+        appendOneTable(self, jp->child->a, query);
+        appendJoin(self, jp->child, query);
+        dyStringAppendC(query, ')');
+        }
+    else
+        appendOneTable(self, jp->b, query);
+    char fieldA[PATH_LEN], fieldB[PATH_LEN];
+    splitOrDtfToSqlField(self, jp->a, fieldA, sizeof(fieldA));
+    splitOrDtfToSqlField(self, jp->b, fieldB, sizeof(fieldB));
+    struct joinerField *jfA = joinerSetFindField(jp->identifier, jp->a);
+    if (sameOk(jfA->separator, ","))
+        dyStringPrintf(query, " on find_in_set(%s, %s)", fieldB, fieldA);
+    else
+        dyStringPrintf(query, " on %s = %s", fieldA, fieldB);
+    }
+}
+
+static boolean appendTableList(struct annoStreamDb *self, struct dyString *query)
+/* Append SQL table list to query, including tables used for output, filtering and joining. */
+{
+boolean hasLeftJoin = FALSE;
+if (self->joinMixer == NULL || self->joinMixer->sqlRouteList == NULL)
+    appendOneTable(self, NULL, query);
+else
+    {
+    // Use both a and b of the first pair and only b of each subsequent pair
+    appendOneTable(self, self->joinMixer->sqlRouteList->a, query);
+    appendJoin(self, self->joinMixer->sqlRouteList, query);
+    hasLeftJoin = TRUE;
+    }
+return hasLeftJoin;
+}
+
+// libify?
+static struct joinerDtf *joinerDtfCloneList(struct joinerDtf *listIn)
+/* Return a list with cloned items of listIn. */
+{
+struct joinerDtf *listOut = NULL, *item;
+for (item = listIn;  item != NULL;  item = item->next)
+    slAddHead(&listOut, joinerDtfClone(item));
+slReverse(&listOut);
+return listOut;
+}
+
+static char *joinerFilePath()
+/* Return the location of all.joiner - default is ./all.joiner but a different file
+ * can be substituted using environment variable ALL_JOINER_FILE */
+{
+char *joinerFile = getenv("ALL_JOINER_FILE");
+if (isEmpty(joinerFile))
+    joinerFile = JOINER_FILE;
+return joinerFile;
+}
+
+static void asdInitBaselineQuery(struct annoStreamDb *self)
+/* Build a dy SQL query with no position constraints (select ... from ...)
+ * possibly including joins and filters if specified (where ...). */
+{
+if (self->relatedDtfList)
+    {
+    struct joinerDtf *outputFieldList = slCat(joinerDtfCloneList(self->mainTableDtfList),
+                                              joinerDtfCloneList(self->relatedDtfList));
+    if (self->joiner == NULL)
+        self->joiner = joinerRead(joinerFilePath());
+    int expectedRows = sqlRowCount(self->conn, self->table);
+    self->joinMixer = joinMixerNew(self->joiner, self->db, self->table, outputFieldList,
+                                   expectedRows, self->naForMissing);
+    joinerPairListToTree(self->joinMixer->sqlRouteList);
+    self->sqlRowSize = slCount(self->joinMixer->sqlFieldList);
+    self->bigRowSize = self->joinMixer->bigRowSize;
+    joinerDtfFreeList(&outputFieldList);
+    }
+else
+    {
+    self->sqlRowSize = slCount(self->mainTableDtfList);
+    self->bigRowSize = self->sqlRowSize;
+    }
+}
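+
+/* For illustration only (the table and field names here are hypothetical, not taken
+ * from this commit): with a relatedTables config joining hg19.knownGene to hg19.kgXref
+ * via the kgID field, the baseline query built by the functions above comes out
+ * shaped roughly like
+ *     select knownGene.name, ..., kgXref.geneSymbol from knownGene
+ *     left join kgXref on knownGene.name = kgXref.kgID
+ * and when the joining field is a comma-separated list, the 'on' clause uses
+ * find_in_set() instead of '=' (see appendJoin above). */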
+
+static void asdUpdateBaselineQuery(struct annoStreamDb *self)
+/* Build a dy SQL query with no position constraints (select ... from ...)
+ * possibly including joins and filters if specified (where ...), using the current splitTable. */
+{
+struct dyString *query = sqlDyStringCreate("select ");
+appendFieldList(self, query);
+sqlDyStringPrintf(query, " from ");
+self->hasLeftJoin = appendTableList(self, query);
+boolean hasWhere = FALSE;
+self->baselineQuery = dyStringCannibalize(&query);
+self->baselineQueryHasWhere = hasWhere;
+// Don't free joiner; we need its storage of joinerFields.
+}
+
+static void addBinToQuery(struct annoStreamDb *self, uint start, uint end, struct dyString *query)
+/* If applicable, add bin range constraints to query with explicit table name, in case we're
+ * joining with another table that has a bin column. */
+{
+if (self->hasBin)
+    {
+    // Get the bin constraints with no table specification:
+    struct dyString *binConstraints = dyStringNew(0);
+    hAddBinToQuery(start, end, binConstraints);
+    // Swap in explicit table name for bin field:
+    char tableDotBin[PATH_LEN];
+    safef(tableDotBin, sizeof(tableDotBin), "%s.bin", self->table);
+    struct dyString *explicitBinConstraints = dyStringSub(binConstraints->string,
+                                                          "bin", tableDotBin);
+    dyStringAppend(query, explicitBinConstraints->string);
+    dyStringFree(&explicitBinConstraints);
+    dyStringFree(&binConstraints);
+    }
+}
+
+static void addRangeToQuery(struct annoStreamDb *self, struct dyString *query,
+                            char *chrom, uint start, uint end, boolean hasWhere)
+/* Add position constraints to query. */
+{
+if (hasWhere)
+    sqlDyStringPrintf(query, " and ");
+else
+    sqlDyStringPrintf(query, " where ");
+sqlDyStringPrintf(query, "%s.%s='%s'", self->table, self->chromField, chrom);
+uint chromSize = annoAssemblySeqSize(self->streamer.assembly, chrom);
+boolean addStartConstraint = (start > 0);
+boolean addEndConstraint = (end < chromSize);
+if (addStartConstraint || addEndConstraint)
+    {
+    sqlDyStringPrintf(query, " and ");
+    if (self->hasBin)
+        addBinToQuery(self, start, end, query);
+    if (addStartConstraint)
+        {
+        if (self->doNextChunk)
+            sqlDyStringPrintf(query, "%s.%s >= %u ", self->table, self->startField, start);
+        else
+            // Make sure to include insertions at start:
+            sqlDyStringPrintf(query, "(%s.%s > %u or (%s.%s = %s.%s and %s.%s = %u)) ",
+                              self->table, self->endField, start,
+                              self->table, self->endField, self->table, self->startField,
+                              self->table, self->startField, start);
+        }
+    if (addEndConstraint)
+        {
+        if (addStartConstraint)
+            sqlDyStringPrintf(query, "and ");
+        // Make sure to include insertions at end:
+        sqlDyStringPrintf(query, "(%s.%s < %u or (%s.%s = %s.%s and %s.%s = %u)) ",
+                          self->table, self->startField, end,
+                          self->table, self->startField, self->table, self->endField,
+                          self->table, self->endField, end);
+        }
+    }
+}
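+
+/* For example (illustrative, not verbatim output): for chrom "chr1", start 1000,
+ * end 2000 on a binned table t, addRangeToQuery produces constraints shaped like
+ *     where t.chrom='chr1' and (t.bin ...) and
+ *     (t.chromEnd > 1000 or (t.chromEnd = t.chromStart and t.chromStart = 1000)) and
+ *     (t.chromStart < 2000 or (t.chromStart = t.chromEnd and t.chromEnd = 2000))
+ * so that zero-length insertions at either edge of the region are included. */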
+
+static void asdDoQuerySimple(struct annoStreamDb *self, char *minChrom, uint minEnd)
+/* Return a sqlResult for a query on table items in position range.
+ * If doing a whole genome query, just select all rows from table. */
+// NOTE: it would be possible to implement filters at this level, as in hgTables.
+{
+struct annoStreamer *streamer = &(self->streamer);
+boolean hasWhere = self->baselineQueryHasWhere;
+struct dyString *query = dyStringCreate("%s", self->baselineQuery);
+if (!streamer->positionIsGenome)
+    {
+    if (minChrom && differentString(minChrom, streamer->chrom))
+        errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
+                 streamer->name, minChrom, streamer->chrom);
+    if (self->hasBin)
+        {
+        // Results will be in bin order, but we can restore chromStart order by
+        // accumulating initial coarse-bin items and merge-sorting them with
+        // subsequent finest-bin items which will be in chromStart order.
+        resetMergeState(self);
+        startMerging(self);
+        }
+    addRangeToQuery(self, query, streamer->chrom, streamer->regionStart, streamer->regionEnd,
+                    hasWhere);
+    if (self->notSorted || self->hasLeftJoin)
+        sqlDyStringPrintf(query, " order by %s.%s", self->table, self->startField);
+    }
+else if (self->notSorted || self->hasLeftJoin)
+    sqlDyStringPrintf(query, " order by %s.%s,%s.%s",
+                      self->table, self->chromField, self->table, self->startField);
+if (self->maxOutRows > 0)
+    dyStringPrintf(query, " limit %d", self->maxOutRows);
+struct sqlResult *sr = sqlGetResult(self->conn, query->string);
+dyStringFree(&query);
+self->sr = sr;
+self->needQuery = FALSE;
+}
+
+static void rowBufInit(struct rowBuf *rowBuf, int size)
+/* Clean up rowBuf and give it a new lm and buffer[size]. */
+{
+resetRowBuf(rowBuf);
+rowBuf->lm = lmInit(0);
+rowBuf->size = size;
+lmAllocArray(rowBuf->lm, rowBuf->buf, size);
+}
+
+static void updateNextChunkState(struct annoStreamDb *self, int queryMaxItems)
+/* If the just-fetched interval list was limited to ASD_CHUNK_SIZE, set doNextChunk
+ * and trim the last row(s) so that when we query the next chunk, we don't get
+ * repeat rows due to querying a start coord that was already returned. */
+{
+struct rowBuf *rowBuf = &self->rowBuf;
+if (queryMaxItems == ASD_CHUNK_SIZE && rowBuf->size == ASD_CHUNK_SIZE)
+    {
+    self->doNextChunk = TRUE;
+    // Starting at the last row in rowBuf, work back to find a value with a different start.
+    int ix = rowBuf->size - 1;
+    char **words = rowBuf->buf[ix];
+    int startIx = self->startIx + self->omitBin;
+    uint lastStart = atoll(words[startIx]);
+    for (ix = rowBuf->size - 2;  ix >= 0;  ix--)
+        {
+        words = rowBuf->buf[ix];
+        uint thisStart = atoll(words[startIx]);
+        if (thisStart != lastStart)
+            {
+            rowBuf->size = ix+1;
+            self->nextChunkStart = lastStart;
+            break;
+            }
+        }
+    }
+else
+    self->doNextChunk = FALSE;
+self->needQuery = FALSE;
+}
+
+static boolean glomSqlDup(char **oldRow, char **newRow, int mainColCount, int sqlColCount,
+                          struct lm *lm)
+/* If newRow's contents are identical to oldRow's for all fields from the main table,
+ * then comma-glom new values of joined related fields onto oldRow's values and return TRUE;
+ * otherwise leave oldRow alone and return FALSE. */
+{
+boolean isDup = TRUE;
+int i;
+for (i = 0;  i < mainColCount;  i++)
+    if (differentStringNullOk(oldRow[i], newRow[i]))
+        {
+        isDup = FALSE;
+        break;
+        }
+if (isDup)
+    {
+    // Glom related column values produced by mysql, collapsing consecutive duplicate values
+    // and appending comma to match hgTables
+    for (i = mainColCount;  i < sqlColCount;  i++)
+        {
+        char *oldVal = oldRow[i];
+        char *newVal = newRow[i];
+        if (newVal != NULL)
+            {
+            int newValLen = strlen(newVal);
+            char newValComma[newValLen+2];
+            safef(newValComma, sizeof(newValComma), "%s,", newVal);
+            if (oldVal != NULL)
+                {
+                int oldValLen = strlen(oldVal);
+                if (! (endsWithWordComma(oldVal, newVal)))
+                    {
+                    char *comma = (oldVal[oldValLen-1] == ',') ? "" : ",";
+                    int glommedSize = oldValLen + 1 + newValLen + 2;
+                    char *glommedVal = lmAlloc(lm, glommedSize);
+                    safef(glommedVal, glommedSize, "%s%s%s", oldVal, comma, newValComma);
+                    oldRow[i] = glommedVal;
+                    }
+                }
+            else
+                {
+                oldRow[i] = lmCloneString(lm, newValComma);
+                }
+            }
+        }
+    }
+return isDup;
+}
+
+static void bufferRowsFromSqlQuery(struct annoStreamDb *self, char *query, int queryMaxItems)
+/* Store all rows from query in rowBuf. */
+{
+struct sqlResult *sr = sqlGetResult(self->conn, query);
+struct rowBuf *rowBuf = &(self->rowBuf);
+rowBufInit(rowBuf, ASD_CHUNK_SIZE);
+struct annoStreamer *sSelf = &(self->streamer);
+boolean didSqlJoin = (self->joinMixer && self->joinMixer->sqlRouteList);
+int mainColCount = slCount(self->mainTableDtfList);
+int sqlColCount = self->sqlRowSize;
+char **row = NULL;
+int ix = 0;
+while ((row = sqlNextRow(sr)) != NULL)
+    {
+    if (ix >= rowBuf->size)
+        errAbort("annoStreamDb %s: rowBuf overflow, got more than %d rows",
+                 sSelf->name, rowBuf->size);
+    // SQL join outputs separate rows for multiple matches.  Accumulate multiple matches as
+    // comma-sep lists to match hgTables and hashJoin (and prevent rowBuf overflow).
+    boolean didGlom = FALSE;
+    if (ix != 0 && didSqlJoin)
+        didGlom = glomSqlDup(rowBuf->buf[ix-1], row, mainColCount, sqlColCount, rowBuf->lm);
+    if (! didGlom)
+        rowBuf->buf[ix++] = lmCloneRowExt(rowBuf->lm, row, self->bigRowSize, self->sqlRowSize);
+    }
+// Set rowBuf->size to the number of rows we actually stored.
+rowBuf->size = ix;
+sqlFreeResult(&sr);
+updateNextChunkState(self, queryMaxItems);
+}
+
+static void updateQueryChrom(struct annoStreamDb *self, char *minChrom)
+/* Figure out whether we need to query the next chunk on the current chromosome
+ * or move on to the next chromosome. */
+{
+if (self->queryChrom == NULL)
+    self->queryChrom = self->chromList;
+else if (!self->doNextChunk)
+    {
+    self->queryChrom = self->queryChrom->next;
+    if (self->hasBin)
+        {
+        resetMergeState(self);
+        startMerging(self);
+        }
+    }
+// -- don't resetMergeState if doNextChunk.
+if (minChrom != NULL)
+    {
+    // Skip chroms that precede minChrom
+    while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0)
+        {
+        self->queryChrom = self->queryChrom->next;
+        self->doNextChunk = FALSE;
+        }
+    if (self->hasBin)
+        {
+        resetMergeState(self);
+        startMerging(self);
+        }
+    }
+}
+
+static void doOneChunkQuery(struct annoStreamDb *self, struct dyString *query,
+                            char *chrom, uint start, uint end,
+                            boolean hasWhere, int maxItems)
+/* Add range constraints to query, perform query and buffer the results. */
+{
+addRangeToQuery(self, query, chrom, start, end, hasWhere);
+if (self->notSorted || self->hasLeftJoin)
+    sqlDyStringPrintf(query, " order by %s.%s", self->table, self->startField);
+sqlDyStringPrintf(query, " limit %d", maxItems);
+bufferRowsFromSqlQuery(self, query->string, maxItems);
+}
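+
+/* Illustrative walk-through of the chunking scheme: a query on a region with 250,000
+ * rows is issued as a series of "... limit 100000" (ASD_CHUNK_SIZE) queries.  After
+ * each chunk, updateNextChunkState trims trailing rows that share the final chromStart
+ * and records that start in nextChunkStart, so the next chunk's
+ * "chromStart >= nextChunkStart" constraint (see addRangeToQuery) resumes without
+ * skipping or duplicating rows. */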
+
+static void asdDoQueryChunking(struct annoStreamDb *self, char *minChrom, uint minEnd)
+/* Get rows from mysql with a limit on the number of rows returned at one time (ASD_CHUNK_SIZE),
+ * to avoid long delays for very large tables.  This will be called multiple times if
+ * the number of rows in region is more than ASD_CHUNK_SIZE.  If doing a genome-wide query,
+ * break it up into chrom-by-chrom queries because the code that merges large bin items
+ * in with small bin items assumes that all rows are on the same chrom. */
+{
+struct annoStreamer *sSelf = &(self->streamer);
+boolean hasWhere = self->baselineQueryHasWhere;
+struct dyString *query = dyStringCreate("%s", self->baselineQuery);
+if (sSelf->chrom != NULL && self->rowBuf.size > 0 && !self->doNextChunk)
+    {
+    // We're doing a region query, we already got some rows, and don't need another chunk:
+    resetRowBuf(&self->rowBuf);
+    self->eof = TRUE;
+    }
+if (self->useMaxOutRows)
+    {
+    self->maxOutRows -= self->rowBuf.size;
+    if (self->maxOutRows <= 0)
+        self->eof = TRUE;
+    }
+if (self->eof)
+    return;
+int queryMaxItems = ASD_CHUNK_SIZE;
+if (self->useMaxOutRows && self->maxOutRows < queryMaxItems)
+    queryMaxItems = self->maxOutRows;
+if (self->hasBin)
+    {
+    // Results will be in bin order, but we can restore chromStart order by
+    // accumulating initial coarse-bin items and merge-sorting them with
+    // subsequent finest-bin items which will be in chromStart order.
+    if (self->doNextChunk && self->mergeBins && !self->gotFinestBin)
+        errAbort("annoStreamDb %s: can't continue merge in chunking query; "
+                 "increase ASD_CHUNK_SIZE", sSelf->name);
+    // Don't reset merge state here in case bigItemQueue has a large-bin item
+    // at the end of the chrom, past all smallest-bin items.
+    startMerging(self);
+    }
+if (sSelf->chrom != NULL)
+    {
+    // Region query (but might end up as multiple chunked queries)
+    char *chrom = sSelf->chrom;
+    uint start = sSelf->regionStart;
+    uint end = sSelf->regionEnd;
+    if (minChrom)
+        {
+        if (differentString(minChrom, chrom))
+            errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
+                     sSelf->name, minChrom, chrom);
+        if (start < minEnd)
+            start = minEnd;
+        }
+    if (self->doNextChunk && start < self->nextChunkStart)
+        start = self->nextChunkStart;
+    doOneChunkQuery(self, query, chrom, start, end, hasWhere, queryMaxItems);
+    if (self->rowBuf.size == 0)
+        self->eof = TRUE;
+    }
+else
+    {
+    // Genome-wide query: break it into chrom-by-chrom queries (that might be chunked)
+    // because the mergeBins stuff assumes that all rows are from the same chrom.
+    updateQueryChrom(self, minChrom);
+    if (self->queryChrom == NULL)
+        self->eof = TRUE;
+    else
+        {
+        char *chrom = self->queryChrom->name;
+        int start = 0;
+        if (minChrom != NULL && sameString(chrom, minChrom))
+            start = minEnd;
+        if (self->doNextChunk && start < self->nextChunkStart)
+            start = self->nextChunkStart;
+        uint end = annoAssemblySeqSize(self->streamer.assembly, chrom);
+        doOneChunkQuery(self, query, chrom, start, end, hasWhere, queryMaxItems);
+        // If there happens to be no items on chrom, try again with the next chrom:
+        if (! self->eof && self->rowBuf.size == 0)
+            asdDoQueryChunking(self, minChrom, minEnd);
+        }
+    }
+dyStringFree(&query);
+}
+
+static char **nextRowFromBuffer(struct annoStreamDb *self)
+/* Instead of streaming directly from self->sr, we have buffered up the results
+ * of a chunked query; return the head of that queue. */
+{
+struct rowBuf *rowBuf = &self->rowBuf;
+if (rowBuf->ix > rowBuf->size)
+    errAbort("annoStreamDb %s: rowBuf overflow (%d > %d)", self->streamer.name,
+             rowBuf->ix, rowBuf->size);
+if (rowBuf->ix == rowBuf->size)
+    {
+    // Last row in buffer -- we'll need another query to get subsequent rows (if any).
+    // But first, see if we need to update gotFinestBin, since getFinestBin might be
+    // one of our callers.
+    if (rowBuf->size > 0)
+        {
+        char **lastRow = rowBuf->buf[rowBuf->size-1];
+        int lastBin = atoi(lastRow[0]);
+        if (lastBin >= self->minFinestBin)
+            self->gotFinestBin = TRUE;
+        }
+    if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
+        self->needQuery = TRUE;
+    // Bounce back out -- asdNextRow or nextRowMergeBins will need to do another query.
+    return NULL;
+    }
+if (rowBuf->size == 0)
+    return NULL;
+else
+    return rowBuf->buf[rowBuf->ix++];
+}
+
+static char **nextRowUnfiltered(struct annoStreamDb *self)
+/* Call self->nextRowRaw to get the next row from the sql query on the main table.
+ * Then, if applicable, add columns added from hashes of related tables. */
+{
+char **row = self->nextRowRaw(self);
+if (self->joinMixer && self->joinMixer->hashJoins && row)
+    {
+    // Add columns from hashedJoins to row
+    struct hashJoin *hj;
+    for (hj = self->joinMixer->hashJoins;  hj != NULL;  hj = hashJoinNext(hj))
+        hashJoinOneRow(hj, row);
+    }
+return row;
+}
+
+static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail,
+                              char *minChrom, uint minEnd)
+/* Skip past any left-join failures until we get a right-join failure, a passing row,
+ * or end of data.  Return row or NULL, and return right-join fail status via retRightFail. */
+{
+int numCols = self->streamer.numCols;
+char **row = nextRowUnfiltered(self);
+if (minChrom != NULL && row != NULL)
+    {
+    // Ignore rows that fall completely before (minChrom, minEnd) - save annoGrator's time
+    int chromIx = self->omitBin+self->chromIx;
+    int endIx = self->omitBin+self->endIx;
+    int chromCmp;
+    while (row &&
+           ((chromCmp = strcmp(row[chromIx], minChrom)) < 0 ||  // this chrom precedes minChrom
+            (chromCmp == 0 && atoll(row[endIx]) < minEnd)))     // on minChrom, but before minEnd
+        row = nextRowUnfiltered(self);
+    }
+boolean rightFail = FALSE;
+struct annoFilter *filterList = self->streamer.filters;
+while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail))
+    {
+    if (rightFail)
+        break;
+    row = nextRowUnfiltered(self);
+    }
+*retRightFail = rightFail;
+return row;
+}
+
+static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail,
+                                    struct lm *lm)
+/* Extract coords from row and return an annoRow including right-fail status. */
+{
+char **finalRow = row + self->omitBin;
+uint numCols = self->streamer.numCols;
+char *swizzleRow[numCols];
+if (self->joinMixer)
+    {
+    uint i;
+    for (i = 0;  i < numCols;  i++)
+        {
+        uint outIx = self->joinMixer->outIxs[i+self->omitBin];
+        if (row[outIx] == NULL)
+            swizzleRow[i] = self->naForMissing ? "n/a" : "";
+        else
+            swizzleRow[i] = row[outIx];
+        }
+    finalRow = swizzleRow;
+    }
+char *chrom = finalRow[self->chromIx];
+uint chromStart = sqlUnsigned(finalRow[self->startIx]);
+uint chromEnd = sqlUnsigned(finalRow[self->endIx]);
+return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, finalRow, numCols, lm);
+}
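+
+/* A note on bins (see binRange.h): in the standard binning scheme, binFromRange(0, 1)
+ * is the first bin at the finest (128kb) level, so any row whose bin number is less
+ * than minFinestBin spans a larger range.  getFinestBinItem below queues those
+ * coarse-bin items so they can be merge-sorted back into chromStart order. */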
+
+static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail,
+                               char *minChrom, uint minEnd)
+/* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and
+ * add any subsequent coarse-bin items to bigItemQueue.  As soon as we get an item from a
+ * finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. */
+{
+int bin = atoi(row[0]);
+while (bin < self->minFinestBin)
+    {
+    // big item -- store aside in queue for merging later (unless it falls off the end of
+    // the current chunk), move on to next item
+    struct annoRow *aRow = rowToAnnoRow(self, row, *pRightFail, self->qLm);
+    if (! (self->doNextChunk && self->nextChunkStart <= aRow->start))
+        slAddHead(&(self->bigItemQueue), aRow);
+    *pRightFail = FALSE;
+    row = nextRowFiltered(self, pRightFail, minChrom, minEnd);
+    if (row == NULL)
+        break;
+    bin = atoi(row[0]);
+    }
+// First finest-bin item!  Sort bigItemQueue in preparation for merging:
+self->gotFinestBin = TRUE;
+slReverse(&(self->bigItemQueue));
+slSort(&(self->bigItemQueue), annoRowCmp);
+return row;
+}
+
+static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow,
+                                struct lm *callerLm)
+/* Compare head of bigItemQueue with (finest-bin) aRow; return the one with
+ * lower chromStart and save the other for later. */
+{
+struct annoRow *outRow = aRow;
+if (self->bigItemQueue != NULL && annoRowCmp(&(self->bigItemQueue), &aRow) < 0)
+    {
+    // Big item gets to go now, so save aside small item for next time.
+    outRow = slPopHead(&(self->bigItemQueue));
+    slAddHead(&(self->smallItemQueue), aRow);
+    }
+// Clone outRow using callerLm
+enum annoRowType rowType = self->streamer.rowType;
+int numCols = self->streamer.numCols;
+outRow = annoRowClone(outRow, rowType, numCols, callerLm);
+if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
+    {
+    // No coarse-bin items to merge-sort, just stream finest-bin items from here on out.
+    // This needs to be done after cloning outRow because it was allocated in self->qLm.
+    resetMergeState(self);
+    }
+return outRow;
+}
+
+static struct annoRow *nextQueuedRow(struct annoStreamDb *self, struct lm *callerLm)
+// Return the head of either bigItemQueue or smallItemQueue, depending on which has
+// the lower chromStart.
+{
+struct annoRow *row = NULL;
+if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0)
+    row = slPopHead(&(self->bigItemQueue));
+else
+    row = slPopHead(&(self->smallItemQueue));
+if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
+    // All done merge-sorting, just stream finest-bin items from here on out.
+    self->mergeBins = FALSE;
+enum annoRowType rowType = self->streamer.rowType;
+int numCols = self->streamer.numCols;
+return annoRowClone(row, rowType, numCols, callerLm);
+}
+
+static struct annoRow *nextRowMergeBins(struct annoStreamDb *self, char *minChrom, uint minEnd,
+                                        struct lm *callerLm)
+/* Fetch the next filtered row from mysql, merge-sorting coarse-bin items into finest-bin
+ * items to maintain chromStart ordering. */
+{
+assert(self->mergeBins && self->hasBin);
+if (self->smallItemQueue)
+    // In this case we have already begun merge-sorting; don't pull a new row from mysql,
+    // use the queues.  This should keep smallItemQueue's max depth at 1.
+    return nextQueuedRow(self, callerLm);
+else
+    {
+    // We might need to collect initial coarse-bin items, or might already be merge-sorting.
+    boolean rightFail = FALSE;
+    char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
+    if (row && !self->gotFinestBin)
+        {
+        // We are just starting -- queue up coarse-bin items, if any, until we get the first
+        // finest-bin item.
+        row = getFinestBinItem(self, row, &rightFail, minChrom, minEnd);
+        }
+    // Time to merge-sort finest-bin items from mysql with coarse-bin items from queue.
+    if (row != NULL)
+        {
+        struct annoRow *aRow = rowToAnnoRow(self, row, rightFail, self->qLm);
+        return mergeRow(self, aRow, callerLm);
+        }
+    else
+        {
+        struct annoRow *qRow = slPopHead(&(self->bigItemQueue));
+        enum annoRowType rowType = self->streamer.rowType;
+        int numCols = self->streamer.numCols;
+        return annoRowClone(qRow, rowType, numCols, callerLm);
+        }
+    }
+}
+
+static struct annoRow *asdNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd,
+                                  struct lm *callerLm)
+/* Perform sql query if we haven't already and return a single
+ * annoRow, or NULL if there are no more items. */
+{
+struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
+if (self->needQuery)
+    self->doQuery(self, minChrom, minEnd);
+if (self->mergeBins)
+    {
+    struct annoRow *aRow = nextRowMergeBins(self, minChrom, minEnd, callerLm);
+    if (aRow == NULL && self->needQuery && !self->eof)
+        // Recurse: query, then get next merged/filtered row:
+        return asdNextRow(vSelf, minChrom, minEnd, callerLm);
+    else
+        return aRow;
+    }
+boolean rightFail = FALSE;
+char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
+if (row == NULL)
+    {
+    if (self->needQuery && !self->eof)
+        // Recurse: query, then get next merged/filtered row:
+        return asdNextRow(vSelf, minChrom, minEnd, callerLm);
+    else
+        return NULL;
+    }
+return rowToAnnoRow(self, row, rightFail, callerLm);
+}
+
+
+static void makeMainTableDtfList(struct annoStreamDb *self, struct asObject *mainAsObj)
+/* Make a list of mainTable columns. */
+{
+struct joinerDtf mainDtf;
+mainDtf.database = self->db;
+mainDtf.table = self->trackTable;
+struct asColumn *col;
+for (col = mainAsObj->columnList;  col != NULL;  col = col->next)
+    {
+    mainDtf.field = col->name;
+    slAddHead(&self->mainTableDtfList, joinerDtfClone(&mainDtf));
+    }
+slReverse(&self->mainTableDtfList);
+// If table has bin but asObj does not, add bin to head of mainTableDtfList.
+if (self->hasBin && differentString("bin", self->mainTableDtfList->field))
+    {
+    mainDtf.field = "bin";
+    slAddHead(&self->mainTableDtfList, joinerDtfClone(&mainDtf));
+    }
+}
+
+static struct asObject *asObjForDtf(struct hash *hash, struct joinerDtf *dtf)
+/* Get asObj for dtf, either from hash if we've seen it before, or make one. */
+{
+struct asObject *asObj = NULL;
+char dbTable[PATH_LEN];
+joinerDtfToSqlTableString(dtf, NULL, dbTable, sizeof(dbTable));
+struct hashEl *hel = hashLookup(hash, dbTable);
+if (hel == NULL)
+    {
+    asObj = hAnnoGetAutoSqlForDbTable(dtf->database, dtf->table, NULL, TRUE);
+    if (asObj == NULL)
+        errAbort("annoStreamDb: No autoSql for %s.%s.%s",
+                 dtf->database, dtf->table, dtf->field);
+    hel = hashAdd(hash, dbTable, asObj);
+    }
+else
+    asObj = hel->val;
+return asObj;
+}
+
+static void makeDottedTriple(char *dtfString, size_t dtfStringSize,
+                             char *db, char *table, char *field)
+/* In case we don't have a struct joinerDtf for a field that we want to look up,
+ * but we do have the db, table and field name, concat with dots into dtfString.
+ * Unlike joinerDtfToSqlFieldString, don't bother checking whether db is the main db. */
+{
+safef(dtfString, dtfStringSize, "%s.%s.%s", db, table, field);
+}
+
+char *annoStreamDbColumnNameFromDtf(char *db, char *mainTable, struct joinerDtf *dtf)
+/* Return a string with the autoSql column name that would be assigned according to dtf's
+ * db, table and field. */
+{
+char colName[PATH_LEN*2];
+if (differentString(dtf->table, mainTable) || differentString(dtf->database, db))
+    {
+    joinerDtfToSqlFieldString(dtf, db, colName, sizeof(colName));
+    // asParse rejects names that have '.' in them, which makes sense because it's for SQL,
+    // so replace the '.'s with '_'s.
+    subChar(colName, '.', '_');
+    }
+else
+    safecpy(colName, sizeof(colName), dtf->field);
+return cloneString(colName);
+}
+
+static void addOneColumn(struct dyString *dy, struct joinerDtf *dtf, char *db, char *mainTable,
+                         struct asColumn *col, struct hash *dtfNames)
+/* Append an autoSql text line describing col to dy.
+ * If col is an array whose size is some other column that has not yet been added,
+ * coerce its type to string to avoid asParseText errAbort. */
+{
+// First see if this col depends on a linked size column that hasn't been added yet.
+boolean sizeColIsMissing = FALSE;
+if (col->isArray && !col->fixedSize && isNotEmpty(col->linkedSizeName))
+    {
+    // col's size comes from another column -- has that column already been added?
+    char linkedDtfString[PATH_LEN];
+    makeDottedTriple(linkedDtfString, sizeof(linkedDtfString),
+                     dtf->database, dtf->table, col->linkedSizeName);
+    if (!hashLookup(dtfNames, linkedDtfString))
+        sizeColIsMissing = TRUE;
+    }
+if (col->isArray && sizeColIsMissing)
+    {
+    // The size column is missing, so this can't be a valid array in autoSql --
+    // ignore col->lowType and call it a (comma-separated) string.
+    dyStringAppend(dy, " lstring");
+    }
+else
+    {
+    dyStringPrintf(dy, " %s", col->lowType->name);
+    if (col->isArray)
+        {
+        dyStringAppendC(dy, '[');
+        if (col->fixedSize)
+            dyStringPrintf(dy, "%d", col->fixedSize);
+        else
+            dyStringAppend(dy, col->linkedSizeName);
+        dyStringAppendC(dy, ']');
+        }
+    }
+char *colName = annoStreamDbColumnNameFromDtf(db, mainTable, dtf);
+dyStringPrintf(dy, " %s; \"%s\"\n", colName, col->comment);
+// Store plain old dotted triple in dtfNames in case we need to look it up later.
+char dtfString[PATH_LEN];
+makeDottedTriple(dtfString, sizeof(dtfString), dtf->database, dtf->table, dtf->field);
+hashAdd(dtfNames, dtfString, NULL);
+}
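+
+/* Example of the resulting column naming (illustrative, with hypothetical tables):
+ * if the main table is hg19.knownGene, its own fields keep their names ("name",
+ * "chrom", ...), while a related field hg19.kgXref.geneSymbol becomes output column
+ * "kgXref_geneSymbol" via annoStreamDbColumnNameFromDtf above. */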
+
+static struct asObject *asdAutoSqlFromTableFields(struct annoStreamDb *self,
+                                                  struct asObject *mainAsObj)
+/* Get autoSql for each table in self->relatedDtfList and append the columns
+ * included in self->relatedDtfList to the main table asObj columns. */
+{
+struct dyString *newAsText = dyStringCreate("table %sCustom\n"
+                                            "\"query based on %s with customized fields.\"\n"
+                                            " (",
+                                            self->trackTable, self->trackTable);
+// Use a hash of table to asObject so we fetch autoSql only once per table.
+struct hash *asObjCache = hashNew(0);
+// Use a hash of dtf strings to test whether or not one has been added already.
+struct hash *dtfNames = hashNew(0);
+// Start with all columns of main table:
+struct joinerDtf mainDtf;
+mainDtf.database = self->db;
+mainDtf.table = self->trackTable;
+struct asColumn *col;
+for (col = mainAsObj->columnList;  col != NULL;  col = col->next)
+    {
+    mainDtf.field = col->name;
+    addOneColumn(newAsText, &mainDtf, self->db, self->trackTable, col, dtfNames);
+    }
+// Append fields from related tables:
+struct joinerDtf *dtf;
+for (dtf = self->relatedDtfList;  dtf != NULL;  dtf = dtf->next)
+    {
+    struct asObject *asObj = asObjForDtf(asObjCache, dtf);
+    struct asColumn *col = asColumnFind(asObj, dtf->field);
+    if (col == NULL)
+        errAbort("annoStreamDb: Can't find column %s in autoSql for table %s.%s",
+                 dtf->field, dtf->database, dtf->table);
+    addOneColumn(newAsText, dtf, self->db, self->trackTable, col, dtfNames);
+    }
+dyStringAppendC(newAsText, ')');
+struct asObject *newAsObj = asParseText(newAsText->string);
+hashFreeWithVals(&asObjCache, asObjectFree);
+dyStringFree(&newAsText);
+freeHashAndVals(&dtfNames);
+return newAsObj;
+}
+
+static void asdClose(struct annoStreamer **pVSelf)
+/* Close db connection and free self. */
+{
+if (pVSelf == NULL)
+    return;
+struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf;
+lmCleanup(&(self->qLm));
+freeMem(self->trackTable);
+freeMem(self->table);
+slNameFreeList(&self->chromList);
+joinerDtfFreeList(&self->mainTableDtfList);
+joinerDtfFreeList(&self->relatedDtfList);
+joinerFree(&self->joiner);
+joinMixerFree(&self->joinMixer);
+sqlFreeResult(&(self->sr));
+hFreeConn(&(self->conn));
+annoStreamerFree(pVSelf);
+}
+
+static boolean asdInitBed3Fields(struct annoStreamDb *self)
+/* Use autoSql to figure out which table fields correspond to {chrom, chromStart, chromEnd}. */
+{
+struct annoStreamer *vSelf = &(self->streamer);
+return annoStreamerFindBed3Columns(vSelf, &(self->chromIx), &(self->startIx), &(self->endIx),
+                                   &(self->chromField), &(self->startField), &(self->endField));
+}
+
+char *sqlTableIndexOnFieldANotB(struct sqlConnection *conn, char *table, char *fieldA, char *fieldB)
+/* If table has an index that includes fieldA but not fieldB, return the index name, else NULL. */
+{
+char *indexNameA = NULL, *indexNameB = NULL;
+char query[512];
+sqlSafef(query, sizeof(query), "show index from %s", table);
+struct sqlResult *sr = sqlGetResult(conn, query);
+char **row;
+while ((row = sqlNextRow(sr)) != NULL)
+    {
+    if (sameString(row[4], fieldA))
+        indexNameA = cloneString(row[2]);
+    else if (sameString(row[4], fieldB))
+        indexNameB = cloneString(row[2]);
+    }
+if (sameOk(indexNameA, indexNameB))
+    indexNameA = NULL;
+sqlFreeResult(&sr);
+return indexNameA;
+}
+
+static boolean isIncrementallyUpdated(char *table)
+// Tables that have rows added to them after initial creation are not completely sorted
+// because of new rows at end, so we have to 'order by'.
+{
+return (sameString(table, "refGene") || sameString(table, "refFlat") ||
+        sameString(table, "xenoRefGene") || sameString(table, "xenoRefFlat") ||
+        sameString(table, "all_mrna") || sameString(table, "xenoMrna") ||
+        sameString(table, "all_est") || sameString(table, "xenoEst") ||
+        sameString(table, "intronEst") ||
+        sameString(table, "refSeqAli") || sameString(table, "xenoRefSeqAli"));
+}
+
+static boolean isPubsTable(char *table)
+// Not absolutely every pubs* table is unsorted, but most of them are.
+{
+return startsWith("pubs", table);
+}
+
+static struct asObject *asdParseConfig(struct annoStreamDb *self, struct jsonElement *configEl)
+/* Extract the autoSql for self->trackTable from the database.
+ * If configEl is not NULL, expect it to be a description of related tables and fields like this:
+ * config = { "relatedTables": [ { "table": "hg19.kgXref",
+ *                                 "fields": ["geneSymbol", "description"] },
+ *                               { "table": "hg19.knownCanonical",
+ *                                 "fields": ["clusterId"] }
+ *                             ] }
+ * If so, unpack the [db.]tables and fields into self->relatedDtfList and append autoSql
+ * column descriptions for each field to the autoSql object that describes our output.
+ * It might also have "naForMissing": true/false; if so, set self->naForMissing. */
+{
+struct asObject *asObj = hAnnoGetAutoSqlForDbTable(self->db, self->trackTable, NULL, TRUE);
+makeMainTableDtfList(self, asObj);
+if (configEl != NULL)
+    {
+    struct hash *config = jsonObjectVal(configEl, "config");
+    struct jsonElement *relatedTablesEl = hashFindVal(config, "relatedTables");
+    if (relatedTablesEl)
+        {
+        // relatedTables is a list of objects like { table: <[db.]table name>,
+        //                                           fields: [ <field1>, <field2>, ...] }
+        struct slRef *relatedTables = jsonListVal(relatedTablesEl, "relatedTables");
+        struct slRef *tfRef;
+        for (tfRef = relatedTables;  tfRef != NULL;  tfRef = tfRef->next)
+            {
+            struct jsonElement *dbTableFieldEl = tfRef->val;
+            struct hash *tfObj = jsonObjectVal(dbTableFieldEl,
+                                               "{table,fields} object in relatedTables");
+            struct jsonElement *dbTableEl = hashMustFindVal(tfObj, "table");
+            char *dbTable = jsonStringVal(dbTableEl, "[db.]table in relatedTables");
+            char tfDb[PATH_LEN], tfTable[PATH_LEN];
+            hParseDbDotTable(self->db, dbTable, tfDb, sizeof(tfDb), tfTable, sizeof(tfTable));
+            if (isEmpty(tfDb))
+                safecpy(tfDb, sizeof(tfDb), self->db);
+            if (hTableExists(tfDb, tfTable))
+                {
+                struct jsonElement *fieldListEl = hashMustFindVal(tfObj, "fields");
+                struct slRef *fieldList = jsonListVal(fieldListEl, "fieldList");
+                struct slRef *fieldRef;
+                for (fieldRef = fieldList;  fieldRef != NULL;  fieldRef = fieldRef->next)
+                    {
+                    struct jsonElement *fieldEl = fieldRef->val;
+                    char *tfField = jsonStringVal(fieldEl, "field");
+                    slAddHead(&self->relatedDtfList, joinerDtfNew(tfDb, tfTable, tfField));
+                    }
+                }
+            }
+        if (self->relatedDtfList)
+            {
+            slReverse(&self->relatedDtfList);
+            asObj = asdAutoSqlFromTableFields(self, asObj);
+            }
+        }
+    struct jsonElement *naForMissingEl = hashFindVal(config, "naForMissing");
+    if (naForMissingEl != NULL)
+        self->naForMissing = jsonBooleanVal(naForMissingEl, "naForMissing");
+    }
+return asObj;
+}
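+
+/* A minimal sketch of building configEl for asdParseConfig (illustrative; assumes
+ * jsonParse() from jsonParse.h and the hg19 kgXref table):
+ *     char *configText = "{ \"relatedTables\": [ { \"table\": \"hg19.kgXref\","
+ *                        "   \"fields\": [\"geneSymbol\", \"description\"] } ],"
+ *                        " \"naForMissing\": true }";
+ *     struct jsonElement *configEl = jsonParse(configText);
+ */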
+
+static char *sqlExplain(struct sqlConnection *conn, char *query)
+/* For now, just turn the values back into a multi-line "#"-comment string. */
+{
+char *trimmedQuery = query;
+if (startsWith(NOSQLINJ, trimmedQuery))
+    trimmedQuery = trimmedQuery + strlen(NOSQLINJ);
+struct dyString *dy = dyStringCreate("# Output of 'explain %s':\n", trimmedQuery);
+char explainQuery[PATH_LEN*8];
+safef(explainQuery, sizeof(explainQuery), NOSQLINJ "explain %s", trimmedQuery);
+struct sqlResult *sr = sqlGetResult(conn, explainQuery);
+struct slName *fieldList = sqlResultFieldList(sr);
+int nColumns = slCount(fieldList);
+// Header:
+dyStringPrintf(dy, "# %s\n", slNameListToString(fieldList, '\t'));
+char **row;
+while ((row = sqlNextRow(sr)) != NULL)
+    {
+    dyStringAppend(dy, "# ");
+    int i;
+    for (i = 0;  i < nColumns;  i++)
+        {
+        if (i > 0)
+            dyStringAppend(dy, "\t");
+        if (row[i] == NULL)
+            dyStringAppend(dy, "NULL");
+        else
+            dyStringAppend(dy, row[i]);
+        }
+    dyStringAppendC(dy, '\n');
+    }
+return dyStringCannibalize(&dy);
+}
+
+static char *asdGetHeader(struct annoStreamer *sSelf)
+/* Return header with debug info. */
+{
+struct annoStreamDb *self = (struct annoStreamDb *)sSelf;
+// Add a fake constraint on chromField because a real one is added to baselineQuery.
+char queryWithChrom[PATH_LEN*4];
+safef(queryWithChrom, sizeof(queryWithChrom), "%s %s %s.%s = 'someValue'", self->baselineQuery,
+      (strstr(self->baselineQuery, "where") ? "and" : "where"), self->table, self->chromField);
+char *explanation = sqlExplain(self->conn, queryWithChrom);
+return explanation;
+}
+
+struct annoStreamer *annoStreamDbNew(char *db, char *table, struct annoAssembly *aa,
+                                     int maxOutRows, struct jsonElement *configEl)
+/* Create an annoStreamer (subclass) object from a database table.
+ * If config is NULL, then the streamer produces output from all fields
+ * (except bin, unless table's autoSql includes bin).
+ * Otherwise, config is a json object with a member 'relatedTables' that specifies
+ * related tables and fields to join with table, for example:
+ * config = { "relatedTables": [ { "table": "hg19.kgXref",
+ *                                 "fields": ["geneSymbol", "description"] },
+ *                               { "table": "hg19.knownCanonical",
+ *                                 "fields": ["clusterId"] }
+ *                             ] }
+ * -- the streamer's autoSql will be constructed by appending autoSql column
+ * descriptions to the columns of table.
+ * Caller may free db and table when done with them, but must keep the
+ * annoAssembly aa alive for the lifetime of the returned annoStreamer. */
+{
+struct sqlConnection *conn = hAllocConn(db);
+char splitTable[HDB_MAX_TABLE_STRING];
+if (!hFindSplitTable(db, NULL, table, splitTable, sizeof splitTable, NULL))
+    errAbort("annoStreamDbNew: can't find table (or split table) for '%s.%s'", db, table);
+struct annoStreamDb *self = NULL;
+AllocVar(self);
+self->conn = conn;
+self->db = cloneString(db);
+self->trackTable = cloneString(table);
+self->table = cloneString(splitTable);
+if (sqlFieldIndex(self->conn, self->table, "bin") == 0)
+    {
+    self->hasBin = 1;
+    self->minFinestBin = binFromRange(0, 1);
+    }
+struct asObject *asObj = asdParseConfig(self, configEl);
+struct annoStreamer *streamer = &(self->streamer);
+int dbtLen = strlen(db) + strlen(table) + 2;
+char streamerName[dbtLen];
+safef(streamerName, sizeof(streamerName), "%s.%s", db, table);
+annoStreamerInit(streamer, aa, asObj, streamerName);
+streamer->rowType = arWords;
+streamer->setRegion = asdSetRegion;
+streamer->nextRow = asdNextRow;
+streamer->close = asdClose;
+char *asFirstColumnName = streamer->asObj->columnList->name;
+if (self->hasBin && !sameString(asFirstColumnName, "bin"))
+    self->omitBin = 1;
+if (!asdInitBed3Fields(self))
+    errAbort("annoStreamDbNew: can't figure out which fields of %s.%s to use as "
+             "{chrom, chromStart, chromEnd}.", db, self->table);
+// When a table has an index on endField (not startField), sometimes the query optimizer uses it
+// and that ruins the sorting.  Fortunately most tables don't anymore.
+self->endFieldIndexName = sqlTableIndexOnFieldANotB(self->conn, self->table, self->endField,
+                                                    self->startField);
+self->notSorted = FALSE;
+// Special case: genbank-updated tables are not sorted because new mappings are
+// tacked on at the end.  Max didn't sort the pubs* tables but I hope he will
+// sort the tables for any future tracks. :)
+if (isIncrementallyUpdated(table) || isPubsTable(table))
+    self->notSorted = TRUE;
+self->mergeBins = FALSE;
+self->maxOutRows = maxOutRows;
+self->useMaxOutRows = (maxOutRows > 0);
+self->needQuery = TRUE;
+self->chromList = annoAssemblySeqNames(aa);
+if (slCount(self->chromList) > 1000)
+    {
+    // Assembly has many sequences (e.g. scaffold-based assembly) --
+    // don't break up into per-sequence queries.  Take our chances
+    // with mysql being unhappy about the sqlResult being open too long.
+    self->doQuery = asdDoQuerySimple;
+    self->nextRowRaw = nextRowFromSqlResult;
+    }
+else
+    {
+    // All-chromosome assembly -- if table is large, perform a series of
+    // chunked queries.
+    self->doQuery = asdDoQueryChunking;
+    self->nextRowRaw = nextRowFromBuffer;
+    }
+asdInitBaselineQuery(self);
+asdUpdateBaselineQuery(self);
+struct annoStreamer *sSelf = (struct annoStreamer *)self;
+if (asdDebug)
+    sSelf->getHeader = asdGetHeader;
+return sSelf;
+}
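+
+/* A minimal usage sketch (illustrative; assumes hg19's knownGene table and
+ * hAnnoGetAssembly() from hAnno.h; error handling and filters omitted):
+ *     struct annoAssembly *aa = hAnnoGetAssembly("hg19");
+ *     struct annoStreamer *ss = annoStreamDbNew("hg19", "knownGene", aa, 0, NULL);
+ *     ss->setRegion(ss, "chr1", 11873, 14409);
+ *     struct lm *lm = lmInit(0);
+ *     struct annoRow *row;
+ *     while ((row = ss->nextRow(ss, NULL, 0, lm)) != NULL)
+ *         printf("%s\t%u\t%u\n", row->chrom, row->start, row->end);
+ *     lmCleanup(&lm);
+ *     ss->close(&ss);
+ */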