df9e8bd3895a98d6324eea6fe30458e222172353
angie
  Mon Jun 3 12:47:14 2013 -0700
Fixed implementation of maxItems.  To reduce the delay before getting thefirst item in range, now bbi queries are limited to 100,000 items at a time,
so now there is a series of "chunk" queries instead of one query per region
or chromosome.  for #6152

diff --git src/lib/annoStreamBigBed.c src/lib/annoStreamBigBed.c
index 4b8541d..278d4e1 100644
--- src/lib/annoStreamBigBed.c
+++ src/lib/annoStreamBigBed.c
@@ -2,163 +2,243 @@
 
 #include "annoStreamBigBed.h"
 #include "bigBed.h"
 #include "localmem.h"
 #include "sqlNum.h"
 
 struct annoStreamBigBed
     {
     struct annoStreamer streamer;	// Parent class members & methods
     // Private members
     struct bbiFile *bbi;		// bbi handle for bigBed file/URL.
     struct lm *intervalQueryLm;		// localmem object for bigBedIntervalQuery
     struct bigBedInterval *intervalList;	// results of bigBedIntervalQuery
     struct bigBedInterval *nextInterval;	// next result to be translated into row
     struct bbiChromInfo *chromList;	// list of chromosomes for which bbi actually has data
-    struct bbiChromInfo *queryChrom;	// most recently queried chrom (or NULL) for whole-genome
-    int maxItems;			// max items returned from bigBedIntervalQuery
+    struct bbiChromInfo *queryChrom;	// most recently queried chrom for whole-genome (or NULL)
+    boolean useMaxItems;		// TRUE if maxItems passed to annoStreamBigBedNew is > 0
+    int maxItems;			// max remaining annoRows that nextRow can return
+    boolean eof;			// TRUE when we are done (maxItems or no more items)
+    boolean doNextChunk;		// TRUE if intervalList ends before end of chrom/region
+    uint nextChunkStart;		// Start coord for next chunk of intervals to query
     char **row;				// storage for results of bigBedIntervalToRow
     char *startBuf;			// storage for stringified start from bigBedIntervalToRow
     char *endBuf;			// storage for stringified end from bigBedIntervalToRow
     };
 
+// For performance reasons, even if !useMaxItems (no limit), we need to limit the
+// number of intervals that are slurped into memory for a bigBedIntervalQuery, so
+// we don't sit waiting too long when a chromosome has millions of intervals.
+#define ASBB_CHUNK_SIZE 100000
+
 static void asbbSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
 /* Set region -- and free localmem from previous query if necessary. */
 {
 annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
 struct annoStreamBigBed *self = (struct annoStreamBigBed *)vSelf;
 self->nextInterval = self->intervalList = NULL;
+self->queryChrom = NULL;
+self->eof = FALSE;
 lmCleanup(&(self->intervalQueryLm));
 }
 
+static void updateNextChunkState(struct annoStreamBigBed *self, int queryMaxItems)
+/* If the just-fetched interval list was limited to ASBB_CHUNK_SIZE, set doNextChunk
+ * and trim the last interval(s) so that when we query the next chunk, we don't get
+ * repeat rows due to querying a start coord that was already returned. */
+{
+if (queryMaxItems == ASBB_CHUNK_SIZE)
+    {
+    int itemCount = slCount(self->intervalList);
+    if (itemCount == ASBB_CHUNK_SIZE)
+	{
+	self->doNextChunk = TRUE;
+	struct bigBedInterval *lastIv = NULL, *iv;
+	for (iv = self->intervalList;  iv->next != NULL;  iv = iv->next)
+	    {
+	    if (iv->start != iv->next->start)
+		{
+		lastIv = iv;
+		self->nextChunkStart = iv->next->start;
+		}
+	    }
+	lastIv->next = NULL;
+	}
+    }
+else
+    self->doNextChunk = FALSE;
+}
+
 static void asbbDoQuery(struct annoStreamBigBed *self, char *minChrom, uint minEnd)
