2aa9a5801154010a033d3cdc760a8c3e296e835a galt Fri Oct 9 02:30:57 2020 -0700 Changed default -timeout to 90 seconds instead of 10. Improved handling skips further sending on socket after failure to avoid multiple long timeouts. refs #26285 diff --git src/gfServer/gfServer.c src/gfServer/gfServer.c index 84c435e..32a299c 100644 --- src/gfServer/gfServer.c +++ src/gfServer/gfServer.c @@ -56,31 +56,32 @@ int minMatch = gfMinMatch; /* Can be overridden from command line. */ int tileSize = gfTileSize; /* Can be overridden from command line. */ int stepSize = 0; /* Can be overridden from command line. */ boolean doTrans = FALSE; /* Do translation? */ boolean allowOneMismatch = FALSE; boolean noSimpRepMask = FALSE; int repMatch = 1024; /* Can be overridden from command line. */ int maxDnaHits = 100; /* Can be overridden from command line. */ int maxTransHits = 200; /* Can be overridden from command line. */ int maxGap = gfMaxGap; boolean seqLog = FALSE; boolean ipLog = FALSE; boolean doMask = FALSE; boolean canStop = FALSE; -int timeout = 10; // default timeout in seconds +int timeout = 90; // default timeout in seconds + void usage() /* Explain usage and exit. */ { errAbort( "gfServer v %s - Make a server to quickly find where DNA occurs in genome\n" " To set up a server:\n" " gfServer start host port file(s)\n" " where the files are .2bit or .nib format files specified relative to the current directory\n" " To remove a server:\n" " gfServer stop host port\n" " To query a server with DNA sequence:\n" " gfServer query host port probe.fa\n" " To query a server with protein sequence:\n" " gfServer protQuery host port probe.fa\n" @@ -147,30 +148,50 @@ hgBlat will append the path(s) given to dbDb.nibPath. webBlat will append the path(s) given to path specified in webBlat.cfg. gfClient will append the path(s) given to the seqDir path specified. */ static void setSocketTimeout(int sockfd, int delayInSeconds) // put socket read and write timeout so it will not take forever to timeout during a read or write { struct timeval tv; tv.tv_sec = delayInSeconds; tv.tv_usec = 0; setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv); setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof tv); } +static boolean sendOk = TRUE; + +void setSendOk() +// Reset to OK to send +{ +sendOk = TRUE; +} + +void errSendString(int sd, char *s) +// Send string. If not OK, remember we had an error, do not try to write anything more on this connection. +{ +if (sendOk) sendOk = netSendString(sd, s); +} + +void errSendLongString(int sd, char *s) +// Send string unless we had an error already on the connection. +{ +if (sendOk) sendOk = netSendLongString(sd, s); +} + void genoFindDirect(char *probeName, int fileCount, char *seqFiles[]) /* Don't set up server - just directly look for matches. */ { struct genoFind *gf = NULL; struct lineFile *lf = lineFileOpen(probeName, TRUE); struct dnaSeq seq; int hitCount = 0, clumpCount = 0, oneHit; ZeroVar(&seq); if (doTrans) errAbort("Don't support translated direct stuff currently, sorry"); gf = gfIndexNibsAndTwoBits(fileCount, seqFiles, minMatch, maxGap, tileSize, repMatch, FALSE, allowOneMismatch, stepSize, noSimpRepMask); @@ -263,140 +284,140 @@ int limit = 1000; int clumpCount = 0, hitCount = -1; struct lm *lm = lmInit(0); if (seq->size > gf->tileSize + gf->stepSize + gf->stepSize) limit = maxDnaHits; clumpList = gfFindClumps(gf, seq, lm, &hitCount); if (clumpList == NULL) ++missCount; for (clump = clumpList; clump != NULL; clump = clump->next) { struct gfSeqSource *ss = clump->target; sprintf(buf, "%d\t%d\t%s\t%d\t%d\t%d", clump->qStart, clump->qEnd, ss->fileName, clump->tStart-ss->start, clump->tEnd-ss->start, clump->hitCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); ++clumpCount; int perSeqCount = -1; if (perSeqMaxHash && ((perSeqCount = hashIntValDefault(perSeqMaxHash, ss->fileName, -1)) >= 0)) { if (perSeqCount >= (maxDnaHits / 2)) break; hashIncInt(perSeqMaxHash, ss->fileName); } else if (--limit < 0) break; } gfClumpFreeList(&clumpList); lmCleanup(&lm); logDebug("%lu %d clumps, %d hits", clock1000(), clumpCount, hitCount); } void transQuery(struct genoFind *transGf[2][3], aaSeq *seq, int connectionHandle, char buf[256]) /* Handle a query for protein/translated DNA match. */ { struct gfClump *clumps[3], *clump; int isRc, frame; char strand; struct dyString *dy = newDyString(1024); struct gfHit *hit; int clumpCount = 0, hitCount = 0, oneHit; struct lm *lm = lmInit(0); sprintf(buf, "tileSize %d", tileSize); -netSendString(connectionHandle, buf); +errSendString(connectionHandle, buf); for (frame = 0; frame < 3; ++frame) clumps[frame] = NULL; for (isRc = 0; isRc <= 1; ++isRc) { strand = (isRc ? '-' : '+'); gfTransFindClumps(transGf[isRc], seq, clumps, lm, &oneHit); hitCount += oneHit; for (frame = 0; frame < 3; ++frame) { int limit = maxTransHits; for (clump = clumps[frame]; clump != NULL; clump = clump->next) { struct gfSeqSource *ss = clump->target; sprintf(buf, "%d\t%d\t%s\t%d\t%d\t%d\t%c\t%d", clump->qStart, clump->qEnd, ss->fileName, clump->tStart-ss->start, clump->tEnd-ss->start, clump->hitCount, strand, frame); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); dyStringClear(dy); for (hit = clump->hitList; hit != NULL; hit = hit->next) dyStringPrintf(dy, " %d %d", hit->qStart, hit->tStart - ss->start); - netSendLongString(connectionHandle, dy->string); + errSendLongString(connectionHandle, dy->string); ++clumpCount; if (--limit < 0) break; } gfClumpFreeList(&clumps[frame]); } } if (clumpCount == 0) ++missCount; freeDyString(&dy); lmCleanup(&lm); logDebug("%lu %d clumps, %d hits", clock1000(), clumpCount, hitCount); } void transTransQuery(struct genoFind *transGf[2][3], struct dnaSeq *seq, int connectionHandle, char buf[256]) /* Handle a query for protein/translated DNA match. */ { struct gfClump *clumps[3][3], *clump; int isRc, qFrame, tFrame; char strand; struct trans3 *t3 = trans3New(seq); struct dyString *dy = newDyString(1024); struct gfHit *hit; int clumpCount = 0, hitCount = 0, oneCount; sprintf(buf, "tileSize %d", tileSize); -netSendString(connectionHandle, buf); +errSendString(connectionHandle, buf); for (qFrame = 0; qFrame<3; ++qFrame) for (tFrame=0; tFrame<3; ++tFrame) clumps[qFrame][tFrame] = NULL; for (isRc = 0; isRc <= 1; ++isRc) { struct lm *lm = lmInit(0); strand = (isRc ? '-' : '+'); gfTransTransFindClumps(transGf[isRc], t3->trans, clumps, lm, &oneCount); hitCount += oneCount; for (qFrame = 0; qFrame<3; ++qFrame) { for (tFrame=0; tFrame<3; ++tFrame) { int limit = maxTransHits; for (clump = clumps[qFrame][tFrame]; clump != NULL; clump = clump->next) { struct gfSeqSource *ss = clump->target; sprintf(buf, "%d\t%d\t%s\t%d\t%d\t%d\t%c\t%d\t%d", clump->qStart, clump->qEnd, ss->fileName, clump->tStart-ss->start, clump->tEnd-ss->start, clump->hitCount, strand, qFrame, tFrame); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); dyStringClear(dy); for (hit = clump->hitList; hit != NULL; hit = hit->next) { dyStringPrintf(dy, " %d %d", hit->qStart, hit->tStart - ss->start); } - netSendLongString(connectionHandle, dy->string); + errSendLongString(connectionHandle, dy->string); ++clumpCount; if (--limit < 0) break; } gfClumpFreeList(&clumps[qFrame][tFrame]); } } lmCleanup(&lm); } trans3Free(&t3); if (clumpCount == 0) ++missCount; logDebug("%lu %d clumps, %d hits", clock1000(), clumpCount, hitCount); } @@ -404,47 +425,47 @@ int maxDistance, int connectionHandle) /* Do PCR query and report results down socket. */ { int fPrimerSize = strlen(fPrimer); int rPrimerSize = strlen(rPrimer); struct gfClump *clumpList, *clump; int clumpCount = 0; char buf[256]; clumpList = gfPcrClumps(gf, fPrimer, fPrimerSize, rPrimer, rPrimerSize, 0, maxDistance); for (clump = clumpList; clump != NULL; clump = clump->next) { struct gfSeqSource *ss = clump->target; safef(buf, sizeof(buf), "%s\t%d\t%d\t+", ss->fileName, clump->tStart, clump->tEnd); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); ++clumpCount; } gfClumpFreeList(&clumpList); clumpList = gfPcrClumps(gf, rPrimer, rPrimerSize, fPrimer, fPrimerSize, 0, maxDistance); for (clump = clumpList; clump != NULL; clump = clump->next) { struct gfSeqSource *ss = clump->target; safef(buf, sizeof(buf), "%s\t%d\t%d\t-", ss->fileName, clump->tStart, clump->tEnd); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); ++clumpCount; } gfClumpFreeList(&clumpList); -netSendString(connectionHandle, "end"); +errSendString(connectionHandle, "end"); logDebug("%lu PCR %s %s %d clumps\n", clock1000(), fPrimer, rPrimer, clumpCount); } static jmp_buf gfRecover; static char *ripCord = NULL; /* A little memory to give back to system * during error recovery. */ static void gfAbort() /* Abort query. */ { freez(&ripCord); longjmp(gfRecover, -1); } @@ -456,31 +477,31 @@ ripCord = needMem(64*1024); /* Memory for error recovery. memTrackerEnd frees */ } static void errorSafeCleanup() /* Clean up and report problem. */ { memTrackerEnd(); popAbortHandler(); // must come after memTracker } static void errorSafeCleanupMess(int connectionHandle, char *message) /* Clean up and report problem. */ { errorSafeCleanup(); logError("Recovering from error via longjmp"); -netSendString(connectionHandle, message); +errSendString(connectionHandle, message); } static void errorSafeQuery(boolean doTrans, boolean queryIsProt, struct dnaSeq *seq, struct genoFind *gf, struct genoFind *transGf[2][3], int connectionHandle, char *buf, struct hash *perSeqMaxHash) /* Wrap error handling code around index query. */ { int status; errorSafeSetup(); status = setjmp(gfRecover); if (status == 0) /* Always true except after long jump. */ { if (doTrans) { if (queryIsProt) @@ -614,30 +635,31 @@ logInfo("indexing complete"); /* Set up socket. Get ready to listen to it. */ socketHandle = netAcceptingSocket(port, 100); if (socketHandle < 0) errAbort("Fatal Error: Unable to open listening socket on port %d.", port); logInfo("Server ready for queries!"); printf("Server ready for queries!\n"); int connectFailCount = 0; for (;;) { ZeroVar(&fromAddr); fromLen = sizeof(fromAddr); connectionHandle = accept(socketHandle, (struct sockaddr*)&fromAddr, &fromLen); + setSendOk(); if (connectionHandle < 0) { warn("Error accepting the connection"); ++warnCount; ++connectFailCount; if (connectFailCount >= 100) errAbort("100 continuous connection failures, no point in filling up the log in an infinite loop."); continue; } else { connectFailCount = 0; } setSocketTimeout(connectionHandle, timeout); if (ipLog) @@ -673,63 +695,63 @@ close(connectionHandle); continue; } line = buf + strlen(gfSignature()); command = nextWord(&line); if (sameString("quit", command)) { if (canStop) break; else logError("Ignoring quit message"); } else if (sameString("status", command)) { sprintf(buf, "version %s", gfVersion); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "type %s", (doTrans ? "translated" : "nucleotide")); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "host %s", hostName); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "port %s", portName); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "tileSize %d", tileSize); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "stepSize %d", stepSize); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "minMatch %d", minMatch); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "pcr requests %ld", pcrCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "blat requests %ld", blatCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "bases %ld", baseCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); if (doTrans) { sprintf(buf, "aa %ld", aaCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); } sprintf(buf, "misses %d", missCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "noSig %d", noSigCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "trimmed %d", trimCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); sprintf(buf, "warnings %d", warnCount); - netSendString(connectionHandle, buf); - netSendString(connectionHandle, "end"); + errSendString(connectionHandle, buf); + errSendString(connectionHandle, "end"); } else if (sameString("query", command) || sameString("protQuery", command) || sameString("transQuery", command)) { boolean queryIsProt = sameString(command, "protQuery"); char *s = nextWord(&line); if (s == NULL || !isdigit(s[0])) { warn("Expecting query size after query command"); ++warnCount; } else { struct dnaSeq seq; ZeroVar(&seq); @@ -782,31 +804,31 @@ else baseCount += seq.size; if (seqLog && (logGetFile() != NULL)) { FILE *lf = logGetFile(); faWriteNext(lf, "query", seq.dna, seq.size); fflush(lf); } errorSafeQuery(doTrans, queryIsProt, &seq, gf, transGf, connectionHandle, buf, perSeqMaxHash); if (perSeqMaxHash) hashZeroVals(perSeqMaxHash); } freez(&seq.dna); } - netSendString(connectionHandle, "end"); + errSendString(connectionHandle, "end"); } } } } else if (sameString("pcr", command)) { char *f = nextWord(&line); char *r = nextWord(&line); char *s = nextWord(&line); int maxDistance; ++pcrCount; if (s == NULL || !isdigit(s[0])) { warn("Badly formatted pcr command"); ++warnCount; @@ -819,35 +841,35 @@ else if (badPcrPrimerSeq(f) || badPcrPrimerSeq(r)) { warn("Can only handle ACGT in primer sequences."); ++warnCount; } else { maxDistance = atoi(s); errorSafePcr(gf, f, r, maxDistance, connectionHandle); } } else if (sameString("files", command)) { int i; sprintf(buf, "%d", fileCount); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); for (i=0; i<fileCount; ++i) { sprintf(buf, "%s", seqFiles[i]); - netSendString(connectionHandle, buf); + errSendString(connectionHandle, buf); } } else { warn("Unknown command %s", command); ++warnCount; } close(connectionHandle); connectionHandle = 0; } close(socketHandle); } void stopServer(char *hostName, char *portName) /* Send stop message to server. */