1c6cebb2113b22e71b73fb51d43bc31a06c90046
angie
Mon Aug 31 12:47:59 2015 -0700
Discard large-bin items that fall on or after nextChunkStart because they
will break the (chrom, chromStart) sorting order if they are also to the
right of small-bin items on or after nextChunkStart -- and they will appear
again in the next chunk of rows.
Also a couple other fixes for problems found while debugging:
- there was a spot where I forgot to restart merging after calling
resetMergeState (when moving on to the next chrom in a genome-wide query)
- in mergeRow, don't resetMergeState until both big and small queues are empty
refs #15953
diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c
index 7318f18..3e55037 100644
--- src/hg/lib/annoStreamDb.c
+++ src/hg/lib/annoStreamDb.c
@@ -1,718 +1,731 @@
/* annoStreamDb -- subclass of annoStreamer for database tables */
/* Copyright (C) 2014 The Regents of the University of California
* See README in this or parent directory for licensing information. */
#include "annoStreamDb.h"
#include "annoGratorQuery.h"
#include "binRange.h"
#include "hdb.h"
#include "sqlNum.h"
struct annoStreamDb
{
struct annoStreamer streamer; // Parent class members & methods
// Private members
struct sqlConnection *conn; // Database connection (e.g. hg19 or customTrash)
struct sqlResult *sr; // SQL query result from which we grab rows
char *table; // Table name, must exist in database
struct dyString *(*makeBaselineQuery)(struct annoStreamDb *self, boolean *retHasWhere);
/* Provide baseline query, by default just 'select * from
'.
* Override this to make a query with specific fields, joins etc.
* If the returned query includes a join/where, set *retHasWhere to TRUE. */
// These members enable us to extract coords from the otherwise unknown row:
char *chromField; // Name of chrom-ish column in table
char *startField; // Name of chromStart-ish column in table
char *endField; // Name of chromEnd-ish column in table
int chromIx; // Index of chrom-ish col in autoSql or bin-less table
int startIx; // Index of chromStart-ish col in autoSql or bin-less table
int endIx; // Index of chromEnd-ish col in autoSql or bin-less table
// These members enable us to produce {chrom, start}-sorted output:
char *endFieldIndexName; // SQL index on end field, if any (can mess up sorting)
boolean notSorted; // TRUE if table is not sorted (e.g. genbank-updated)
boolean hasBin; // 1 if SQL table's first column is bin
boolean omitBin; // 1 if table hasBin and autoSql doesn't have bin
boolean mergeBins; // TRUE if query results will be in bin order
struct annoRow *bigItemQueue; // If mergeBins, accumulate coarse-bin items here
struct annoRow *smallItemQueue; // Max 1 item for merge-sorting with bigItemQueue
struct lm *qLm; // localmem for merge-sorting queues
int minFinestBin; // Smallest bin number for finest bin level
boolean gotFinestBin; // Flag that it's time to merge-sort with bigItemQueue
// Limit (or not) number of rows processed:
boolean useMaxOutRows; // TRUE if maxOutRows passed to annoStreamDbNew is > 0
int maxOutRows; // Maximum number of rows we can output.
// Process large tables in manageable chunks:
struct slName *chromList; // list of chromosomes/sequences in assembly
struct slName *queryChrom; // most recently queried chrom for whole-genome (or NULL)
boolean eof; // TRUE when we are done (maxItems or no more items)
boolean needQuery; // TRUE when we haven't yet queried, or need to query again
boolean doNextChunk; // TRUE if rowBuf ends before end of chrom/region
uint nextChunkStart; // Start coord for next chunk of rows to query
struct rowBuf
// Temporary storage for rows from chunked query
{
struct lm *lm; // storage for rows
char ***buf; // array of pointers to rows
int size; // number of rows
int ix; // offset in buffer, [0..size]
} rowBuf;
char **(*nextRowRaw)(struct annoStreamDb *self);
// Depending on query style, use either sqlNextRow or temporary row storage to get next row.
// This may return NULL but set self->needQuery; asdNextRow watches for that.
void (*doQuery)(struct annoStreamDb *self, char *minChrom, uint minEnd);
// Depending on query style, perform either a single query or (series of) chunked query
};
// For performance reasons, even if !useMaxItems (no limit), we need to limit the
// number of rows that are returned from a query, so we can slurp them into memory and
// close the sqlResult before mysql gets unhappy about the result being open so long.
#define ASD_CHUNK_SIZE 100000
static void resetMergeState(struct annoStreamDb *self)
/* Reset or initialize members that track merging of coarse-bin items with fine-bin items. */
{
self->mergeBins = FALSE;
self->bigItemQueue = self->smallItemQueue = NULL;
lmCleanup(&(self->qLm));
self->gotFinestBin = FALSE;
}
static void resetRowBuf(struct rowBuf *rowBuf)
/* Reset temporary storage for chunked query rows. */
{
rowBuf->buf = NULL;
rowBuf->size = 0;
rowBuf->ix = 0;
lmCleanup(&(rowBuf->lm));
}
static void resetChunkState(struct annoStreamDb *self)
/* Reset members that track chunked queries. */
{
self->queryChrom = NULL;
self->eof = FALSE;
self->doNextChunk = FALSE;
self->needQuery = TRUE;
resetRowBuf(&self->rowBuf);
}
+static void startMerging(struct annoStreamDb *self)
+/* Set self->mergeBins flag and create self->qLm if necessary. */
+{
+self->mergeBins = TRUE;
+if (self->qLm == NULL)
+ self->qLm = lmInit(0);
+}
+
static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
/* Set region -- and free current sqlResult if there is one. */
{
annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
sqlFreeResult(&(self->sr));
resetMergeState(self);
resetChunkState(self);
}
static char **nextRowFromSqlResult(struct annoStreamDb *self)
/* Stream rows directly from self->sr. */
{
return sqlNextRow(self->sr);
}
static struct dyString *asdMakeBaselineQuery(struct annoStreamDb *self, boolean *retHasWhere)
/* Return a baseline query, i.e. "select * from ". This is the default implementation
* of annoStreamDb.makeBaselineQuery. */
{
if (retHasWhere)
*retHasWhere = FALSE;
return sqlDyStringCreate("select * from %s ", self->table);
}
static void asdDoQuerySimple(struct annoStreamDb *self, char *minChrom, uint minEnd)
/* Return a sqlResult for a query on table items in position range.
* If doing a whole genome query. just select all rows from table. */
// NOTE: it would be possible to implement filters at this level, as in hgTables.
{
struct annoStreamer *streamer = &(self->streamer);
boolean hasWhere = FALSE;
struct dyString *query = self->makeBaselineQuery(self, &hasWhere);
if (!streamer->positionIsGenome)
{
if (minChrom && differentString(minChrom, streamer->chrom))
errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
streamer->name, minChrom, streamer->chrom);
if (self->hasBin)
{
// Results will be in bin order, but we can restore chromStart order by
// accumulating initial coarse-bin items and merge-sorting them with
// subsequent finest-bin items which will be in chromStart order.
resetMergeState(self);
- self->mergeBins = TRUE;
- self->qLm = lmInit(0);
+ startMerging(self);
}
if (self->endFieldIndexName != NULL)
// Don't let mysql use a (chrom, chromEnd) index because that messes up
// sorting by chromStart.
sqlDyStringPrintf(query, " IGNORE INDEX (%s)", self->endFieldIndexName);
sqlDyStringAppend(query, hasWhere ? " and " : " where ");
sqlDyStringPrintf(query, "%s='%s'", self->chromField, streamer->chrom);
int chromSize = annoAssemblySeqSize(streamer->assembly, streamer->chrom);
if (streamer->regionStart != 0 || streamer->regionEnd != chromSize)
{
dyStringAppend(query, " and ");
if (self->hasBin)
hAddBinToQuery(streamer->regionStart, streamer->regionEnd, query);
sqlDyStringPrintf(query, "%s < %u and %s > %u", self->startField, streamer->regionEnd,
self->endField, streamer->regionStart);
}
if (self->notSorted)
sqlDyStringPrintf(query, " order by %s", self->startField);
}
else if (self->notSorted)
sqlDyStringPrintf(query, " order by %s,%s", self->chromField, self->startField);
if (self->maxOutRows > 0)
dyStringPrintf(query, " limit %d", self->maxOutRows);
struct sqlResult *sr = sqlGetResult(self->conn, query->string);
dyStringFree(&query);
self->sr = sr;
self->needQuery = FALSE;
}
static void rowBufInit(struct rowBuf *rowBuf, int size)
/* Clean up rowBuf and give it a new lm and buffer[size]. */
{
resetRowBuf(rowBuf);
rowBuf->lm = lmInit(0);
rowBuf->size = size;
lmAllocArray(rowBuf->lm, rowBuf->buf, size);
}
static char **lmCloneRow(struct lm *lm, char **row, int colCount)
/* Use lm to allocate an array of strings and its contents copied from row. */
{
char **cloneRow = NULL;
lmAllocArray(lm, cloneRow, colCount);
int i;
for (i = 0; i < colCount; i++)
cloneRow[i] = lmCloneString(lm, row[i]);
return cloneRow;
}
static void updateNextChunkState(struct annoStreamDb *self, int queryMaxItems)
/* If the just-fetched interval list was limited to ASD_CHUNK_SIZE, set doNextChunk
* and trim the last row(s) so that when we query the next chunk, we don't get
* repeat rows due to querying a start coord that was already returned. */
{
struct rowBuf *rowBuf = &self->rowBuf;
if (queryMaxItems == ASD_CHUNK_SIZE && rowBuf->size == ASD_CHUNK_SIZE)
{
self->doNextChunk = TRUE;
// Starting at the last row in rowBuf, work back to find a value with a different start.
int ix = rowBuf->size - 1;
char **words = rowBuf->buf[ix];
int startIx = self->startIx + self->omitBin;
uint lastStart = atoll(words[startIx]);
for (ix = rowBuf->size - 2; ix >= 0; ix--)
{
words = rowBuf->buf[ix];
uint thisStart = atoll(words[startIx]);
if (thisStart != lastStart)
{
rowBuf->size = ix+1;
self->nextChunkStart = lastStart;
break;
}
}
}
else
self->doNextChunk = FALSE;
self->needQuery = FALSE;
}
static void bufferRowsFromSqlQuery(struct annoStreamDb *self, char *query, int queryMaxItems)
/* Store all rows from query in rowBuf. */
{
struct sqlResult *sr = sqlGetResult(self->conn, query);
struct rowBuf *rowBuf = &(self->rowBuf);
rowBufInit(rowBuf, ASD_CHUNK_SIZE);
struct annoStreamer *sSelf = &(self->streamer);
char **row = NULL;
int ix = 0;
while ((row = sqlNextRow(sr)) != NULL)
{
if (ix >= rowBuf->size)
errAbort("annoStreamDb %s: rowBuf overflow, got more than %d rows",
sSelf->name, rowBuf->size);
rowBuf->buf[ix++] = lmCloneRow(rowBuf->lm, row, sSelf->numCols+self->omitBin);
}
// Set rowBuf->size to the number of rows we actually stored.
rowBuf->size = ix;
sqlFreeResult(&sr);
updateNextChunkState(self, queryMaxItems);
}
static void asdDoQueryChunking(struct annoStreamDb *self, char *minChrom, uint minEnd)
/* Return a sqlResult for a query on table items in position range.
* If doing a whole genome query, just select all rows from table. */
{
struct annoStreamer *sSelf = &(self->streamer);
boolean hasWhere = FALSE;
struct dyString *query = self->makeBaselineQuery(self, &hasWhere);
if (sSelf->chrom != NULL && self->rowBuf.size > 0 && !self->doNextChunk)
{
// We're doing a region query, we already got some rows, and don't need another chunk:
resetRowBuf(&self->rowBuf);
self->eof = TRUE;
}
if (self->useMaxOutRows)
{
self->maxOutRows -= self->rowBuf.size;
if (self->maxOutRows <= 0)
self->eof = TRUE;
}
if (self->eof)
return;
int queryMaxItems = ASD_CHUNK_SIZE;
if (self->useMaxOutRows && self->maxOutRows < queryMaxItems)
queryMaxItems = self->maxOutRows;
if (self->hasBin)
{
// Results will be in bin order, but we can restore chromStart order by
// accumulating initial coarse-bin items and merge-sorting them with
// subsequent finest-bin items which will be in chromStart order.
if (self->doNextChunk && self->mergeBins && !self->gotFinestBin)
errAbort("annoStreamDb %s: can't continue merge in chunking query; "
"increase ASD_CHUNK_SIZE", sSelf->name);
- self->mergeBins = TRUE;
- if (self->qLm == NULL)
- self->qLm = lmInit(0);
+ startMerging(self);
}
if (self->endFieldIndexName != NULL)
// Don't let mysql use a (chrom, chromEnd) index because that messes up
// sorting by chromStart.
sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName);
if (sSelf->chrom != NULL)
{
uint start = sSelf->regionStart;
if (minChrom)
{
if (differentString(minChrom, sSelf->chrom))
errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
sSelf->name, minChrom, sSelf->chrom);
if (start < minEnd)
start = minEnd;
}
if (self->doNextChunk && start < self->nextChunkStart)
start = self->nextChunkStart;
sqlDyStringAppend(query, hasWhere ? " and " : " where ");
sqlDyStringPrintf(query, "%s = '%s' and ", self->chromField, sSelf->chrom);
if (self->hasBin)
{
if (self->doNextChunk && self->gotFinestBin)
// It would be way more elegant to make a hAddBinTopLevelOnly but this will do:
dyStringPrintf(query, "bin > %d and ", self->minFinestBin);
hAddBinToQuery(start, sSelf->regionEnd, query);
}
if (self->doNextChunk)
sqlDyStringPrintf(query, "%s >= %u and ", self->startField, self->nextChunkStart);
sqlDyStringPrintf(query, "%s < %u and %s > %u ", self->startField, sSelf->regionEnd,
self->endField, start);
if (self->notSorted)
sqlDyStringPrintf(query, "order by %s ", self->startField);
sqlDyStringPrintf(query, "limit %d", queryMaxItems);
bufferRowsFromSqlQuery(self, query->string, queryMaxItems);
if (self->rowBuf.size == 0)
self->eof = TRUE;
}
else
{
// Genome-wide query: break it into chrom-by-chrom queries.
if (self->queryChrom == NULL)
self->queryChrom = self->chromList;
else if (!self->doNextChunk)
{
self->queryChrom = self->queryChrom->next;
+ if (self->hasBin)
+ {
resetMergeState(self);
+ startMerging(self);
+ }
}
if (minChrom != NULL)
{
// Skip chroms that precede minChrom
while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0)
{
self->queryChrom = self->queryChrom->next;
self->doNextChunk = FALSE;
- resetMergeState(self);
}
if (self->hasBin)
{
- self->mergeBins = TRUE;
- if (self->qLm == NULL)
- self->qLm = lmInit(0);
+ resetMergeState(self);
+ startMerging(self);
}
}
if (self->queryChrom == NULL)
self->eof = TRUE;
else
{
char *chrom = self->queryChrom->name;
int start = 0;
if (minChrom != NULL && sameString(chrom, minChrom))
start = minEnd;
if (self->doNextChunk && start < self->nextChunkStart)
start = self->nextChunkStart;
uint end = annoAssemblySeqSize(self->streamer.assembly, self->queryChrom->name);
sqlDyStringAppend(query, hasWhere ? " and " : " where ");
sqlDyStringPrintf(query, "%s = '%s' ", self->chromField, chrom);
if (start > 0 || self->doNextChunk)
{
dyStringAppend(query, "and ");
if (self->hasBin)
{
if (self->doNextChunk && self->gotFinestBin)
// It would be way more elegant to make a hAddBinTopLevelOnly but this will do:
dyStringPrintf(query, "bin > %d and ", self->minFinestBin);
hAddBinToQuery(start, end, query);
}
if (self->doNextChunk)
sqlDyStringPrintf(query, "%s >= %u and ", self->startField, self->nextChunkStart);
// region end is chromSize, so no need to constrain startField here:
sqlDyStringPrintf(query, "%s > %u ", self->endField, start);
}
if (self->notSorted)
sqlDyStringPrintf(query, "order by %s ", self->startField);
dyStringPrintf(query, "limit %d", queryMaxItems);
bufferRowsFromSqlQuery(self, query->string, queryMaxItems);
// If there happens to be no items on chrom, try again with the next chrom:
if (! self->eof && self->rowBuf.size == 0)
asdDoQueryChunking(self, minChrom, minEnd);
}
}
dyStringFree(&query);
}
static char **nextRowFromBuffer(struct annoStreamDb *self)
/* Instead of streaming directly from self->sr, we have buffered up the results
* of a chunked query; return the head of that queue. */
{
struct rowBuf *rowBuf = &self->rowBuf;
if (rowBuf->ix > rowBuf->size)
errAbort("annoStreamDb %s: rowBuf overflow (%d > %d)", self->streamer.name,
rowBuf->ix, rowBuf->size);
if (rowBuf->ix == rowBuf->size)
{
// Last row in buffer -- we'll need another query to get subsequent rows (if any).
// But first, see if we need to update gotFinestBin, since getFinestBin might be
// one of our callers.
if (rowBuf->size > 0)
{
char **lastRow = rowBuf->buf[rowBuf->size-1];
int lastBin = atoi(lastRow[0]);
if (lastBin >= self->minFinestBin)
self->gotFinestBin = TRUE;
}
if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
self->needQuery = TRUE;
// Bounce back out -- asdNextRow or nextRowMergeBins will need to do another query.
return NULL;
}
if (rowBuf->size == 0)
return NULL;
else
return rowBuf->buf[rowBuf->ix++];
}
static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail,
char *minChrom, uint minEnd)
/* Skip past any left-join failures until we get a right-join failure, a passing row,
* or end of data. Return row or NULL, and return right-join fail status via retRightFail. */
{
int numCols = self->streamer.numCols;
char **row = self->nextRowRaw(self);
if (minChrom != NULL && row != NULL)
{
// Ignore rows that fall completely before (minChrom, minEnd) - save annoGrator's time
int chromIx = self->omitBin+self->chromIx;
int endIx = self->omitBin+self->endIx;
int chromCmp;
while (row &&
((chromCmp = strcmp(row[chromIx], minChrom)) < 0 || // this chrom precedes minChrom
(chromCmp == 0 && atoll(row[endIx]) < minEnd))) // on minChrom, but before minEnd
row = self->nextRowRaw(self);
}
boolean rightFail = FALSE;
struct annoFilter *filterList = self->streamer.filters;
while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail))
{
if (rightFail)
break;
row = self->nextRowRaw(self);
}
*retRightFail = rightFail;
return row;
}
static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail,
struct lm *lm)
/* Extract coords from row and return an annoRow including right-fail status. */
{
row += self->omitBin;
char *chrom = row[self->chromIx];
uint chromStart = sqlUnsigned(row[self->startIx]);
uint chromEnd = sqlUnsigned(row[self->endIx]);
return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row,
self->streamer.numCols, lm);
}
static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail,
char *minChrom, uint minEnd)
/* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and
* add any subsequent coarse-bin items to bigItemQueue. As soon as we get an item from a
* finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. */
{
int bin = atoi(row[0]);
while (bin < self->minFinestBin)
{
- // big item -- store aside in queue for merging later, move on to next item
- slAddHead(&(self->bigItemQueue), rowToAnnoRow(self, row, *pRightFail, self->qLm));
+ // big item -- store aside in queue for merging later (unless it falls off the end of
+ // the current chunk), move on to next item
+ struct annoRow *aRow = rowToAnnoRow(self, row, *pRightFail, self->qLm);
+ if (! (self->doNextChunk && self->nextChunkStart <= aRow->start))
+ slAddHead(&(self->bigItemQueue), aRow);
*pRightFail = FALSE;
row = nextRowFiltered(self, pRightFail, minChrom, minEnd);
if (row == NULL)
break;
bin = atoi(row[0]);
}
// First finest-bin item! Sort bigItemQueue in preparation for merging:
self->gotFinestBin = TRUE;
slReverse(&(self->bigItemQueue));
slSort(&(self->bigItemQueue), annoRowCmp);
return row;
}
static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow,
struct lm *callerLm)
/* Compare head of bigItemQueue with (finest-bin) aRow; return the one with
* lower chromStart and save the other for later. */
{
struct annoRow *outRow = aRow;
-if (self->bigItemQueue == NULL)
- {
- // No coarse-bin items to merge-sort, just stream finest-bin items from here on out.
- resetMergeState(self);
- }
-else if (annoRowCmp(&(self->bigItemQueue), &aRow) < 0)
+if (self->bigItemQueue != NULL && annoRowCmp(&(self->bigItemQueue), &aRow) < 0)
{
// Big item gets to go now, so save aside small item for next time.
outRow = slPopHead(&(self->bigItemQueue));
slAddHead(&(self->smallItemQueue), aRow);
}
+// Clone outRow using callerLm
enum annoRowType rowType = self->streamer.rowType;
int numCols = self->streamer.numCols;
-return annoRowClone(outRow, rowType, numCols, callerLm);
+outRow = annoRowClone(outRow, rowType, numCols, callerLm);
+if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
+ {
+ // No coarse-bin items to merge-sort, just stream finest-bin items from here on out.
+ // This needs to be done after cloning outRow because it was allocated in self->qLm.
+ resetMergeState(self);
+ }
+return outRow;
}
static struct annoRow *nextQueuedRow(struct annoStreamDb *self, struct lm *callerLm)
// Return the head of either bigItemQueue or smallItemQueue, depending on which has
// the lower chromStart.
{
struct annoRow *row = NULL;
if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0)
row = slPopHead(&(self->bigItemQueue));
else
row = slPopHead(&(self->smallItemQueue));
if (self->bigItemQueue == NULL && self->smallItemQueue == NULL)
// All done merge-sorting, just stream finest-bin items from here on out.
self->mergeBins = FALSE;
enum annoRowType rowType = self->streamer.rowType;
int numCols = self->streamer.numCols;
return annoRowClone(row, rowType, numCols, callerLm);
}
static struct annoRow *nextRowMergeBins(struct annoStreamDb *self, char *minChrom, uint minEnd,
struct lm *callerLm)
/* Fetch the next filtered row from mysql, merge-sorting coarse-bin items into finest-bin
* items to maintain chromStart ordering. */
{
assert(self->mergeBins && self->hasBin);
if (self->smallItemQueue)
// In this case we have already begun merge-sorting; don't pull a new row from mysql,
// use the queues. This should keep smallItemQueue's max depth at 1.
return nextQueuedRow(self, callerLm);
else
{
// We might need to collect initial coarse-bin items, or might already be merge-sorting.
boolean rightFail = FALSE;
char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
if (row && !self->gotFinestBin)
{
// We are just starting -- queue up coarse-bin items, if any, until we get the first
// finest-bin item.
row = getFinestBinItem(self, row, &rightFail, minChrom, minEnd);
}
// Time to merge-sort finest-bin items from mysql with coarse-bin items from queue.
if (row != NULL)
{
struct annoRow *aRow = rowToAnnoRow(self, row, rightFail, self->qLm);
return mergeRow(self, aRow, callerLm);
}
else
{
struct annoRow *qRow = slPopHead(&(self->bigItemQueue));
enum annoRowType rowType = self->streamer.rowType;
int numCols = self->streamer.numCols;
return annoRowClone(qRow, rowType, numCols, callerLm);
}
}
}
static struct annoRow *asdNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd,
struct lm *callerLm)
/* Perform sql query if we haven't already and return a single
* annoRow, or NULL if there are no more items. */
{
struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
if (self->needQuery)
self->doQuery(self, minChrom, minEnd);
if (self->mergeBins)
{
struct annoRow *aRow = nextRowMergeBins(self, minChrom, minEnd, callerLm);
if (aRow == NULL && self->needQuery && !self->eof)
// Recurse: query, then get next merged/filtered row:
return asdNextRow(vSelf, minChrom, minEnd, callerLm);
else
return aRow;
}
boolean rightFail = FALSE;
char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd);
if (row == NULL)
{
if (self->needQuery && !self->eof)
// Recurse: query, then get next merged/filtered row:
return asdNextRow(vSelf, minChrom, minEnd, callerLm);
else
return NULL;
}
return rowToAnnoRow(self, row, rightFail, callerLm);
}
static void asdClose(struct annoStreamer **pVSelf)
/* Close db connection and free self. */
{
if (pVSelf == NULL)
return;
struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf;
lmCleanup(&(self->qLm));
freeMem(self->table);
sqlFreeResult(&(self->sr));
hFreeConn(&(self->conn));
annoStreamerFree(pVSelf);
}
static boolean asdInitBed3Fields(struct annoStreamDb *self)
/* Use autoSql to figure out which table fields correspond to {chrom, chromStart, chromEnd}. */
{
struct annoStreamer *vSelf = &(self->streamer);
return annoStreamerFindBed3Columns(vSelf, &(self->chromIx), &(self->startIx), &(self->endIx),
&(self->chromField), &(self->startField), &(self->endField));
}
char *sqlTableIndexOnField(struct sqlConnection *conn, char *table, char *field)
/* If table has an index that includes field, return the index name, else NULL. */
{
char *indexName = NULL;
char query[512];
sqlSafef(query, sizeof(query), "show index from %s", table);
struct sqlResult *sr = sqlGetResult(conn, query);
char **row;
while ((row = sqlNextRow(sr)) != NULL)
{
if (sameString(row[4], field))
{
indexName = cloneString(row[2]);
break;
}
}
sqlFreeResult(&sr);
return indexName;
}
static boolean isIncrementallyUpdated(char *table)
// Tables that have rows added to them after initial creation are not completely sorted
// because of new rows at end, so we have to 'order by'.
{
return (sameString(table, "refGene") || sameString(table, "refFlat") ||
sameString(table, "xenoRefGene") || sameString(table, "xenoRefFlat") ||
sameString(table, "all_mrna") || sameString(table, "xenoMrna") ||
sameString(table, "all_est") || sameString(table, "xenoEst") ||
sameString(table, "refSeqAli") || sameString(table, "xenoRefSeqAli"));
}
static boolean isPubsTable(char *table)
// Not absolutely every pubs* table is unsorted, but most of them are.
{
return startsWith("pubs", table);
}
struct annoStreamer *annoStreamDbNew(char *db, char *table, struct annoAssembly *aa,
struct asObject *asObj, int maxOutRows)
/* Create an annoStreamer (subclass) object from a database table described by asObj. */
{
struct sqlConnection *conn = hAllocConn(db);
if (!sqlTableExists(conn, table))
errAbort("annoStreamDbNew: table '%s' doesn't exist in database '%s'", table, db);
struct annoStreamDb *self = NULL;
AllocVar(self);
struct annoStreamer *streamer = &(self->streamer);
int dbtLen = strlen(db) + strlen(table) + 2;
char dbTable[dbtLen];
safef(dbTable, dbtLen, "%s.%s", db, table);
annoStreamerInit(streamer, aa, asObj, dbTable);
streamer->rowType = arWords;
streamer->setRegion = asdSetRegion;
streamer->nextRow = asdNextRow;
streamer->close = asdClose;
self->conn = conn;
self->table = cloneString(table);
char *asFirstColumnName = streamer->asObj->columnList->name;
if (sqlFieldIndex(self->conn, self->table, "bin") == 0)
{
self->hasBin = 1;
self->minFinestBin = binFromRange(0, 1);
}
if (self->hasBin && !sameString(asFirstColumnName, "bin"))
self->omitBin = 1;
if (!asdInitBed3Fields(self))
errAbort("annoStreamDbNew: can't figure out which fields of %s.%s to use as "
"{chrom, chromStart, chromEnd}.", db, table);
self->makeBaselineQuery = asdMakeBaselineQuery;
// When a table has an index on endField, sometimes the query optimizer uses it
// and that ruins the sorting. Fortunately most tables don't anymore.
self->endFieldIndexName = sqlTableIndexOnField(self->conn, self->table, self->endField);
self->notSorted = FALSE;
// Special case: genbank-updated tables are not sorted because new mappings are
// tacked on at the end. Max didn't sort the pubs* tables but I hope he will
// sort the tables for any future tracks. :)
if (isIncrementallyUpdated(table) || isPubsTable(table))
self->notSorted = TRUE;
self->mergeBins = FALSE;
self->maxOutRows = maxOutRows;
self->useMaxOutRows = (maxOutRows > 0);
self->needQuery = TRUE;
self->chromList = annoAssemblySeqNames(aa);
if (slCount(self->chromList) > 1000)
{
// Assembly has many sequences (e.g. scaffold-based assembly) --
// don't break up into per-sequence queries. Take our chances
// with mysql being unhappy about the sqlResult being open too long.
self->doQuery = asdDoQuerySimple;
self->nextRowRaw = nextRowFromSqlResult;
}
else
{
// All-chromosome assembly -- if table is large, perform a series of
// chunked queries.
self->doQuery = asdDoQueryChunking;
self->nextRowRaw = nextRowFromBuffer;
}
return (struct annoStreamer *)self;
}