5c3f98617dd779a136cbd685e502e08bd2e7e990 angie Fri Oct 11 15:00:31 2013 -0700 Calling asdDoQueryChunking from way down in nextRowFromBuffer wasn't a good idea because nextRowFromBuffer might have a caller that assumes we're done merging... but after calling asdDoQueryChunking, we need to start over with the merging. So, return NULL, but set self->needQuery; then, way up in asdNextRow, if we get a NULL, then check self->needQuery, and try again if necessary. diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c index 48939de..cfe4a12 100644 --- src/hg/lib/annoStreamDb.c +++ src/hg/lib/annoStreamDb.c @@ -43,32 +43,33 @@ struct slName *queryChrom; // most recently queried chrom for whole-genome (or NULL) boolean eof; // TRUE when we are done (maxItems or no more items) boolean needQuery; // TRUE when we haven't yet queried, or need to query again boolean doNextChunk; // TRUE if rowBuf ends before end of chrom/region uint nextChunkStart; // Start coord for next chunk of rows to query struct rowBuf // Temporary storage for rows from chunked query { struct lm *lm; // storage for rows char ***buf; // array of pointers to rows int size; // number of rows int ix; // offset in buffer, [0..size] } rowBuf; - char **(*nextRowRaw)(struct annoStreamDb *self, char *minChrom, uint minEnd); + char **(*nextRowRaw)(struct annoStreamDb *self); // Depending on query style, use either sqlNextRow or temporary row storage to get next row. + // This may return NULL but set self->needQuery; asdNextRow watches for that. void (*doQuery)(struct annoStreamDb *self, char *minChrom, uint minEnd); // Depending on query style, perform either a single query or (series of) chunked query }; // For performance reasons, even if !useMaxItems (no limit), we need to limit the // number of rows that are returned from a query, so we can slurp them into memory and // close the sqlResult before mysql gets unhappy about the result being open so long. 
#define ASD_CHUNK_SIZE 100000 static void resetMergeState(struct annoStreamDb *self) /* Reset or initialize members that track merging of coarse-bin items with fine-bin items. */ { self->mergeBins = FALSE; self->bigItemQueue = self->smallItemQueue = NULL; @@ -93,31 +94,31 @@ self->doNextChunk = FALSE; self->needQuery = TRUE; resetRowBuf(&self->rowBuf); } static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd) /* Set region -- and free current sqlResult if there is one. */ { annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd); struct annoStreamDb *self = (struct annoStreamDb *)vSelf; sqlFreeResult(&(self->sr)); resetMergeState(self); resetChunkState(self); } -static char **nextRowFromSqlResult(struct annoStreamDb *self, char *minChrom, uint minEnd) +static char **nextRowFromSqlResult(struct annoStreamDb *self) /* Stream rows directly from self->sr. */ { return sqlNextRow(self->sr); } static void asdDoQuerySimple(struct annoStreamDb *self, char *minChrom, uint minEnd) /* Return a sqlResult for a query on table items in position range. * If doing a whole genome query. just 'select * from' table. */ // NOTE: it would be possible to implement filters at this level, as in hgTables. { struct annoStreamer *streamer = &(self->streamer); struct dyString *query = sqlDyStringCreate("select * from %s", self->table); if (!streamer->positionIsGenome) { if (minChrom && differentString(minChrom, streamer->chrom)) @@ -354,83 +355,85 @@ // region end is chromSize, so no need to constrain startField here: sqlDyStringPrintf(query, "%s > %u ", self->endField, start); } if (self->notSorted) sqlDyStringPrintf(query, "order by %s ", self->startField); dyStringPrintf(query, "limit %d", queryMaxItems); bufferRowsFromSqlQuery(self, query->string, queryMaxItems); // If there happens to be no items on chrom, try again with the next chrom: if (! 
self->eof && self->rowBuf.size == 0) asdDoQueryChunking(self, minChrom, minEnd); } } dyStringFree(&query); } -static char **nextRowFromBuffer(struct annoStreamDb *self, char *minChrom, uint minEnd) +static char **nextRowFromBuffer(struct annoStreamDb *self) /* Instead of streaming directly from self->sr, we have buffered up the results * of a chunked query; return the head of that queue. */ { struct rowBuf *rowBuf = &self->rowBuf; if (rowBuf->ix > rowBuf->size) errAbort("annoStreamDb %s: rowBuf overflow (%d > %d)", self->streamer.name, rowBuf->ix, rowBuf->size); if (rowBuf->ix == rowBuf->size) { // Last row in buffer -- we'll need another query to get subsequent rows (if any). // But first, see if we need to update gotFinestBin, since getFinestBin might be // one of our callers. if (rowBuf->size > 0) { char **lastRow = rowBuf->buf[rowBuf->size-1]; int lastBin = atoi(lastRow[0]); if (lastBin >= self->minFinestBin) self->gotFinestBin = TRUE; } - asdDoQueryChunking(self, minChrom, minEnd); + self->needQuery = TRUE; + // Bounce back out -- asdNextRow will need to do another query. + return NULL; } if (rowBuf->size == 0) return NULL; else return rowBuf->buf[rowBuf->ix++]; } static char **nextRowFiltered(struct annoStreamDb *self, boolean *retRightFail, char *minChrom, uint minEnd) /* Skip past any left-join failures until we get a right-join failure, a passing row, * or end of data. Return row or NULL, and return right-join fail status via retRightFail. 
*/ { int numCols = self->streamer.numCols; -char **row = self->nextRowRaw(self, minChrom, minEnd); +char **row = self->nextRowRaw(self); if (minChrom != NULL && row != NULL) { // Ignore rows that fall completely before (minChrom, minEnd) - save annoGrator's time int chromIx = self->omitBin+self->chromIx; int endIx = self->omitBin+self->endIx; int chromCmp; while (row && ((chromCmp = strcmp(row[chromIx], minChrom)) < 0 || // this chrom precedes minChrom (chromCmp == 0 && atoll(row[endIx]) < minEnd))) // on minChrom, but before minEnd - row = self->nextRowRaw(self, minChrom, minEnd); + row = self->nextRowRaw(self); } boolean rightFail = FALSE; struct annoFilter *filterList = self->streamer.filters; while (row && annoFilterRowFails(filterList, row+self->omitBin, numCols, &rightFail)) { if (rightFail) break; - row = self->nextRowRaw(self, minChrom, minEnd); + row = self->nextRowRaw(self); } *retRightFail = rightFail; return row; } static struct annoRow *rowToAnnoRow(struct annoStreamDb *self, char **row, boolean rightFail, struct lm *lm) /* Extract coords from row and return an annoRow including right-fail status. */ { row += self->omitBin; char *chrom = row[self->chromIx]; uint chromStart = sqlUnsigned(row[self->startIx]); uint chromEnd = sqlUnsigned(row[self->endIx]); return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row, self->streamer.numCols, lm); @@ -533,35 +536,48 @@ int numCols = self->streamer.numCols; return annoRowClone(qRow, rowType, numCols, callerLm); } } } static struct annoRow *asdNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd, struct lm *callerLm) /* Perform sql query if we haven't already and return a single * annoRow, or NULL if there are no more items. 
*/ { struct annoStreamDb *self = (struct annoStreamDb *)vSelf; if (self->needQuery) self->doQuery(self, minChrom, minEnd); if (self->mergeBins) - return nextRowMergeBins(self, minChrom, minEnd, callerLm); + { + struct annoRow *aRow = nextRowMergeBins(self, minChrom, minEnd, callerLm); + if (aRow == NULL && self->needQuery && !self->eof) + // Recurse: query, then get next merged/filtered row: + return asdNextRow(vSelf, minChrom, minEnd, callerLm); + else + return aRow; + } boolean rightFail = FALSE; char **row = nextRowFiltered(self, &rightFail, minChrom, minEnd); if (row == NULL) + { + if (self->needQuery && !self->eof) + // Recurse: query, then get next merged/filtered row: + return asdNextRow(vSelf, minChrom, minEnd, callerLm); + else return NULL; + } return rowToAnnoRow(self, row, rightFail, callerLm); } static void asdClose(struct annoStreamer **pVSelf) /* Close db connection and free self. */ { if (pVSelf == NULL) return; struct annoStreamDb *self = *(struct annoStreamDb **)pVSelf; lmCleanup(&(self->qLm)); freeMem(self->table); sqlFreeResult(&(self->sr)); hFreeConn(&(self->conn)); annoStreamerFree(pVSelf); }