1c6cebb2113b22e71b73fb51d43bc31a06c90046 angie Mon Aug 31 12:47:59 2015 -0700 Discard large-bin items that fall on or after nextChunkStart because they will break the (chrom, chromStart) sorting order if they are also to the right of small-bin items on or after nextChunkStart -- and they will appear again in the next chunk of rows. Also a couple other fixes for problems found while debugging: - there was a spot where I forgot to restart merging after calling resetMergeState (when moving on to the next chrom in a genome-wide query) - in mergeRow, don't resetMergeState until both big and small queues are empty refs #15953 diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c index 7318f18..3e55037 100644 --- src/hg/lib/annoStreamDb.c +++ src/hg/lib/annoStreamDb.c @@ -92,30 +92,38 @@ rowBuf->size = 0; rowBuf->ix = 0; lmCleanup(&(rowBuf->lm)); } static void resetChunkState(struct annoStreamDb *self) /* Reset members that track chunked queries. */ { self->queryChrom = NULL; self->eof = FALSE; self->doNextChunk = FALSE; self->needQuery = TRUE; resetRowBuf(&self->rowBuf); } +static void startMerging(struct annoStreamDb *self) +/* Set self->mergeBins flag and create self->qLm if necessary. */ +{ +self->mergeBins = TRUE; +if (self->qLm == NULL) + self->qLm = lmInit(0); +} + static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd) /* Set region -- and free current sqlResult if there is one. */ { annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd); struct annoStreamDb *self = (struct annoStreamDb *)vSelf; sqlFreeResult(&(self->sr)); resetMergeState(self); resetChunkState(self); } static char **nextRowFromSqlResult(struct annoStreamDb *self) /* Stream rows directly from self->sr. */ { return sqlNextRow(self->sr); } @@ -137,32 +145,31 @@ { struct annoStreamer *streamer = &(self->streamer); boolean hasWhere = FALSE; struct dyString *query = self->makeBaselineQuery(self, &hasWhere); if (!streamer->positionIsGenome) { if (minChrom && differentString(minChrom, streamer->chrom)) errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'", streamer->name, minChrom, streamer->chrom); if (self->hasBin) { // Results will be in bin order, but we can restore chromStart order by // accumulating initial coarse-bin items and merge-sorting them with // subsequent finest-bin items which will be in chromStart order. resetMergeState(self); - self->mergeBins = TRUE; - self->qLm = lmInit(0); + startMerging(self); } if (self->endFieldIndexName != NULL) // Don't let mysql use a (chrom, chromEnd) index because that messes up // sorting by chromStart. sqlDyStringPrintf(query, " IGNORE INDEX (%s)", self->endFieldIndexName); sqlDyStringAppend(query, hasWhere ? " and " : " where "); sqlDyStringPrintf(query, "%s='%s'", self->chromField, streamer->chrom); int chromSize = annoAssemblySeqSize(streamer->assembly, streamer->chrom); if (streamer->regionStart != 0 || streamer->regionEnd != chromSize) { dyStringAppend(query, " and "); if (self->hasBin) hAddBinToQuery(streamer->regionStart, streamer->regionEnd, query); sqlDyStringPrintf(query, "%s < %u and %s > %u", self->startField, streamer->regionEnd, self->endField, streamer->regionStart); @@ -273,33 +280,31 @@ self->eof = TRUE; } if (self->eof) return; int queryMaxItems = ASD_CHUNK_SIZE; if (self->useMaxOutRows && self->maxOutRows < queryMaxItems) queryMaxItems = self->maxOutRows; if (self->hasBin) { // Results will be in bin order, but we can restore chromStart order by // accumulating initial coarse-bin items and merge-sorting them with // subsequent finest-bin items which will be in chromStart order. if (self->doNextChunk && self->mergeBins && !self->gotFinestBin) errAbort("annoStreamDb %s: can't continue merge in chunking query; " "increase ASD_CHUNK_SIZE", sSelf->name); - self->mergeBins = TRUE; - if (self->qLm == NULL) - self->qLm = lmInit(0); + startMerging(self); } if (self->endFieldIndexName != NULL) // Don't let mysql use a (chrom, chromEnd) index because that messes up // sorting by chromStart. sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName); if (sSelf->chrom != NULL) { uint start = sSelf->regionStart; if (minChrom) { if (differentString(minChrom, sSelf->chrom)) errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'", sSelf->name, minChrom, sSelf->chrom); if (start < minEnd) start = minEnd; @@ -322,46 +327,48 @@ if (self->notSorted) sqlDyStringPrintf(query, "order by %s ", self->startField); sqlDyStringPrintf(query, "limit %d", queryMaxItems); bufferRowsFromSqlQuery(self, query->string, queryMaxItems); if (self->rowBuf.size == 0) self->eof = TRUE; } else { // Genome-wide query: break it into chrom-by-chrom queries. if (self->queryChrom == NULL) self->queryChrom = self->chromList; else if (!self->doNextChunk) { self->queryChrom = self->queryChrom->next; + if (self->hasBin) + { resetMergeState(self); + startMerging(self); + } } if (minChrom != NULL) { // Skip chroms that precede minChrom while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0) { self->queryChrom = self->queryChrom->next; self->doNextChunk = FALSE; - resetMergeState(self); } if (self->hasBin) { - self->mergeBins = TRUE; - if (self->qLm == NULL) - self->qLm = lmInit(0); + resetMergeState(self); + startMerging(self); } } if (self->queryChrom == NULL) self->eof = TRUE; else { char *chrom = self->queryChrom->name; int start = 0; if (minChrom != NULL && sameString(chrom, minChrom)) start = minEnd; if (self->doNextChunk && start < self->nextChunkStart) start = self->nextChunkStart; uint end = annoAssemblySeqSize(self->streamer.assembly, self->queryChrom->name); sqlDyStringAppend(query, hasWhere ? " and " : " where "); sqlDyStringPrintf(query, "%s = '%s' ", self->chromField, chrom); @@ -462,65 +469,71 @@ uint chromStart = sqlUnsigned(row[self->startIx]); uint chromEnd = sqlUnsigned(row[self->endIx]); return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row, self->streamer.numCols, lm); } static char **getFinestBinItem(struct annoStreamDb *self, char **row, boolean *pRightFail, char *minChrom, uint minEnd) /* If row is a coarse-bin item, add it to bigItemQueue, get the next row(s) and * add any subsequent coarse-bin items to bigItemQueue. As soon as we get an item from a * finest-level bin (or NULL), sort the bigItemQueue and return the finest-bin item/row. */ { int bin = atoi(row[0]); while (bin < self->minFinestBin) { - // big item -- store aside in queue for merging later, move on to next item - slAddHead(&(self->bigItemQueue), rowToAnnoRow(self, row, *pRightFail, self->qLm)); + // big item -- store aside in queue for merging later (unless it falls off the end of + // the current chunk), move on to next item + struct annoRow *aRow = rowToAnnoRow(self, row, *pRightFail, self->qLm); + if (! (self->doNextChunk && self->nextChunkStart <= aRow->start)) + slAddHead(&(self->bigItemQueue), aRow); *pRightFail = FALSE; row = nextRowFiltered(self, pRightFail, minChrom, minEnd); if (row == NULL) break; bin = atoi(row[0]); } // First finest-bin item! Sort bigItemQueue in preparation for merging: self->gotFinestBin = TRUE; slReverse(&(self->bigItemQueue)); slSort(&(self->bigItemQueue), annoRowCmp); return row; } static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow, struct lm *callerLm) /* Compare head of bigItemQueue with (finest-bin) aRow; return the one with * lower chromStart and save the other for later. */ { struct annoRow *outRow = aRow; -if (self->bigItemQueue == NULL) - { - // No coarse-bin items to merge-sort, just stream finest-bin items from here on out. - resetMergeState(self); - } -else if (annoRowCmp(&(self->bigItemQueue), &aRow) < 0) +if (self->bigItemQueue != NULL && annoRowCmp(&(self->bigItemQueue), &aRow) < 0) { // Big item gets to go now, so save aside small item for next time. outRow = slPopHead(&(self->bigItemQueue)); slAddHead(&(self->smallItemQueue), aRow); } +// Clone outRow using callerLm enum annoRowType rowType = self->streamer.rowType; int numCols = self->streamer.numCols; -return annoRowClone(outRow, rowType, numCols, callerLm); +outRow = annoRowClone(outRow, rowType, numCols, callerLm); +if (self->bigItemQueue == NULL && self->smallItemQueue == NULL) + { + // No coarse-bin items to merge-sort, just stream finest-bin items from here on out. + // This needs to be done after cloning outRow because it was allocated in self->qLm. + resetMergeState(self); + } +return outRow; } static struct annoRow *nextQueuedRow(struct annoStreamDb *self, struct lm *callerLm) // Return the head of either bigItemQueue or smallItemQueue, depending on which has // the lower chromStart. { struct annoRow *row = NULL; if (self->bigItemQueue && annoRowCmp(&(self->bigItemQueue), &(self->smallItemQueue)) < 0) row = slPopHead(&(self->bigItemQueue)); else row = slPopHead(&(self->smallItemQueue)); if (self->bigItemQueue == NULL && self->smallItemQueue == NULL) // All done merge-sorting, just stream finest-bin items from here on out. self->mergeBins = FALSE; enum annoRowType rowType = self->streamer.rowType;