-/* Store results of an interval query. [Would be nice to make a streaming version of this.] */
+/* Store results of an interval query. */
 {
 struct annoStreamer *sSelf = &(self->streamer);
-if (self->intervalQueryLm == NULL)
+if (sSelf->chrom != NULL && self->intervalList != NULL)
+    self->eof = TRUE;
+if (self->useMaxItems)
+    {
+    int lastIntervalCount = slCount(self->intervalList);
+    self->maxItems -= lastIntervalCount;
+    if (self->maxItems <= 0)
+	self->eof = TRUE;
+    }
+self->nextInterval = self->intervalList = NULL;
+lmCleanup(&self->intervalQueryLm);
+if (self->eof)
+    return;
     self->intervalQueryLm = lmInit(0);
+int queryMaxItems = ASBB_CHUNK_SIZE;
+if (self->useMaxItems && self->maxItems < queryMaxItems)
+    queryMaxItems = self->maxItems;
 if (sSelf->chrom != NULL)
     {
     uint start = sSelf->regionStart;
     if (minChrom)
 	{
 	if (differentString(minChrom, sSelf->chrom))
 	    errAbort("annoStreamBigBed %s: nextRow minChrom='%s' but region chrom='%s'",
 		     sSelf->name, minChrom, sSelf->chrom);
 	if (start < minEnd)
 	    start = minEnd;
 	}
+    if (self->doNextChunk && start < self->nextChunkStart)
+	start = self->nextChunkStart;
     self->intervalList = bigBedIntervalQuery(self->bbi, sSelf->chrom, start, sSelf->regionEnd,
-					     self->maxItems, self->intervalQueryLm);
+					     queryMaxItems, self->intervalQueryLm);
+    updateNextChunkState(self, queryMaxItems);
     }
 else
     {
     // Genome-wide query: break it into chrom-by-chrom queries.
     if (self->queryChrom == NULL)
 	self->queryChrom = self->chromList;
-    else
+    else if (!self->doNextChunk)
 	self->queryChrom = self->queryChrom->next;
     if (minChrom != NULL)
 	{
 	// Skip chroms that precede minChrom
 	while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0)
+	    {
 	    self->queryChrom = self->queryChrom->next;
+	    self->doNextChunk = FALSE;
+	    }
 	}
     if (self->queryChrom == NULL)
 	{
-	self->chromList = NULL; // EOF, don't start over!
-	self->intervalList = NULL;
+	self->eof = TRUE;
 	}
     else
 	{
 	char *chrom = self->queryChrom->name;
 	int start = 0;
 	if (minChrom != NULL && sameString(chrom, minChrom))
 	    start = minEnd;
+	if (self->doNextChunk && start < self->nextChunkStart)
+	    {
+	    start = self->nextChunkStart;
+	    }
 	uint end = self->queryChrom->size;
 	self->intervalList = bigBedIntervalQuery(self->bbi, chrom, start, end,
-						 self->maxItems, self->intervalQueryLm);
+						 queryMaxItems, self->intervalQueryLm);
+	updateNextChunkState(self, queryMaxItems);
 	}
     }
 self->nextInterval = self->intervalList;
 }
 
