e2ec5ef77645a662ba31c7cfbd7499794975275a
angie
  Fri Mar 23 16:44:07 2012 -0700
Feature #6152 (Variant Annotation Tool): Initial work, squashed in from origin/annoGrator branch.  Superclasses annoColumn, annoFilter,
annoRow, annoStreamer, annoGrator, and annoFormatter define the core
interfaces for passing data and configuration to and from components.
The annoGrator superclass can join annoRows on position and pass
forward all rows of secondary source.  The annoGratorQuery module
orchestrates the passing of annoRows between the primary source,
annoGrator(s) and annoFormatter(s).  The subclasses annoStreamDb and
annoFormatTab, together with hg/lib/tests/annoGratorTester.c, can join
columns of two database tables such as hg19's pgNA12878 and knownGene
into tab-separated output.

diff --git src/lib/annoGrator.c src/lib/annoGrator.c
new file mode 100644
index 0000000..c02c13e
--- /dev/null
+++ src/lib/annoGrator.c
@@ -0,0 +1,194 @@
+/* annoGrator -- join two inputs on position, keeping all original fields intact. */
+
+#include "annoGrator.h"
+
+INLINE void agCheckPrimarySorting(struct annoGrator *self, struct annoRow *primaryRow)
+/* Die if primaryRow seems to have arrived out of order; otherwise remember its chrom and start. */
+{
+if (self->prevPChrom == NULL)
+    self->prevPChrom = cloneString(primaryRow->chrom);
+else if (differentString(primaryRow->chrom, self->prevPChrom))
+    {
+    if (strcmp(primaryRow->chrom, self->prevPChrom) < 0)
+	errAbort("Unsorted input from primarySource (%s < %s)", primaryRow->chrom, self->prevPChrom);
+    freeMem(self->prevPChrom);  // drop the old clone instead of leaking it on every chrom change
+    self->prevPChrom = cloneString(primaryRow->chrom);
+    }
+else if (primaryRow->start < self->prevPStart)
+    errAbort("Unsorted input from primarySource (%s, %u < %u)",
+	     primaryRow->chrom, primaryRow->start, self->prevPStart);
+self->prevPStart = primaryRow->start;
+}
+
+//#*** use localmem for queue? one per chrom?  free when empty?  reuse structs?
+
+INLINE void agTrimToStart(struct annoGrator *self, char *chrom, uint start)
+/* Drop queued rows that precede (chrom, start): any row on an earlier chrom, or on chrom
+ * with end < start.  Stop scanning at the first row that is at/after (chrom, start). */
+{
+struct annoRow *row, *after, *kept = NULL;
+for (row = self->qHead;  row != NULL;  row = after)
+    {
+    after = row->next;  // saved up front because row may be freed below
+    int chromCmp = strcmp(row->chrom, chrom);
+    if (chromCmp > 0 || (chromCmp == 0 && row->start >= start))
+	break;
+    if (chromCmp >= 0 && row->end >= start)
+	{
+	kept = row;  // starts before start but still reaches it; leave it queued
+	continue;
+	}
+    // Unlink row by patching whichever pointer led to it (and the tail), then free it.
+    struct annoRow **link = (kept == NULL) ? &self->qHead : &kept->next;
+    *link = after;
+    if (self->qTail == row)
+	self->qTail = kept;
+    annoRowFree(&row, self->numSrcCols);
+    }
+}
+
+INLINE void agCheckInternalSorting(struct annoRow *newRow, struct annoRow *qTail)
+/* Abort unless newRow sorts at or after qTail, comparing chrom by strcmp and then start.
+ * A NULL qTail (nothing queued yet) always passes. */
+{
+if (qTail == NULL)
+    return;
+int chromCmp = strcmp(newRow->chrom, qTail->chrom);
+if (chromCmp < 0)
+    errAbort("Unsorted input from internal source (%s < %s)",
+	     newRow->chrom, qTail->chrom);
+else if (chromCmp == 0 && newRow->start < qTail->start)
+    errAbort("Unsorted input from internal source (%s, %u < %u)",
+	     newRow->chrom, newRow->start, qTail->start);
+}
+
+INLINE void agFetchToEnd(struct annoGrator *self, char *chrom, uint end)
+/* Fetch rows until we are sure we have all items on chrom that start to the left of end,
+ * i.e. the tail of the queue is on a later chrom or starts at/after end, or we hit eof.
+ * Rows on chroms that sort before chrom are sort-checked and then discarded. */
+{
+while (!self->eof)
+    {
+    // A tail's start is comparable to end only on chrom itself.  Comparing them when the tail
+    // was already on a later chrom used to fetch and queue a needless extra row per call.
+    int cDifTailP = (self->qTail == NULL) ? -1 : strcmp(self->qTail->chrom, chrom);
+    if (cDifTailP > 0 || (cDifTailP == 0 && self->qTail->start >= end))
+	break;
+    struct annoRow *newRow = self->mySource->nextRow(self->mySource);
+    if (newRow == NULL)
+	{
+	self->eof = TRUE;
+	break;
+	}
+    agCheckInternalSorting(newRow, self->qTail);
+    int cDifNewP = strcmp(newRow->chrom, chrom);
+    if (cDifNewP < 0)
+	{
+	annoRowFree(&newRow, self->numSrcCols);  // newRow->chrom comes before chrom; skip it
+	continue;
+	}
+    // Append newRow to the queue.
+    if (self->qTail != NULL)
+	self->qTail->next = newRow;
+    else if (self->qHead != NULL)
+	errAbort("qTail is NULL but qHead is non-NULL");
+    else
+	self->qHead = newRow;
+    self->qTail = newRow;
+    if (cDifNewP > 0)
+	// newRow->chrom comes after chrom; we're done for now
+	break;
+    }
+}
+
+struct annoRow *annoGratorIntegrate(struct annoGrator *self, struct annoRow *primaryRow,
+				    boolean *retRJFilterFailed)
+/* Given a single row from the primary source, return clones of all rows from the internal
+ * source that overlap it on the same chrom, in queue order.  If retRJFilterFailed is non-NULL
+ * and an overlapping row has a rightJoin filter failure (see annoFilter.h), set
+ * *retRJFilterFailed to TRUE and stop; this function never sets it to FALSE. */
+{
+struct annoRow *rowList = NULL, *qRow;
+agCheckPrimarySorting(self, primaryRow);
+agTrimToStart(self, primaryRow->chrom, primaryRow->start);
+agFetchToEnd(self, primaryRow->chrom, primaryRow->end);
+for (qRow = self->qHead;  qRow != NULL;  qRow = qRow->next)
+    {
+    // agFetchToEnd can leave a row from a later chrom at the tail, so check chrom as well.
+    if (strcmp(qRow->chrom, primaryRow->chrom) == 0 &&
+	qRow->start < primaryRow->end && qRow->end > primaryRow->start)
+	{
+	slAddHead(&rowList, annoRowClone(qRow, self->numSrcCols));
+	if (retRJFilterFailed != NULL && qRow->rightJoinFail)
+	    {
+	    *retRJFilterFailed = TRUE;
+	    break;
+	    }
+	}
+    }
+slReverse(&rowList);
+return rowList;
+}
+
+void annoGratorClose(struct annoStreamer **pSelf)
+/* Free self (mySource, queued rows and prevPChrom too); no-op if pSelf or *pSelf is NULL. */
+{
+if (pSelf == NULL || *pSelf == NULL)  // *pSelf == NULL used to be dereferenced below
+    return;
+struct annoGrator *self = *(struct annoGrator **)pSelf;
+self->mySource->close(&(self->mySource));
+annoRowFreeList(&(self->qHead), self->numSrcCols);
+freeMem(self->prevPChrom);
+freez(pSelf);
+}
+
+static struct annoRow *noNextRow(struct annoStreamer *self)
+/* Installed as annoGrator's nextRow() method: always errAborts, since callers must use integrate(). */
+{
+errAbort("nextRow() called on annoGrator object, but integrate() should be called instead");
+return NULL;
+}
+
+static void agReset(struct annoGrator *self)
+/* Discard position-dependent state: queued rows, eof flag and last primary chrom/start. */
+{
+annoRowFreeList(&(self->qHead), self->numSrcCols);
+self->qTail = NULL;
+self->eof = FALSE;
+self->prevPStart = 0;
+freez(&self->prevPChrom);
+}
+
+void annoGratorSetRegion(struct annoStreamer *vSelf, char *chrom, uint rStart, uint rEnd)
+/* Pass the new genomic region on to mySource, then reset this grator's internal state. */
+{
+struct annoStreamer *src = ((struct annoGrator *)vSelf)->mySource;
+src->setRegion(src, chrom, rStart, rEnd);
+agReset((struct annoGrator *)vSelf);
+}
+
+void annoGratorSetQuery(struct annoStreamer *vSelf, struct annoGratorQuery *query)
+/* Record query on our own streamer and on mySource (caller: annoGratorQuery, made after streamers). */
+{
+struct annoGrator *grator = (struct annoGrator *)vSelf;
+grator->streamer.query = query;
+grator->mySource->setQuery(grator->mySource, query);
+}
+
+struct annoGrator *annoGratorNew(struct annoStreamer *mySource)
+/* Allocate an annoGrator that integrates columns from mySource with the (positions of) rows
+ * passed to integrate().  The new annoGrator takes ownership of mySource. */
+{
+struct annoGrator *grator;
+AllocVar(grator);
+// Initialize the embedded streamer from mySource's autoSql, then install annoGrator's methods.
+annoStreamerInit(&(grator->streamer), mySource->getAutoSqlObject(mySource));
+grator->streamer.setRegion = annoGratorSetRegion;
+grator->streamer.setQuery = annoGratorSetQuery;
+grator->streamer.nextRow = noNextRow;
+grator->streamer.close = annoGratorClose;
+grator->mySource = mySource;
+grator->integrate = annoGratorIntegrate;
+grator->numSrcCols = slCount(mySource->asObj->columnList);
+return grator;
+}