0f1ba1ee0636b352d6dca8966962d90a120864ed angie Fri Jun 14 14:12:23 2013 -0700 More bugfixing after testing with artificially low ASD_CHUNK_SIZE. diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c index 5dfebb4..7bfca75 100644 --- src/hg/lib/annoStreamDb.c +++ src/hg/lib/annoStreamDb.c @@ -178,35 +178,36 @@ return cloneRow; } static void updateNextChunkState(struct annoStreamDb *self, int queryMaxItems) /* If the just-fetched interval list was limited to ASD_CHUNK_SIZE, set doNextChunk * and trim the last row(s) so that when we query the next chunk, we don't get * repeat rows due to querying a start coord that was already returned. */ { struct rowBuf *rowBuf = &self->rowBuf; if (queryMaxItems == ASD_CHUNK_SIZE && rowBuf->size == ASD_CHUNK_SIZE) { self->doNextChunk = TRUE; // Starting at the last row in rowBuf, work back to find a value with a different start. int ix = rowBuf->size - 1; char **words = rowBuf->buf[ix]; - uint lastStart = atoll(words[self->startIx]); + int startIx = self->startIx + self->omitBin; + uint lastStart = atoll(words[startIx]); for (ix = rowBuf->size - 2; ix >= 0; ix--) { words = rowBuf->buf[ix]; - uint thisStart = atoll(words[self->startIx]); + uint thisStart = atoll(words[startIx]); if (thisStart != lastStart) { rowBuf->size = ix+1; self->nextChunkStart = lastStart; break; } } } else self->doNextChunk = FALSE; self->needQuery = FALSE; } static void bufferRowsFromSqlQuery(struct annoStreamDb *self, char *query, int queryMaxItems) /* Store all rows from query in rowBuf. */ @@ -243,54 +244,61 @@ { self->maxOutRows -= self->rowBuf.size; if (self->maxOutRows <= 0) self->eof = TRUE; } if (self->eof) return; int queryMaxItems = ASD_CHUNK_SIZE; if (self->useMaxOutRows && self->maxOutRows < queryMaxItems) queryMaxItems = self->maxOutRows; if (self->hasBin) { // Results will be in bin order, but we can restore chromStart order by // accumulating initial coarse-bin items and merge-sorting them with // subsequent finest-bin items which will be in chromStart order. - resetMergeState(self); + if (self->doNextChunk && self->mergeBins && !self->gotFinestBin) + errAbort("annoStreamDb can't continue merge in chunking query; increase ASD_CHUNK_SIZE"); self->mergeBins = TRUE; + if (self->qLm == NULL) self->qLm = lmInit(0); } if (self->endFieldIndexName != NULL) // Don't let mysql use a (chrom, chromEnd) index because that messes up // sorting by chromStart. dyStringPrintf(query, "IGNORE INDEX (%s) ", self->endFieldIndexName); if (sSelf->chrom != NULL) { uint start = sSelf->regionStart; if (minChrom) { if (differentString(minChrom, sSelf->chrom)) errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'", sSelf->name, minChrom, sSelf->chrom); if (start < minEnd) start = minEnd; } if (self->doNextChunk && start < self->nextChunkStart) start = self->nextChunkStart; dyStringPrintf(query, "where %s = '%s' and ", self->chromField, sSelf->chrom); if (self->hasBin) + { + if (self->doNextChunk && self->gotFinestBin) + // It would be way more elegant to make a hAddBinTopLevelOnly but this will do: + dyStringPrintf(query, "bin > %d and ", self->minFinestBin); hAddBinToQuery(start, sSelf->regionEnd, query); + } if (self->doNextChunk) dyStringPrintf(query, "%s >= %u and ", self->startField, self->nextChunkStart); dyStringPrintf(query, "%s < %u and %s > %u limit %d", self->startField, sSelf->regionEnd, self->endField, start, queryMaxItems); bufferRowsFromSqlQuery(self, query->string, queryMaxItems); } else { // Genome-wide query: break it into chrom-by-chrom queries. if (self->queryChrom == NULL) self->queryChrom = self->chromList; else if (!self->doNextChunk) self->queryChrom = self->queryChrom->next; if (minChrom != NULL) { @@ -305,31 +313,36 @@ self->eof = TRUE; else { char *chrom = self->queryChrom->name; int start = 0; if (minChrom != NULL && sameString(chrom, minChrom)) start = minEnd; if (self->doNextChunk && start < self->nextChunkStart) start = self->nextChunkStart; uint end = annoAssemblySeqSize(self->streamer.assembly, self->queryChrom->name); dyStringPrintf(query, "where %s = '%s' ", self->chromField, chrom); if (start > 0 || self->doNextChunk) { dyStringAppend(query, "and "); if (self->hasBin) + { + if (self->doNextChunk && self->gotFinestBin) + // It would be way more elegant to make a hAddBinTopLevelOnly but this will do: + dyStringPrintf(query, "bin > %d and ", self->minFinestBin); hAddBinToQuery(start, end, query); + } if (self->doNextChunk) dyStringPrintf(query, "%s >= %u and ", self->startField, self->nextChunkStart); // region end is chromSize, so no need to constrain startField here: dyStringPrintf(query, "%s > %u ", self->endField, start); } dyStringPrintf(query, "limit %d", queryMaxItems); bufferRowsFromSqlQuery(self, query->string, queryMaxItems); // If there happens to be no items on chrom, try again with the next chrom: if (! self->eof && self->rowBuf.size == 0) asdDoQueryChunking(self, minChrom, minEnd); } } dyStringFree(&query); } @@ -410,31 +423,31 @@ self->gotFinestBin = TRUE; slReverse(&(self->bigItemQueue)); slSort(&(self->bigItemQueue), annoRowCmp); return row; } static struct annoRow *mergeRow(struct annoStreamDb *self, struct annoRow *aRow, struct lm *callerLm) /* Compare head of bigItemQueue with (finest-bin) aRow; return the one with * lower chromStart and save the other for later. */ { struct annoRow *outRow = aRow; if (self->bigItemQueue == NULL) { // No coarse-bin items to merge-sort, just stream finest-bin items from here on out. - self->mergeBins = FALSE; + resetMergeState(self); } else if (annoRowCmp(&(self->bigItemQueue), &aRow) < 0) { // Big item gets to go now, so save aside small item for next time. outRow = slPopHead(&(self->bigItemQueue)); slAddHead(&(self->smallItemQueue), aRow); } enum annoRowType rowType = self->streamer.rowType; int numCols = self->streamer.numCols; return annoRowClone(outRow, rowType, numCols, callerLm); } static struct annoRow *nextQueuedRow(struct annoStreamDb *self, struct lm *callerLm) // Return the head of either bigItemQueue or smallItemQueue, depending on which has // the lower chromStart.