-static char **nextRowUnfiltered(struct annoStreamBigBed *self)
+static char **nextRowUnfiltered(struct annoStreamBigBed *self, char *minChrom, uint minEnd)
 /* Convert the next available interval into a row of words, or return NULL. */
 {
 struct annoStreamer *sSelf = &(self->streamer);
+if (self->nextInterval == NULL ||
+    (minChrom != NULL && self->queryChrom != NULL &&
+     (strcmp(self->queryChrom->name, minChrom) < 0)))
+    asbbDoQuery(self, minChrom, minEnd);
 if (self->nextInterval == NULL)
     return NULL;
 char *chrom = sSelf->chrom ? sSelf->chrom : self->queryChrom->name;
+// If current set of intervals is for minChrom, skip items to the left of minEnd:
+if (minChrom != NULL && sameString(chrom, minChrom))
+    {
+    while (self->nextInterval && self->nextInterval->end < minEnd)
+	self->nextInterval = self->nextInterval->next;
+    // If no more items are left in intervalList, query again:
+    if (self->nextInterval == NULL)
+	asbbDoQuery(self, minChrom, minEnd);
+    if (self->nextInterval == NULL)
+	return NULL;
+    }
 int fieldCount = bigBedIntervalToRow(self->nextInterval, chrom,
 				     self->startBuf, self->endBuf,
 				     self->row, sSelf->numCols+1);
 if (fieldCount != sSelf->numCols)
     errAbort("annoStreamBigBed %s: expected %d columns, got %d",
 	     sSelf->name, sSelf->numCols, fieldCount);
 self->nextInterval = self->nextInterval->next;
 return self->row;
 }
 
-static struct annoRow *asbbNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd, struct lm *lm)
+static struct annoRow *asbbNextRow(struct annoStreamer *vSelf, char *minChrom, uint minEnd,
+				   struct lm *lm)
 /* Return a single annoRow, or NULL if there are no more items. */
 {
 struct annoStreamBigBed *self = (struct annoStreamBigBed *)vSelf;
-if (self->intervalList == NULL)
-    asbbDoQuery(self, minChrom, minEnd);
-char **row = nextRowUnfiltered(self);
+if (self->eof)
+    return NULL;
+char **row = nextRowUnfiltered(self, minChrom, minEnd);
 if (row == NULL)
     return NULL;
 // Skip past any left-join failures until we get a right-join failure, a passing row, or EOF.
 boolean rightFail = FALSE;
 while (annoFilterRowFails(vSelf->filters, row, vSelf->numCols, &rightFail))
     {
     if (rightFail)
 	break;
-    row = nextRowUnfiltered(self);
+    row = nextRowUnfiltered(self, minChrom, minEnd);
     if (row == NULL)
 	return NULL;
     }
 char *chrom = row[0];
 uint chromStart = sqlUnsigned(row[1]);
 uint chromEnd = sqlUnsigned(row[2]);
 return annoRowFromStringArray(chrom, chromStart, chromEnd, rightFail, row, vSelf->numCols, lm);
 }
 
 static void asbbClose(struct annoStreamer **pVSelf)
 /* Close bbi handle and free self. */
 {
 if (pVSelf == NULL)
     return;
 struct annoStreamBigBed *self = *(struct annoStreamBigBed **)pVSelf;
 bigBedFileClose(&(self->bbi));
 self->intervalList = NULL;
 lmCleanup(&(self->intervalQueryLm));
 freeMem(self->row);
 freeMem(self->startBuf);
 freeMem(self->endBuf);
 annoStreamerFree(pVSelf);
 }
 
 struct annoStreamer *annoStreamBigBedNew(char *fileOrUrl, struct annoAssembly *aa, int maxItems)
 /* Create an annoStreamer (subclass) object from a file or URL; if
  * maxItems is 0, all items from a query will be returned, otherwise
- * each query is limited to maxItems. */
+ * output is limited to maxItems. */
 {
 struct bbiFile *bbi = bigBedFileOpen(fileOrUrl);
 struct asObject *asObj = bigBedAsOrDefault(bbi);
 struct annoStreamBigBed *self = NULL;
 AllocVar(self);
 struct annoStreamer *streamer = &(self->streamer);
 annoStreamerInit(streamer, aa, asObj, fileOrUrl);
 streamer->rowType = arWords;
 streamer->setRegion = asbbSetRegion;
 streamer->nextRow = asbbNextRow;
 streamer->close = asbbClose;
 self->bbi = bbi;
+self->useMaxItems = (maxItems > 0);
 self->maxItems = maxItems;
 AllocArray(self->row, streamer->numCols + 1);
 self->startBuf = needMem(32);
 self->endBuf = needMem(32);
 self->chromList = bbiChromList(bbi);
 return (struct annoStreamer *)self;
 }