70668f5dd4d6bbcf079a1025649e17b9885335c7 angie Mon May 13 10:17:53 2013 -0700 Added 2 new args to annoStreamer:nextRow: minChrom and minEnd (which could also be called regionStart depending on your point of view). Streamers may use those hints to skip over data that precedes minChrom and minEnd, to avoid the overhead of creating annoRows that annoGrators will then have to skip over. When primary data are sparse and grator data are very dense, this saves significant memory and user-cycles. Unfortunately, mysql can still be the bottleneck for elapsed time. Room for improvement in annoStreamDb: when an assembly has a reasonably small number of sequences (<1000), genome-wide queries could be internally broken down into per-seq queries; that would let us skip over chroms that precede minChrom. refs #6152 diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c index 1d5e342..54752c7 100644 --- src/hg/lib/annoStreamDb.c +++ src/hg/lib/annoStreamDb.c @@ -28,38 +28,41 @@ struct annoRow *smallItemQueue; // Max 1 item for merge-sorting with bigItemQueue struct lm *qLm; // localmem for merge-sorting queues int minFinestBin; // Smallest bin number for finest bin level boolean gotFinestBin; // Flag that it's time to merge-sort with bigItemQueue }; static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd) /* Set region -- and free current sqlResult if there is one. */ { annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd); struct annoStreamDb *self = (struct annoStreamDb *)vSelf; if (self->sr != NULL) sqlFreeResult(&(self->sr)); } -static void asdDoQuery(struct annoStreamDb *self) +static void asdDoQuery(struct annoStreamDb *self, char *minChrom, uint minEnd) /* Return a sqlResult for a query on table items in position range. */ // NOTE: it would be possible to implement filters at this level, as in hgTables.
{ struct annoStreamer *streamer = &(self->streamer); struct dyString *query = dyStringCreate("select * from %s", self->table); if (!streamer->positionIsGenome) { + if (minChrom && differentString(minChrom, streamer->chrom)) + errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'", + streamer->name, minChrom, streamer->chrom); if (self->hasBin) { // Results will be in bin order, but we can restore chromStart order by // accumulating initial coarse-bin items and merge-sorting them with // subsequent finest-bin items which will be in chromStart order. self->mergeBins = TRUE; self->bigItemQueue = self->smallItemQueue = NULL; lmCleanup(&(self->qLm)); self->qLm = lmInit(0); self->gotFinestBin = FALSE; } if (self->endFieldIndexName != NULL) // Don't let mysql use a (chrom, chromEnd) index because that messes up // sorting by chromStart. dyStringPrintf(query, " IGNORE INDEX (%s)", self->endFieldIndexName); @@ -71,72 +74,85 @@ if (self->hasBin) hAddBinToQuery(streamer->regionStart, streamer->regionEnd, query); dyStringPrintf(query, "%s < %u and %s > %u", self->startField, streamer->regionEnd, self->endField, streamer->regionStart); } if (self->notSorted) dyStringPrintf(query, " order by %s", self->startField); } else if (self->notSorted) dyStringPrintf(query, " order by %s,%s", self->chromField, self->startField); struct sqlResult *sr = sqlGetResult(self->conn, query->string); dyStringFree(&query); self->sr = sr; } -static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail) +static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail, + char *minChrom, uint minEnd) /* Skip past any left-join failures until we get a right-join failure, a passing row, * or end of data. Return row or NULL, and return right-join fail status via retRightFail. 
*/ { int numCols = self->streamer.numCols; char **row = sqlNextRow(self->sr); +if (minChrom != NULL && row != NULL) + { + // Ignore rows that fall completely before (minChrom, minEnd) - save annoGrator's time + int chromIx = self->omitBin+self->chromIx; + int endIx = self->omitBin+self->endIx; + int chromCmp; + while (row && + ((chromCmp = strcmp(row[chromIx], minChrom)) < 0 || // this chrom precedes minChrom + (chromCmp == 0 && atoll(row[endIx]) < minEnd))) // on minChrom, but before minEnd + row = sqlNextRow(self->sr); + } boolean rightFail = FALSE; struct annoFilter *filterList = self->streamer.filters; while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail)) { if (rightFail) break; row = sqlNextRow(self->sr); } *retRightFail = rightFail; return row; } static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail, struct lm *lm) /* Extract coords from row and return an annoRow including right-fail status. */ { row += self->omitBin; char *chrom = row[self->chromIx]; uint chromStart = sqlUnsigned(row[self->startIx]); uint chromEnd = sqlUnsigned(row[self->endIx]); return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row, self->streamer.numCols, lm); } -static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail) +static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail, + char *minChrom, uint minEnd) /* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and * add any subsequent coarse-bin items to bigItemQueue. As soon as we get an item from a * finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. 
*/ { int bin = atoi(row[0]); while (bin < self->minFinestBin) { // big item -- store aside in queue for merging later, move on to next item slAddHead(&(self->bigItemQueue), rowToAnnoRow(self, row, *pRightFail, self->qLm)); *pRightFail = FALSE; - row = nextRowFiltered(self, pRightFail); + row = nextRowFiltered(self, pRightFail, minChrom, minEnd); if (row == NULL) break; bin = atoi(row[0]); } // First finest-bin item! Sort bigItemQueue in preparation for merging: self->gotFinestBin = TRUE; slReverse(&(self->bigItemQueue)); slSort(&(self->bigItemQueue), annoRowCmp); return row; } static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow, struct lm *callerLm) /* Compare head of bigItemQueue with (finest-bin) aRow; return the one with * lower chromStart and save the other for later. */ @@ -163,80 +179,82 @@ // the lower chromStart. { struct annoRow *row = NULL; if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0) row = slPopHead(&(self->bigItemQueue)); else row = slPopHead(&(self->smallItemQueue)); if (self->bigItemQueue == NULL && self->smallItemQueue == NULL) // All done merge-sorting, just stream finest-bin items from here on out. self->mergeBins = FALSE; enum annoRowType rowType = self->streamer.rowType; int numCols = self->streamer.numCols; return annoRowClone(row, rowType, numCols, callerLm); } -static struct annoRow *nextRowMergeBins(struct annoStreamDb *self, struct lm *callerLm) +static struct annoRow *nextRowMergeBins(struct annoStreamDb *self, char *minChrom, uint minEnd, + struct lm *callerLm) /* Fetch the next filtered row from mysql, merge-sorting coarse-bin items into finest-bin * items to maintain chromStart ordering. */ { assert(self->mergeBins && self->hasBin); if (self->smallItemQueue) // In this case we have already begun merge-sorting; don't pull a new row from mysql, // use the queues. This should keep smallItemQueue's max depth at 1. 
return nextQueuedRow(self, callerLm); else { // We might need to collect initial coarse-bin items, or might already be merge-sorting. boolean rightFail = FALSE; - char **row = nextRowFiltered(self, &rightFail); + char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd); if (row && !self->gotFinestBin) { // We are just starting -- queue up coarse-bin items, if any, until we get the first // finest-bin item. - row = getFinestBinItem(self, row, &rightFail); + row = getFinestBinItem(self, row, &rightFail, minChrom, minEnd); } // Time to merge-sort finest-bin items from mysql with coarse-bin items from queue. if (row != NULL) { struct annoRow *aRow = rowToAnnoRow(self, row, rightFail, self->qLm); return mergeRow(self, aRow, callerLm); } else { struct annoRow *qRow = slPopHead(&(self->bigItemQueue)); enum annoRowType rowType = self->streamer.rowType; int numCols = self->streamer.numCols; return annoRowClone(qRow, rowType, numCols, callerLm); } } } -static struct annoRow *asdNextRow(struct annoStreamer *vSelf, struct lm *callerLm) +static struct annoRow *asdNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd, + struct lm *callerLm) /* Perform sql query if we haven't already and return a single * annoRow, or NULL if there are no more items. */ { struct annoStreamDb *self = (struct annoStreamDb *)vSelf; if (self->sr == NULL) - asdDoQuery(self); + asdDoQuery(self, minChrom, minEnd); if (self->sr == NULL) // This is necessary only if we're using sqlStoreResult in asdDoQuery, harmless otherwise: return NULL; if (self->mergeBins) - return nextRowMergeBins(self, callerLm); + return nextRowMergeBins(self, minChrom, minEnd, callerLm); boolean rightFail = FALSE; -char **row = nextRowFiltered(self, &rightFail); +char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd); if (row == NULL) return NULL; return rowToAnnoRow(self, row, rightFail, callerLm); } static void asdClose(struct annoStreamer **pVSelf) /* Close db connection and free self. 
*/ { if (pVSelf == NULL) return; struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf; lmCleanup(&(self->qLm)); freeMem(self->table); sqlFreeResult(&(self->sr)); hFreeConn(&(self->conn));