c8e1dc987ba5532c65be54d8f43af1ec4337fb10 angie Sat Nov 21 09:52:07 2015 -0800
Back-end implementation of Data Integrator's support for related tables and fields using all.joiner. Most joins are implemented using a new module, hashJoin.c; but SQL joins are used in certain cases when hash joins are impractical and SQL joins are actually faster. A new module joinMixer determines which joins should be implemented by hashJoin vs SQL, and computes row indices for hashJoin objects to find keys (from SQL or other hashJoins) and store results. The SQL join info from joinMixer is translated into SQL queries in annoStreamDb. annoStreamDb also generates its own autoSql asObject, adding the fields from related tables after the fields of the main track table.
Main changes:
- annoStreamDb.c
  - main table SQL query now uses table.field instead of just field to avoid clashes with same field name in different tables
  - SQL joins return multiple rows for a single main table row when there are multiple matching rows in a related table; these rows need to be squashed into one row with the multiple matches comma-separated, both to match hgTables behavior and to avoid overflow of rowBuf. (glomSqlDup)
  - as mentioned above, generate joining SQL queries when necessary and generate own asObj including selected fields from related tables.
  - parse JSON config object with relatedTables spec from UI via hgi_querySpec
hashJoin basically slurps a related table into a big hash of keys to values, performs lookups (possibly of multiple keys), and formats each column's results. It includes a lot of tweaks to match hgTables/joining.c output char-for-char: collapse adjacent duplicate matches, commas at end of matches from multiple key lookups, reversed order of multiple match values. hgTables/joining.c uses arrays of slNames, but in order to avoid all that allocation I'm just glomming into an array of reused dyStrings.
joinMixer takes a list of fields to include in output, gets a list of joins to be performed (from joinerRouteThroughAll), applies some simple rough heuristics to guess whether a join is practical in SQL, and decides which joins to do by SQL and which to do by hashJoin. It plans a row format with several groups of fields in this order: main table fields, related table fields to appear in the output, related table fields needed by hashJoins, hashJoin result fields needed by other hashJoins, and hashJoin result fields to appear in output. It initializes hashJoins with precomputed row indexes and also provides a mapping from big-row columns to the columns that appear in output.
Thanks to Matt for testing on demo6 during development.
refs #15544
diff --git src/lib/annoFormatTab.c src/lib/annoFormatTab.c
index 35aadfb..c09f05f 100644
--- src/lib/annoFormatTab.c
+++ src/lib/annoFormatTab.c
@@ -1,318 +1,319 @@
 /* annoFormatTab -- collect fields from all inputs and print them out, tab-separated. */
 
 /* Copyright (C) 2013 The Regents of the University of California
  * See README in this or parent directory for licensing information. */
 
 #include "annoFormatTab.h"
 #include "annoGratorQuery.h"
 #include "dystring.h"
 
 struct annoFormatTab
     {
     struct annoFormatter formatter;     // External interface
     char *fileName;                     // Output file name, can be "stdout"
     FILE *f;                            // Output file handle
     struct hash *columnVis;             // Hash of columns that have been explicitly selected
                                         // or deselected by user.  NULL until the first call
                                         // to annoFormatTabSetColumnVis.
     boolean **columnFlags;              // 2D array[sources][columns] of flags for whether each
                                         // column of each source is to be included in output.
     int *columnFlagLengths;             // array[sources] of column counts (lengths of subarrays),
                                         // for bounds-checking
     int sourceCount;                    // Number of sources (primary + grators)
     boolean needHeader;                 // TRUE if we should print out the header
     };
 
 static void makeFullColumnName(char *fullName, size_t size, char *sourceName, char *colName)
 /* If sourceName is non-empty, make fullName sourceName.colName, otherwise just colName.
  * fullName is a caller-provided buffer of size bytes. */
 {
 if (isNotEmpty(sourceName))
     safef(fullName, size, "%s.%s", sourceName, colName);
 else
     safecpy(fullName, size, colName);
 }
 
 void annoFormatTabSetColumnVis(struct annoFormatter *vSelf, char *sourceName, char *colName,
                                boolean enabled)
 /* Explicitly include or exclude column in output.  sourceName must be the same
  * as the corresponding annoStreamer source's name.
  * The setting is stored under the sourceName.colName key built by makeFullColumnName,
  * which is the same key that columnNameIsIncluded looks up. */
 {
 struct annoFormatTab *self = (struct annoFormatTab *)vSelf;
 // The hash is created lazily, on the first explicit include/exclude.
 if (! self->columnVis)
     self->columnVis = hashNew(0);
 char fullName[PATH_LEN];
 makeFullColumnName(fullName, sizeof(fullName), sourceName, colName);
 hashAddInt(self->columnVis, fullName, enabled);
 }
 
 static boolean columnNameIsIncluded(struct annoFormatTab *self, char *sourceName, char *colName)
 // Return TRUE if column has not been explicitly deselected.
 // When no column has ever been explicitly set (columnVis is NULL), every column is included.
 {
 if (self->columnVis)
     {
     char fullName[PATH_LEN];
     makeFullColumnName(fullName, sizeof(fullName), sourceName, colName);
     // Default of 1 -- presumably returned when fullName is not in the hash, so that
     // columns never mentioned by the user stay visible; confirm against hashIntValDefault.
     int vis = hashIntValDefault(self->columnVis, fullName, 1);
     if (vis == 0)
         return FALSE;
     }
 return TRUE;
 }
 
 static void addColumnFlagsForSource(struct annoFormatTab *self,
                                     struct annoStreamer *streamer)
 /* Make an array of flags for whether to include each column from streamer.
  * The array is stored at index self->sourceCount, which is then incremented, so
  * self->columnFlags and self->columnFlagLengths must already be allocated with room
  * for this source (makeColumnFlags does that before calling this). */
 {
 uint sourceIx = self->sourceCount;
 int colCount = slCount(streamer->asObj->columnList);
 AllocArray(self->columnFlags[sourceIx], colCount);
 self->columnFlagLengths[sourceIx] = colCount;
 struct asColumn *col;
 uint colIx;
 for (colIx = 0, col = streamer->asObj->columnList;
      col != NULL;
      col = col->next, colIx++)
     self->columnFlags[sourceIx][colIx] = columnNameIsIncluded(self, streamer->name, col->name);
 self->sourceCount++;
 }
 
 static void makeColumnFlags(struct annoFormatTab *self, struct annoStreamer *primary,
                             struct annoStreamer *integrators)
 /* Build arrays of flags for whether each column from each source is to be included
  * in the output.  The primary source gets index 0 and the integrators follow in list order.
  * NOTE(review): the arrays are allocated unconditionally and nothing here frees arrays
  * from an earlier call -- confirm whether this can be reached more than once per formatter. */
 {
 int sourceCount = 1 + slCount(integrators);
 AllocArray(self->columnFlags, sourceCount);
 AllocArray(self->columnFlagLengths, sourceCount);
 // self->sourceCount is incremented by each call to addColumnFlagsForSource
 self->sourceCount = 0;
 addColumnFlagsForSource(self, primary);
 struct annoStreamer *grator;
 for (grator = integrators; grator != NULL; grator = grator->next)
     addColumnFlagsForSource(self, grator);
 assert(self->sourceCount == sourceCount);
 }
 
 INLINE boolean columnIsIncluded(struct annoFormatTab *self, uint sourceIx, uint colIx)
 /* Look up flag for whether this source's column is included in output.
  * errAborts if sourceIx or colIx is outside the arrays built by makeColumnFlags.
  * NOTE(review): sourceIx and colIx are uint but are printed with %d and compared against
  * the int fields sourceCount / columnFlagLengths[] -- consider %u and matching types. */
 {
 if (sourceIx >= self->sourceCount)
     errAbort("annoFormatTab: illegal sourceIx %d (count is %d)", sourceIx, self->sourceCount);
 if (colIx >= self->columnFlagLengths[sourceIx])
     errAbort("annoFormatTab: illegal colIx %d (lengths[%d] is %d) -- out-of-order sources?",
              colIx, sourceIx, self->columnFlagLengths[sourceIx]);
 return self->columnFlags[sourceIx][colIx];
 }
 
 static void printHeaderColumns(struct annoFormatTab *self, struct annoStreamer *source,
                                uint sourceIx, boolean *pIsFirst)
 /* Print names of included columns from this source, as sourceName.colName (or just
  * colName when the source has no name).  A tab is printed before every column except
  * the first one of the line; *pIsFirst carries that state from one source to the next.
  * If pIsFirst is NULL, a tab is printed before every column. */
 {
 FILE *f = self->f;
 char *sourceName = source->name;
 boolean isFirst = (pIsFirst && *pIsFirst);
 uint colIx;
 struct asColumn *col;
 for (colIx = 0, col = source->asObj->columnList;
      col != NULL;
      col = col->next, colIx++)
     {
     if (columnIsIncluded(self, sourceIx, colIx))
         {
         if (isFirst)
             isFirst = FALSE;
         else
             fputc('\t', f);
         char fullName[PATH_LEN];
         makeFullColumnName(fullName, sizeof(fullName), sourceName, col->name);
         fputs(fullName, f);
         }
     }
 if (pIsFirst != NULL)
     *pIsFirst = isFirst;
 }
 
 static void aftInitialize(struct annoFormatter *vSelf, struct annoStreamer *primary,
                           struct annoStreamer *integrators)
 /* Print header, regardless of whether we get any data after this.
  * Also build arrays of flags for whether each column from each source is to be included
  * in the output.
  * The header is printed only while needHeader is set, and needHeader is cleared afterwards.
  * If the primary source's own header is non-empty it is echoed first, with a newline
  * appended when it does not already end in one, so the '#'-prefixed column-name line
  * always starts on a fresh line.
  * NOTE(review): lastChar is called on primaryHeader before the isNotEmpty check --
  * presumably lastChar tolerates NULL/empty input; confirm. */
 {
 struct annoFormatTab *self = (struct annoFormatTab *)vSelf;
 makeColumnFlags(self, primary, integrators);
 if (self->needHeader)
     {
     char *primaryHeader = primary->getHeader(primary);
     boolean isFirst = TRUE;
+    char *newlineAtEnd = (lastChar(primaryHeader) == '\n') ? "" : "\n";
     if (isNotEmpty(primaryHeader))
-        fprintf(self->f, "# Header from primary input:\n%s", primaryHeader);
+        fprintf(self->f, "# Header from primary input:\n%s%s", primaryHeader, newlineAtEnd);
     fputc('#', self->f);
     printHeaderColumns(self, primary, 0, &isFirst);
     uint sourceIx;
     struct annoStreamer *grator;
     for (sourceIx = 1, grator = integrators;
          grator != NULL;
          grator = grator->next, sourceIx++)
         printHeaderColumns(self, grator, sourceIx, &isFirst);
     fputc('\n', self->f);
     self->needHeader = FALSE;
     }
 }
 
 static char **bed4WordsFromAnnoRow(struct annoRow *row, char *fourth)
 /* Return an array of 4 words with row's chrom, chromStart, and chromEnd, and cloned fourth.
  * The array and all four strings are freshly allocated; printColumns frees them
  * when wordsFromRow reports freeWhenDone. */
 {
 char **words;
 AllocArray(words, 4);
 words[0] = cloneString(row->chrom);
 char buf[PATH_LEN];
 safef(buf, sizeof(buf), "%u", row->start);
 words[1] = cloneString(buf);
 safef(buf, sizeof(buf), "%u", row->end);
 words[2] = cloneString(buf);
 words[3] = cloneString(fourth);
 return words;
 }
 
 static char **wordsFromWigRow(struct annoRow *row, enum annoRowType rowType)
 /* Return chrom, chromStart, chromEnd and a string containing comma-sep per-base wiggle values
  * if rowType is arWigVec, or one value if rowType is arWigSingle.
  * errAborts on any other rowType.  For arWigVec, row->data is read as (end - start) floats;
  * for arWigSingle, row->data is read as a pointer to one double. */
 {
 struct dyString *dy = NULL;
 if (rowType == arWigVec)
     {
     float *vector = row->data;
     int len = row->end - row->start;
     dy = dyStringNew(10*len);
     int i;
     for (i = 0; i < len; i++)
         {
         if (i > 0)
             dyStringAppendC(dy, ',');
         dyStringPrintf(dy, "%g", vector[i]);
         }
     }
 else if (rowType == arWigSingle)
     {
     double *pValue = row->data;
     dy = dyStringCreate("%lf", *pValue);
     }
 else
     errAbort("wordsFromWigRow: invalid rowType %d", rowType);
 // bed4WordsFromAnnoRow clones the string, so the dyString can be freed right away.
 char **words = bed4WordsFromAnnoRow(row, dy->string);
 dyStringFree(&dy);
 return words;
 }
 
 static char **wordsFromRow(struct annoRow *row, struct annoStreamer *source,
                            boolean *retFreeWhenDone)
 /* If source->rowType is arWords, return its words.  Otherwise translate row->data into words.
  * Returns NULL when row is NULL (in that case *retFreeWhenDone is left untouched).
  * *retFreeWhenDone, if not NULL, is set to TRUE only when the returned words were
  * allocated here (wiggle rows) and so must be freed by the caller. */
 {
 if (row == NULL)
     return NULL;
 boolean freeWhenDone = FALSE;
 char **words = NULL;
 if (source->rowType == arWords)
     words = row->data;
 else if (source->rowType == arWigVec || source->rowType == arWigSingle)
     {
     words = wordsFromWigRow(row, source->rowType);
     if (words != NULL)
         freeWhenDone = TRUE;
     }
 else
     errAbort("annoFormatTab: unrecognized row type %d from source %s",
              source->rowType, source->name);
 if (retFreeWhenDone != NULL)
     *retFreeWhenDone = freeWhenDone;
 return words;
 }
 
 static void printColumns(struct annoFormatTab *self, struct annoStreamer *streamer,
                          uint sourceIx, struct annoRow *row, boolean isFirst)
 /* Print columns in streamer's row (if NULL, print the right number of empty fields).
  * Only columns flagged as included are printed; a tab precedes every printed column
  * except the first one when isFirst is TRUE.
  * NOTE(review): words[] is indexed -- and, when freeWhenDone, freed -- using the number
  * of columns in streamer->asObj, while wiggle rows get exactly 4 words from
  * bed4WordsFromAnnoRow.  This assumes wiggle sources declare exactly 4 columns; confirm. */
 {
 FILE *f = self->f;
 boolean freeWhenDone = FALSE;
 char **words = wordsFromRow(row, streamer, &freeWhenDone);
 struct asColumn *col;
 uint colIx;
 for (col = streamer->asObj->columnList, colIx = 0;
      col != NULL;
      col = col->next, colIx++)
     {
     if (columnIsIncluded(self, sourceIx, colIx))
         {
         if (isFirst)
             isFirst = FALSE;
         else
             fputc('\t', f);
         if (words != NULL)
             fputs((words[colIx] ? words[colIx] : ""), f);
         }
     }
 // After the loop colIx equals the number of columns in asObj.
 int wordCount = colIx;
 if (freeWhenDone)
     {
     for (colIx = 0; colIx < wordCount; colIx++)
         freeMem(words[colIx]);
     freeMem(words);
     }
 }
 
 static void aftComment(struct annoFormatter *fSelf, char *content)
 /* Print out a comment line ("# " followed by content and a newline).
  * errAborts if content contains a newline. */
 {
 if (strchr(content, '\n'))
     errAbort("aftComment: no multi-line input");
 struct annoFormatTab *self = (struct annoFormatTab *)fSelf;
 fprintf(self->f, "# %s\n", content);
 }
 
 static void aftFormatOne(struct annoFormatter *vSelf, struct annoStreamRows *primaryData,
                          struct annoStreamRows *gratorData, int gratorCount)
 /* Print out tab-separated columns for one primary row and the grator rows that go with it.
  * One output line is printed per grator row index, up to the largest row count of any
  * grator (at least one line).  Each line repeats the primary row's columns, followed by
  * each grator's row at that index; a grator that has run out of rows contributes empty
  * fields (printColumns is passed whatever slElementFromIx returns -- presumably NULL
  * past the end of the list; confirm).
  * (An earlier version of this comment referred to aftCollect, which does not appear
  * in this file.) */
 {
 struct annoFormatTab *self = (struct annoFormatTab *)vSelf;
 // Got one row from primary; what's the largest # of rows from any grator?
 int maxRows = 1;
 uint iG;
 for (iG = 0; iG < gratorCount; iG++)
     {
     int gratorRowCount = slCount(gratorData[iG].rowList);
     if (gratorRowCount > maxRows)
         maxRows = gratorRowCount;
     }
 // Print out enough rows to make sure that all grator rows are included.
 int iR;
 for (iR = 0; iR < maxRows; iR++)
     {
     printColumns(self, primaryData->streamer, 0, primaryData->rowList, TRUE);
     for (iG = 0; iG < gratorCount; iG++)
         {
         struct annoRow *gratorRow = slElementFromIx(gratorData[iG].rowList, iR);
         // Source index 0 is the primary, so grator iG is source 1+iG.
         printColumns(self, gratorData[iG].streamer, 1+iG, gratorRow, FALSE);
         }
     fputc('\n', self->f);
     }
 }
 
 static void aftClose(struct annoFormatter **pVSelf)
 /* Close file handle, free self.
  * NOTE(review): *pVSelf is dereferenced without a NULL check, and columnVis, columnFlags
  * (including its per-source subarrays) and columnFlagLengths are not released in this
  * function -- confirm whether annoFormatterFree is expected to handle them. */
 {
 if (pVSelf == NULL)
     return;
 struct annoFormatTab *self = *(struct annoFormatTab **)pVSelf;
 freeMem(self->fileName);
 carefulClose(&(self->f));
 annoFormatterFree(pVSelf);
 }
 
 struct annoFormatter *annoFormatTabNew(char *fileName)
 /* Return a formatter that will write its tab-separated output to fileName.
  * fileName is cloned and opened for writing right away; the header is printed later,
  * by the first call to the formatter's initialize method. */
 {
 struct annoFormatTab *aft;
 AllocVar(aft);
 struct annoFormatter *formatter = &(aft->formatter);
 formatter->getOptions = annoFormatterGetOptions;
 formatter->setOptions = annoFormatterSetOptions;
 formatter->initialize = aftInitialize;
 formatter->comment = aftComment;
 formatter->formatOne = aftFormatOne;
 formatter->close = aftClose;
 aft->fileName = cloneString(fileName);
 aft->f = mustOpen(fileName, "w");
 aft->needHeader = TRUE;
 return (struct annoFormatter *)aft;
 }