85be21057a058a0bdb12caac2bae553376442f5a
angie
  Mon Apr 15 10:56:28 2013 -0700
Refactoring to remove dependencies on annoGratorQuery from streamers,grators and formatters.  Instead, provide basic assembly info and
explicitly pass streamers(/grators) along with the rows that they
produced into formatters. ref #6152

diff --git src/lib/annoGrator.c src/lib/annoGrator.c
index 4ca39c7..4386787 100644
--- src/lib/annoGrator.c
+++ src/lib/annoGrator.c
@@ -1,256 +1,247 @@
 /* annoGrator -- join two inputs on position, keeping all original fields intact. */
 
 #include "annoGrator.h"
 
 INLINE void agCheckPrimarySorting(struct annoGrator *self, struct annoRow *primaryRow)
 /* Die if primaryRow seems to have arrived out of order. */
 {
 if (self->prevPChrom == NULL)
     self->prevPChrom = cloneString(primaryRow->chrom);
 else if (differentString(primaryRow->chrom, self->prevPChrom))
     {
     if (strcmp(primaryRow->chrom, self->prevPChrom) < 0)
 	errAbort("Unsorted input from primarySource (%s < %s)",
 		 primaryRow->chrom, self->prevPChrom);
     self->prevPChrom = cloneString(primaryRow->chrom);
     }
 else if (primaryRow->start < self->prevPStart)
     errAbort("Unsorted input from primarySource (%s, %u < %u)",
 	     primaryRow->chrom, primaryRow->start, self->prevPStart);
 self->prevPStart = primaryRow->start;
 }
 
 INLINE void agTrimToStart(struct annoGrator *self, char *chrom, uint start)
 /* If queue contains items whose end is to the left of start, splice them out. */
 {
 struct annoRow *qRow, *prevQRow = NULL, *nextQRow;
 for (qRow = self->qHead;  qRow != NULL;  qRow = nextQRow)
     {
     nextQRow = qRow->next;
     int cDifRowP = strcmp(qRow->chrom, chrom);
     if (cDifRowP > 0 || (cDifRowP == 0 && qRow->start >= start))
 	break;
     else if (cDifRowP < 0 || qRow->end < start)
 	{
 	if (prevQRow == NULL)
 	    self->qHead = qRow->next;
 	else
 	    prevQRow->next = qRow->next;
 	if (self->qTail == qRow)
 	    self->qTail = prevQRow;
 	}
     else
 	prevQRow = qRow;
     }
 if (self->qHead == NULL)
     {
     // Queue is empty - clean up lm
     lmCleanup(&(self->qLm));
     self->qLm = lmInit(0);
     }
 }
 
 INLINE void agCheckInternalSorting(struct annoRow *newRow, struct annoRow *qTail)
 /* Die if newRow precedes qTail. */
 {
 if (qTail != NULL)
     {
     int cDifNewTail = strcmp(newRow->chrom, qTail->chrom);
     if (cDifNewTail < 0)
 	errAbort("Unsorted input from internal source (%s < %s)",
 		 newRow->chrom, qTail->chrom);
     else if (cDifNewTail == 0 && newRow->start < qTail->start)
 	errAbort("Unsorted input from internal source (%s, %u < %u)",
 		 newRow->chrom, newRow->start, qTail->start);
     }
 }
 
 INLINE void agFetchToEnd(struct annoGrator *self, char *chrom, uint end)
 /* Fetch rows until we are sure we have all items that start to the left of end,
  * i.e. we have an item that starts at/after end or we hit eof. */
 {
 while (!self->eof &&
        (self->qTail == NULL || strcmp(self->qTail->chrom, chrom) < 0 || self->qTail->start < end))
     {
     struct annoRow *newRow = self->mySource->nextRow(self->mySource, self->qLm);
     if (newRow == NULL)
 	self->eof = TRUE;
     else
 	{
 	agCheckInternalSorting(newRow, self->qTail);
 	int cDifNewP = strcmp(newRow->chrom, chrom);
 	if (cDifNewP >= 0)
 	    {
 	    // Add newRow to qTail
 	    if (self->qTail == NULL)
 		{
 		if (self->qHead != NULL)
 		    errAbort("qTail is NULL but qHead is non-NULL");
 		self->qHead = self->qTail = newRow;
 		}
 	    else
 		{
 		self->qTail->next = newRow;
 		self->qTail = newRow;
 		}
 	    if (cDifNewP > 0)
 		// newRow->chrom comes after chrom; we're done for now
 		break;
 	    }
 	}
     }
 }
 
 struct annoRow *annoGratorIntegrate(struct annoGrator *self, struct annoRow *primaryRow,
 				    boolean *retRJFilterFailed, struct lm *callerLm)
 /* Given a single row from the primary source, get all overlapping rows from internal
  * source, and produce joined output rows.
  * If retRJFilterFailed is non-NULL:
  * - any overlapping row has a rightJoin filter failure (see annoFilter.h), or
  * - overlap rule is agoMustOverlap and no rows overlap, or
  * - overlap rule is agoMustNotOverlap and any overlapping row is found,
  * then set retRJFilterFailed and stop. */
 {
 struct annoRow *rowList = NULL;
 agCheckPrimarySorting(self, primaryRow);
 agTrimToStart(self, primaryRow->chrom, primaryRow->start);
 agFetchToEnd(self, primaryRow->chrom, primaryRow->end);
 boolean rjFailHard = (retRJFilterFailed != NULL);
 if (rjFailHard)
     *retRJFilterFailed = FALSE;
 struct annoRow *qRow;
 for (qRow = self->qHead;  qRow != NULL;  qRow = qRow->next)
     {
     if (qRow->start < primaryRow->end && qRow->end > primaryRow->start &&
 	sameString(qRow->chrom, primaryRow->chrom))
 	{
 	int numCols = self->mySource->numCols;
 	enum annoRowType rowType = self->mySource->rowType;
 	slAddHead(&rowList, annoRowClone(qRow, rowType, numCols, callerLm));
 	if (rjFailHard && qRow->rightJoinFail)
 	    {
 	    *retRJFilterFailed = TRUE;
 	    break;
 	    }
 	}
     }
 slReverse(&rowList);
 // If no rows overlapped primary, and there is a right-join, !isExclude (i.e. isInclude) filter,
 // then we need to set retRJFilterFailed because the condition was not met to include
 // the primary item.
 if (retRJFilterFailed &&
     ((rowList == NULL && (self->haveRJIncludeFilter || self->overlapRule == agoMustOverlap)) ||
      (rowList != NULL && self->overlapRule == agoMustNotOverlap)))
     *retRJFilterFailed = TRUE;
 return rowList;
 }
 
 void annoGratorClose(struct annoStreamer **pSelf)
 /* Free self (including mySource). */
 {
 if (pSelf == NULL)
     return;
 struct annoGrator *self = *(struct annoGrator **)pSelf;
 lmCleanup(&(self->qLm));
 self->mySource->close(&(self->mySource));
 freeMem(self->prevPChrom);
 freez(pSelf);
 }
 
 static struct annoRow *noNextRow(struct annoStreamer *self, struct lm *callerLm)
 /* nextRow() is N/A for annoGrator, which needs caller to use integrate() instead. */
 {
 errAbort("nextRow() called on annoGrator object, but integrate() should be called instead");
 return NULL;
 }
 
 static void agReset(struct annoGrator *self)
 /* Reset all state associated with position */
 {
 freez(&(self->prevPChrom));
 self->prevPStart = 0;
 self->eof = FALSE;
 lmCleanup(&(self->qLm));
 self->qLm = lmInit(0);
 self->qHead = self->qTail = NULL;
 }
 
 static boolean filtersHaveRJInclude(struct annoFilter *filters)
 /* Return TRUE if filters have at least one active filter with !isExclude && rightJoin. */
 {
 struct annoFilter *filter;
 for (filter = filters;  filter != NULL;  filter = filter->next)
     if (filter->op != afNoFilter && !filter->isExclude && filter->rightJoin)
 	return TRUE;
 return FALSE;
 }
 
 static void agSetFilters(struct annoStreamer *vSelf, struct annoFilter *newFilters)
 /* Update filters and re-evaluate self->haveRJIncludeFilter */
 {
 annoStreamerSetFilters(vSelf, newFilters);
 struct annoGrator *self = (struct annoGrator *)vSelf;
 self->haveRJIncludeFilter = filtersHaveRJInclude(vSelf->filters);
 }
 
 void annoGratorSetRegion(struct annoStreamer *vSelf, char *chrom, uint rStart, uint rEnd)
 /* Set genomic region for query, and reset internal state. */
 {
 struct annoGrator *self = (struct annoGrator *)vSelf;
 self->mySource->setRegion((struct annoStreamer *)(self->mySource), chrom, rStart, rEnd);
 agReset(self);
 }
 
