40ed3e546ef868bffe4c338d93d6528138ecfc44
angie
Wed Sep 30 09:25:12 2015 -0700
Add test cases (and bug fixes!) to make sure that we get the desired
behavior when an insertion is adjacent to a non-insertion, and when
an insertion falls at the start or end of the search region (if applicable):
- Include insertions that fall at the start or end of the search region
- If the primary row is an insertion, keep secondary non-insertion rows
to the left and right.
- If the primary row is a non-insertion, keep secondary insertion rows
at its start and end.
(i.e. keep insertions at boundaries -- but don't let any non-insertions
slip through)
The VCF logic is more complicated because VCF indels always include an
extra base to the left, so they appear to start before they actually do,
and can be interspersed with non-indels that start there.
diff --git src/hg/lib/annoStreamDb.c src/hg/lib/annoStreamDb.c
index 3eb57e7..8348420 100644
--- src/hg/lib/annoStreamDb.c
+++ src/hg/lib/annoStreamDb.c
@@ -96,100 +96,125 @@
static void resetChunkState(struct annoStreamDb *self)
/* Reset members that track chunked queries. */
{
self->queryChrom = NULL;
self->eof = FALSE;
self->doNextChunk = FALSE;
self->needQuery = TRUE;
resetRowBuf(&self->rowBuf);
}
static void startMerging(struct annoStreamDb *self)
/* Set self->mergeBins flag and create self->qLm if necessary. */
{
self->mergeBins = TRUE;
+self->gotFinestBin = FALSE;
if (self->qLm == NULL)
self->qLm = lmInit(0);
}
static void asdSetRegion(struct annoStreamer *vSelf, char *chrom, uint regionStart, uint regionEnd)
/* Set region -- and free current sqlResult if there is one. */
{
annoStreamerSetRegion(vSelf, chrom, regionStart, regionEnd);
struct annoStreamDb *self = (struct annoStreamDb *)vSelf;
sqlFreeResult(&(self->sr));
resetMergeState(self);
resetChunkState(self);
}
static char **nextRowFromSqlResult(struct annoStreamDb *self)
/* Stream rows directly from self->sr. */
{
return sqlNextRow(self->sr);
}
static struct dyString *asdMakeBaselineQuery(struct annoStreamDb *self, boolean *retHasWhere)
/* Return a baseline query, i.e. "select * from <table>". This is the default implementation
* of annoStreamDb.makeBaselineQuery. */
{
if (retHasWhere)
*retHasWhere = FALSE;
return sqlDyStringCreate("select * from %s ", self->table);
}
+static void addRangeToQuery(struct annoStreamDb *self, struct dyString *query,
+ char *chrom, uint start, uint end, boolean hasWhere)
+/* Add position constraints to query. */
+{
+sqlDyStringAppend(query, hasWhere ? " and " : " where ");
+sqlDyStringPrintf(query, "%s = '%s' ", self->chromField, chrom);
+uint chromSize = annoAssemblySeqSize(self->streamer.assembly, chrom);
+boolean addStartConstraint = (start > 0);
+boolean addEndConstraint = (end < chromSize);
+if (addStartConstraint || addEndConstraint)
+ {
+ sqlDyStringAppend(query, "and ");
+ if (self->hasBin)
+ hAddBinToQuery(start, end, query);
+ if (addStartConstraint)
+ {
+ if (self->doNextChunk)
+ sqlDyStringPrintf(query, "%s >= %u ", self->startField, start);
+ else
+ // Make sure to include insertions at start:
+ sqlDyStringPrintf(query, "(%s > %u or (%s = %s and %s = %u)) ",
+ self->endField, start,
+ self->endField, self->startField, self->startField, start);
+ }
+ if (addEndConstraint)
+ {
+ if (addStartConstraint)
+ sqlDyStringAppend(query, "and ");
+ // Make sure to include insertions at end:
+ sqlDyStringPrintf(query, "(%s < %u or (%s = %s and %s = %u)) ",
+ self->startField, end,
+ self->startField, self->endField, self->endField, end);
+ }
+ }
+}
static void asdDoQuerySimple(struct annoStreamDb *self, char *minChrom, uint minEnd)
/* Return a sqlResult for a query on table items in position range.
* If doing a whole genome query. just select all rows from table. */
// NOTE: it would be possible to implement filters at this level, as in hgTables.
{
struct annoStreamer *streamer = &(self->streamer);
boolean hasWhere = FALSE;
struct dyString *query = self->makeBaselineQuery(self, &hasWhere);
if (!streamer->positionIsGenome)
{
if (minChrom && differentString(minChrom, streamer->chrom))
errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
streamer->name, minChrom, streamer->chrom);
if (self->hasBin)
{
// Results will be in bin order, but we can restore chromStart order by
// accumulating initial coarse-bin items and merge-sorting them with
// subsequent finest-bin items which will be in chromStart order.
resetMergeState(self);
startMerging(self);
}
if (self->endFieldIndexName != NULL)
// Don't let mysql use a (chrom, chromEnd) index because that messes up
// sorting by chromStart.
sqlDyStringPrintf(query, " IGNORE INDEX (%s)", self->endFieldIndexName);
- sqlDyStringAppend(query, hasWhere ? " and " : " where ");
- sqlDyStringPrintf(query, "%s='%s'", self->chromField, streamer->chrom);
- int chromSize = annoAssemblySeqSize(streamer->assembly, streamer->chrom);
- if (streamer->regionStart != 0 || streamer->regionEnd != chromSize)
- {
- dyStringAppend(query, " and ");
- if (self->hasBin)
- hAddBinToQuery(streamer->regionStart, streamer->regionEnd, query);
- sqlDyStringPrintf(query, "%s < %u and %s > %u", self->startField, streamer->regionEnd,
- self->endField, streamer->regionStart);
+ addRangeToQuery(self, query, streamer->chrom, streamer->regionStart, streamer->regionEnd,
+ hasWhere);
}
if (self->notSorted)
- sqlDyStringPrintf(query, " order by %s", self->startField);
- }
-else if (self->notSorted)
sqlDyStringPrintf(query, " order by %s,%s", self->chromField, self->startField);
if (self->maxOutRows > 0)
dyStringPrintf(query, " limit %d", self->maxOutRows);
struct sqlResult *sr = sqlGetResult(self->conn, query->string);
dyStringFree(&query);
self->sr = sr;
self->needQuery = FALSE;
}
static void rowBufInit(struct rowBuf *rowBuf, int size)
/* Clean up rowBuf and give it a new lm and buffer[size]. */
{
resetRowBuf(rowBuf);
rowBuf->lm = lmInit(0);
rowBuf->size = size;
@@ -248,155 +273,154 @@
char **row = NULL;
int ix = 0;
while ((row = sqlNextRow(sr)) != NULL)
{
if (ix >= rowBuf->size)
errAbort("annoStreamDb %s: rowBuf overflow, got more than %d rows",
sSelf->name, rowBuf->size);
rowBuf->buf[ix++] = lmCloneRow(rowBuf->lm, row, sSelf->numCols+self->omitBin);
}
// Set rowBuf->size to the number of rows we actually stored.
rowBuf->size = ix;
sqlFreeResult(&sr);
updateNextChunkState(self, queryMaxItems);
}
+static void updateQueryChrom(struct annoStreamDb *self, char *minChrom)
+/* Figure out whether we need to query the next chunk on the current chromosome
+ * or move on to the next chromosome. */
+{
+if (self->queryChrom == NULL)
+ self->queryChrom = self->chromList;
+else if (!self->doNextChunk)
+ {
+ self->queryChrom = self->queryChrom->next;
+ if (self->hasBin)
+ {
+ resetMergeState(self);
+ startMerging(self);
+ }
+ }
+// -- don't resetMergeState if doNextChunk.
+if (minChrom != NULL)
+ {
+ // Skip chroms that precede minChrom
+ while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0)
+ {
+ self->queryChrom = self->queryChrom->next;
+ self->doNextChunk = FALSE;
+ }
+ if (self->hasBin)
+ {
+ resetMergeState(self);
+ startMerging(self);
+ }
+ }
+}
+
+static void doOneChunkQuery(struct annoStreamDb *self, struct dyString *query,
+ char *chrom, uint start, uint end,
+ boolean hasWhere, int maxItems)
+/* Add range constraints to query, perform query and buffer the results. */
+{
+addRangeToQuery(self, query, chrom, start, end, hasWhere);
+if (self->notSorted)
+ sqlDyStringPrintf(query, "order by %s ", self->startField);
+sqlDyStringPrintf(query, "limit %d", maxItems);
+bufferRowsFromSqlQuery(self, query->string, maxItems);
+}
+
static void asdDoQueryChunking(struct annoStreamDb *self, char *minChrom, uint minEnd)
-/* Return a sqlResult for a query on table items in position range.
- * If doing a whole genome query, just select all rows from table. */
+/* Get rows from mysql with a limit on the number of rows returned at one time (ASD_CHUNK_SIZE),
+ * to avoid long delays for very large tables. This will be called multiple times if
+ * the number of rows in region is more than ASD_CHUNK_SIZE. If doing a genome-wide query,
+ * break it up into chrom-by-chrom queries because the code that merges large bin items
+ * in with small bin items assumes that all rows are on the same chrom. */
{
struct annoStreamer *sSelf = &(self->streamer);
boolean hasWhere = FALSE;
struct dyString *query = self->makeBaselineQuery(self, &hasWhere);
if (sSelf->chrom != NULL && self->rowBuf.size > 0 && !self->doNextChunk)
{
// We're doing a region query, we already got some rows, and don't need another chunk:
resetRowBuf(&self->rowBuf);
self->eof = TRUE;
}
if (self->useMaxOutRows)
{
self->maxOutRows -= self->rowBuf.size;
if (self->maxOutRows <= 0)
self->eof = TRUE;
}
if (self->eof)
return;
int queryMaxItems = ASD_CHUNK_SIZE;
if (self->useMaxOutRows && self->maxOutRows < queryMaxItems)
queryMaxItems = self->maxOutRows;
if (self->hasBin)
{
// Results will be in bin order, but we can restore chromStart order by
// accumulating initial coarse-bin items and merge-sorting them with
// subsequent finest-bin items which will be in chromStart order.
if (self->doNextChunk && self->mergeBins && !self->gotFinestBin)
errAbort("annoStreamDb %s: can't continue merge in chunking query; "
"increase ASD_CHUNK_SIZE", sSelf->name);
+ // Don't reset merge state here in case bigItemQueue has a large-bin item
+ // at the end of the chrom, past all smallest-bin items.
startMerging(self);
}
if (self->endFieldIndexName != NULL)
// Don't let mysql use a (chrom, chromEnd) index because that messes up
// sorting by chromStart.
sqlDyStringPrintf(query, " IGNORE INDEX (%s) ", self->endFieldIndexName);
if (sSelf->chrom != NULL)
{
+ // Region query (but might end up as multiple chunked queries)
+ char *chrom = sSelf->chrom;
uint start = sSelf->regionStart;
+ uint end = sSelf->regionEnd;
if (minChrom)
{
- if (differentString(minChrom, sSelf->chrom))
+ if (differentString(minChrom, chrom))
errAbort("annoStreamDb %s: nextRow minChrom='%s' but region chrom='%s'",
- sSelf->name, minChrom, sSelf->chrom);
+ sSelf->name, minChrom, chrom);
if (start < minEnd)
start = minEnd;
}
if (self->doNextChunk && start < self->nextChunkStart)
start = self->nextChunkStart;
- sqlDyStringAppend(query, hasWhere ? " and " : " where ");
- sqlDyStringPrintf(query, "%s = '%s' and ", self->chromField, sSelf->chrom);
- if (self->hasBin)
- {
- hAddBinToQuery(start, sSelf->regionEnd, query);
- }
- if (self->doNextChunk)
- sqlDyStringPrintf(query, "%s >= %u and ", self->startField, self->nextChunkStart);
- sqlDyStringPrintf(query, "%s < %u and %s > %u ", self->startField, sSelf->regionEnd,
- self->endField, start);
- if (self->notSorted)
- sqlDyStringPrintf(query, "order by %s ", self->startField);
- sqlDyStringPrintf(query, "limit %d", queryMaxItems);
- bufferRowsFromSqlQuery(self, query->string, queryMaxItems);
+ doOneChunkQuery(self, query, chrom, start, end, hasWhere, queryMaxItems);
if (self->rowBuf.size == 0)
self->eof = TRUE;
}
else
{
- // Genome-wide query: break it into chrom-by-chrom queries.
- if (self->queryChrom == NULL)
- self->queryChrom = self->chromList;
- else if (!self->doNextChunk)
- {
- self->queryChrom = self->queryChrom->next;
- if (self->hasBin)
- {
- resetMergeState(self);
- startMerging(self);
- }
- }
- if (minChrom != NULL)
- {
- // Skip chroms that precede minChrom
- while (self->queryChrom != NULL && strcmp(self->queryChrom->name, minChrom) < 0)
- {
- self->queryChrom = self->queryChrom->next;
- self->doNextChunk = FALSE;
- }
- if (self->hasBin)
- {
- resetMergeState(self);
- startMerging(self);
- }
- }
+ // Genome-wide query: break it into chrom-by-chrom queries (that might be chunked)
+ // because the mergeBins stuff assumes that all rows are from the same chrom.
+ updateQueryChrom(self, minChrom);
if (self->queryChrom == NULL)
self->eof = TRUE;
else
{
char *chrom = self->queryChrom->name;
int start = 0;
if (minChrom != NULL && sameString(chrom, minChrom))
start = minEnd;
if (self->doNextChunk && start < self->nextChunkStart)
start = self->nextChunkStart;
- uint end = annoAssemblySeqSize(self->streamer.assembly, self->queryChrom->name);
- sqlDyStringAppend(query, hasWhere ? " and " : " where ");
- sqlDyStringPrintf(query, "%s = '%s' ", self->chromField, chrom);
- if (start > 0 || self->doNextChunk)
- {
- dyStringAppend(query, "and ");
- if (self->hasBin)
- {
- hAddBinToQuery(start, end, query);
- }
- if (self->doNextChunk)
- sqlDyStringPrintf(query, "%s >= %u and ", self->startField, self->nextChunkStart);
- // region end is chromSize, so no need to constrain startField here:
- sqlDyStringPrintf(query, "%s > %u ", self->endField, start);
- }
- if (self->notSorted)
- sqlDyStringPrintf(query, "order by %s ", self->startField);
- dyStringPrintf(query, "limit %d", queryMaxItems);
- bufferRowsFromSqlQuery(self, query->string, queryMaxItems);
+ uint end = annoAssemblySeqSize(self->streamer.assembly, chrom);
+ doOneChunkQuery(self, query, chrom, start, end, hasWhere, queryMaxItems);
// If there happens to be no items on chrom, try again with the next chrom:
if (! self->eof && self->rowBuf.size == 0)
asdDoQueryChunking(self, minChrom, minEnd);
}
}
dyStringFree(&query);
}
static char **nextRowFromBuffer(struct annoStreamDb *self)
/* Instead of streaming directly from self->sr, we have buffered up the results
* of a chunked query; return the head of that queue. */
{
struct rowBuf *rowBuf = &self->rowBuf;
if (rowBuf->ix > rowBuf->size)
errAbort("annoStreamDb %s: rowBuf overflow (%d > %d)", self->streamer.name,