36f8f6fb024b20cc523cdf9ebde7491eca84fd7c markd Sun Dec 6 20:33:20 2020 -0800 multiple request per connect works except hgBlat diff --git src/gfServer/gfServer.c src/gfServer/gfServer.c index 88a6a98..482276b 100644 --- src/gfServer/gfServer.c +++ src/gfServer/gfServer.c @@ -581,31 +581,30 @@ return TRUE; } return FALSE; } static boolean haveFileBaseName(char *baseName, int fileCount, char *seqFiles[]) /* check if the file list contains the base name of the per-seq max spec */ { int i; for (i = 0; i < fileCount; i++) if (sameString(findTail(seqFiles[i], '/'), baseName)) return TRUE; return FALSE; } - static struct hash *buildPerSeqMax(int fileCount, char *seqFiles[], char* perSeqMaxFile) /* do work of building perSeqMaxhash */ { struct hash *perSeqMaxHash = hashNew(0); struct lineFile *lf = lineFileOpen(perSeqMaxFile, TRUE); char *line; while (lineFileNextReal(lf, &line)) { // Make sure line contains a valid seq filename (before optional ':seq'), directories are ignored char *seqFile = findTail(trimSpaces(line), '/'); char copy[strlen(seqFile)+1]; safecpy(copy, sizeof copy, seqFile); char *colon = strrchr(copy, ':'); if (colon) *colon = '\0'; @@ -1081,83 +1080,141 @@ repMatch, doTrans, NULL, allowOneMismatch, doMask, stepSize, noSimpRepMask); genoFindIndexWrite(gfIdx, gfxFile); } static void dynWarnErrorVa(char* msg, va_list args) /* warnHandler to log and send back an error response */ { char buf[4096]; int msgLen = vsnprintf(buf, sizeof(buf) - 1, msg, args); buf[msgLen] = '\0'; logError("%s", buf); printf("Error: %s\n", buf); } -static void dynReadBytes(char *buf, int bufSize) -/* read pending bytes */ +struct dynSession +/* information on dynamic server connection session. This is all data + * currently cached. It is not changed if the genome and query mode are the + * same as the previous request. 
+ */ +{ + boolean isTrans; // translated + char genome[256]; // genome name + char genomeDataDir[PATH_LEN]; // relative directory of data dir + char gfIdxFile[PATH_LEN]; // index file location + struct hash *perSeqMaxHash; // max hits per sequence + struct genoFindIndex *gfIdx; // index +}; + +static void dynSessionInit(struct dynSession *dynSession, char *rootDir, + char *genome, char *genomeDataDir, boolean isTrans) +/* Initialize or reinitialize a dynSession object */ +{ +// will free current content if initialized +genoFindIndexFree(&dynSession->gfIdx); +hashFree(&dynSession->perSeqMaxHash); + +time_t startTime = clock1000(); +dynSession->isTrans = isTrans; +safecpy(dynSession->genome, sizeof(dynSession->genome), genome); +safecpy(dynSession->genomeDataDir, sizeof(dynSession->genomeDataDir), genomeDataDir); + +char seqFile[PATH_LEN]; +safef(seqFile, PATH_LEN, "%s/%s/%s.2bit", rootDir, genomeDataDir, genome); +if (!fileExists(seqFile)) + errAbort("sequence file for %s does not exist: %s", genome, seqFile); + +char gfIdxFile[PATH_LEN]; +safef(gfIdxFile, PATH_LEN, "%s/%s/%s.%s.gfidx", rootDir, genomeDataDir, genome, isTrans ? "trans" : "untrans"); +if (!fileExists(gfIdxFile)) + errAbort("gf index file for %s does not exist: %s", genome, gfIdxFile); +dynSession->gfIdx = genoFindIndexLoad(gfIdxFile, isTrans); + +char perSeqMaxFile[PATH_LEN]; +safef(perSeqMaxFile, PATH_LEN, "%s/%s/%s.perseqmax", rootDir, genomeDataDir, genome); +if (fileExists(perSeqMaxFile)) + { + /* only the basename of the file is saved in the index */ + char *slash = strrchr(seqFile, '/'); + char *seqFiles[1] = {(slash != NULL) ? 
slash + 1 : seqFile}; + dynSession->perSeqMaxHash = buildPerSeqMax(1, seqFiles, perSeqMaxFile); + } +logInfo("dynserver: index loading completed in %4.3f seconds", 0.001 * (clock1000() - startTime)); +} + +static char *dynReadCommand(char* rootDir, char *buf, int bufSize) +/* read command and log, NULL if no more */ { int readSize = read(STDIN_FILENO, buf, bufSize-1); if (readSize < 0) errAbort("EOF from client"); +if (readSize == 0) + return NULL; buf[readSize] = '\0'; +logDebug("query received: %s", buf); +if (!startsWith(gfSignature(), buf)) + errAbort("query does not start with signature, got '%s'", buf); +return buf + strlen(gfSignature()); } -static void dynReadCommand(char **commandRet, int *qsizeRet, boolean *isTransRet, - char **genomeRet, char **genomeDataDirRet) -/* read query request from stdin, same as server expect includes database +static boolean dynNextCommand(char* rootDir, struct dynSession *dynSession, char **commandRet, int *qsizeRet) +/* Read query request from stdin and (re)initialize session to match parameters. 
* Format for query commands: - * signature+command qsize genome genomeDataDir + * signature+command genome genomeDataDir qsize * Formats for info command: * signature+command genome genomeDataDir */ { -char buf[256]; -dynReadBytes(buf, sizeof(buf)); -logDebug("query: %s", buf); - -if (!startsWith(gfSignature(), buf)) - errAbort("query does not start with signature, got '%s'", buf); +char buf[PATH_LEN]; +char *cmdStr = dynReadCommand(rootDir, buf, sizeof(buf)); +if (cmdStr == NULL) + return FALSE; char *words[6]; -int numWords = chopByWhite(buf, words, ArraySize(words)); +int numWords = chopByWhite(cmdStr, words, ArraySize(words)); if (numWords == 0) errAbort("empty command"); -char *command = buf + strlen(gfSignature()); +char *command = words[0]; *commandRet = cloneString(command); -*isTransRet = sameString("protQuery", command) || sameString("transQuery", command) +boolean isTrans = sameString("protQuery", command) || sameString("transQuery", command) || sameString("transInfo", command); +char genome[256], genomeDataDir[PATH_LEN]; if (sameString("query", command) || sameString("protQuery", command) || sameString("transQuery", command)) { if (numWords != 4) errAbort("expected 4 words in query command, got %d", numWords); - *qsizeRet = atoi(words[1]); - *genomeRet = cloneString(words[2]); - *genomeDataDirRet = cloneString(words[3]); + *qsizeRet = atoi(words[3]); } else if (sameString("untransInfo", command) || sameString("transInfo", command)) { if (numWords != 3) - errAbort("expected 3 words in query command, got %d", numWords); + errAbort("expected 3 words in info command, got %d", numWords); *qsizeRet = 0; - *genomeRet = cloneString(words[1]); - *genomeDataDirRet = cloneString(words[2]); } else errAbort("invalid command '%s'", command); + +safecpy(genome, sizeof(genome), words[1]); +safecpy(genomeDataDir, sizeof(genomeDataDir), words[2]); + +// initialize session if new or changed +if ((dynSession->isTrans != isTrans) || (!sameString(dynSession->genome, genome)) 
|| (!sameString(dynSession->genomeDataDir, genomeDataDir))) + dynSessionInit(dynSession, rootDir, genome, genomeDataDir, isTrans); +return TRUE; } static struct dnaSeq* dynReadQuerySeq(int qSize, boolean isTrans, boolean queryIsProt) /* read the DNA sequence from the query, filtering junk */ { struct dnaSeq *seq; AllocVar(seq); seq->size = qSize; seq->dna = needLargeMem(qSize+1); if (gfReadMulti(STDIN_FILENO, seq->dna, qSize) != qSize) errAbort("read of %d bytes of query sequence failed", qSize); seq->dna[qSize] = '\0'; if (queryIsProt) { @@ -1167,122 +1224,100 @@ else { seq->size = dnaFilteredSize(seq->dna); dnaFilter(seq->dna, seq->dna); } int maxSize = (isTrans ? maxAaSize : maxNtSize); if (seq->size > maxSize) { seq->size = maxSize; seq->dna[maxSize] = 0; } return seq; } -static void dynGetDataFiles(char *rootDir, char *genome, char *genomeDataDir, - boolean isTrans, char gfIdxFile[PATH_LEN], - struct hash **perSeqMaxHashRet) -/* get paths for sequence files to handle requests and validate they exist */ -{ -char seqFile[PATH_LEN]; -safef(seqFile, PATH_LEN, "%s/%s/%s.2bit", rootDir, genomeDataDir, genome); -if (!fileExists(seqFile)) - errAbort("sequence file for %s does not exist: %s", genome, seqFile); - -safef(gfIdxFile, PATH_LEN, "%s/%s/%s.%s.gfidx", rootDir, genomeDataDir, genome, isTrans ? "trans" : "untrans"); -if (!fileExists(gfIdxFile)) - errAbort("gf index file for %s does not exist: %s", genome, gfIdxFile); - -char perSeqMaxFile[PATH_LEN]; -safef(perSeqMaxFile, PATH_LEN, "%s/%s/%s.perseqmax", rootDir, genomeDataDir, genome); -*perSeqMaxHashRet = NULL; -if (fileExists(perSeqMaxFile)) - { - /* only the basename of the file is saved in the index */ - char *slash = strrchr(seqFile, '/'); - char *seqFiles[1] = {(slash != NULL) ? 
slash + 1 : seqFile}; - *perSeqMaxHashRet = buildPerSeqMax(1, seqFiles, perSeqMaxFile); - } -} - static void dynamicServerQuery(char *command, int qSize, struct genoFindIndex *gfIdx, struct hash *perSeqMaxHash) /* handle search queries */ { mustWriteFd(STDOUT_FILENO, "Y", 1); boolean queryIsProt = sameString(command, "protQuery"); struct dnaSeq* seq = dynReadQuerySeq(qSize, gfIdx->isTrans, queryIsProt); if (gfIdx->isTrans) { if (queryIsProt) transQuery(gfIdx->transGf, seq, STDOUT_FILENO); else transTransQuery(gfIdx->transGf, seq, STDOUT_FILENO); } else { dnaQuery(gfIdx->untransGf, seq, STDOUT_FILENO, perSeqMaxHash); } netSendString(STDOUT_FILENO, "end"); -logDebug("query done"); } static void dynamicServerInfo(char *command, struct genoFindIndex *gfIdx) /* handle one of the info commands */ { char buf[256]; struct genoFind *gf = gfIdx->isTrans ? gfIdx->transGf[0][0] : gfIdx->untransGf; sprintf(buf, "version %s", gfVersion); netSendString(STDOUT_FILENO, buf); sprintf(buf, "type %s", (gfIdx->isTrans ? "translated" : "nucleotide")); netSendString(STDOUT_FILENO, buf); sprintf(buf, "tileSize %d", gf->tileSize); netSendString(STDOUT_FILENO, buf); sprintf(buf, "stepSize %d", gf->stepSize); netSendString(STDOUT_FILENO, buf); sprintf(buf, "minMatch %d", gf->minMatch); netSendString(STDOUT_FILENO, buf); netSendString(STDOUT_FILENO, "end"); } +static bool dynamicServerCommand(char* rootDir, struct dynSession* dynSession) +/* Execute one command from stdin, (re)initializing session as needed */ +{ +time_t startTime = clock1000(); +char *command; +int qSize; +if (!dynNextCommand(rootDir, dynSession, &command, &qSize)) + return FALSE; +logInfo("dynserver: %s %s %s %s size=%d ", command, dynSession->genome, dynSession->genomeDataDir, (dynSession->isTrans ? 
"trans" : "untrans"), qSize); +if (endsWith(command, "Info")) + dynamicServerInfo(command, dynSession->gfIdx); +else + dynamicServerQuery(command, qSize, dynSession->gfIdx, dynSession->perSeqMaxHash); +logInfo("dynserver: %s completed in %4.3f seconds", command, 0.001 * (clock1000() - startTime)); +return TRUE; +} + static void dynamicServer(char* rootDir) /* dynamic server for inetd. Read query from stdin, open index, query, respond, exit. * only one query at a time */ { -// make sure error is logged -pushWarnHandler(dynWarnErrorVa); +logDebug("dynamicServer connect"); -char *command, *genome, *genomeDataDir; -int qSize; -boolean isTrans; -dynReadCommand(&command, &qSize, &isTrans, &genome, &genomeDataDir); -logInfo("dynserver: %s %s %s %s size=%d ", command, genome, genomeDataDir, (isTrans ? "trans" : "untrans"), qSize); - -time_t startTime = clock1000(); -char gfIdxFile[PATH_LEN]; -struct hash *perSeqMaxHash = NULL; -dynGetDataFiles(rootDir, genome, genomeDataDir, isTrans, gfIdxFile, &perSeqMaxHash); -logInfo("dynserver: index loading completed in %4.3f seconds", 0.001 * (clock1000() - startTime)); -startTime = clock1000(); +// make sure errors are logged +pushWarnHandler(dynWarnErrorVa); +struct dynSession dynSession; +ZeroVar(&dynSession); -struct genoFindIndex *gfIdx = genoFindIndexLoad(gfIdxFile, isTrans); -if (endsWith(command, "Info")) - dynamicServerInfo(command, gfIdx); -else - dynamicServerQuery(command, qSize, gfIdx, perSeqMaxHash); -logInfo("dynserver: %s completed in %4.3f seconds", command, 0.001 * (clock1000() - startTime)); +while (dynamicServerCommand(rootDir, &dynSession)) + continue; +logDebug("dynamicServer disconnect"); } int main(int argc, char *argv[]) /* Process command line. */ { char *command; gfCatchPipes(); dnaUtilOpen(); optionInit(&argc, argv, optionSpecs); command = argv[1]; if (optionExists("trans")) { doTrans = TRUE; tileSize = 4;