-void annoGratorSetQuery(struct annoStreamer *vSelf, struct annoGratorQuery *query)
-/* Set query (to be called only by annoGratorQuery which is created after streamers). */
-{
-struct annoGrator *self = (struct annoGrator *)vSelf;
-self->streamer.query = query;
-self->mySource->setQuery((struct annoStreamer *)(self->mySource), query);
-}
-
 static void agSetAutoSqlObject(struct annoStreamer *sSelf, struct asObject *asObj)
 /* Use new asObj and update internal state derived from asObj. */
 {
 struct annoGrator *gSelf = (struct annoGrator *)sSelf;
 annoStreamerSetAutoSqlObject(sSelf, asObj);
 gSelf->haveRJIncludeFilter = filtersHaveRJInclude(sSelf->filters);
 }
 
 void agSetOverlapRule(struct annoGrator *self, enum annoGratorOverlap rule)
 /* Tell annoGrator how to handle overlap of its rows with primary row. */
 {
 self->overlapRule = rule;
 }
 
 void annoGratorInit(struct annoGrator *self, struct annoStreamer *mySource)
 /* Initialize an integrator of columns from mySource with (positions of)
  * rows passed to integrate().
  * mySource becomes property of the annoGrator. */
 {
 struct annoStreamer *streamer = &(self->streamer);
-annoStreamerInit(streamer, mySource->getAutoSqlObject(mySource));
+annoStreamerInit(streamer, mySource->assembly, mySource->getAutoSqlObject(mySource));
 streamer->rowType = mySource->rowType;
 streamer->setAutoSqlObject = agSetAutoSqlObject;
 streamer->setFilters = agSetFilters;
 streamer->setRegion = annoGratorSetRegion;
-streamer->setQuery = annoGratorSetQuery;
 streamer->nextRow = noNextRow;
 streamer->close = annoGratorClose;
 self->qLm = lmInit(0);
 self->integrate = annoGratorIntegrate;
 self->setOverlapRule = agSetOverlapRule;
 self->overlapRule = agoNoConstraint;
 self->mySource = mySource;
 self->haveRJIncludeFilter = filtersHaveRJInclude(streamer->filters);
 }
 
 struct annoGrator *annoGratorNew(struct annoStreamer *mySource)
 /* Make a new integrator of columns from mySource with (positions of) rows passed to integrate().
  * mySource becomes property of the new annoGrator. */
 {
 struct annoGrator *self;
 AllocVar(self);
 annoGratorInit(self, mySource);
 return self;
 }