fd7300195f18fae0c96a61d4c7ae5b5aaed549c7
kent
  Mon Sep 30 19:06:56 2024 -0700
Removing a fifo that worked better in theory than practice.

diff --git src/hg/cirm/cdw/cdwRunDaemon/cdwRunDaemon.c src/hg/cirm/cdw/cdwRunDaemon/cdwRunDaemon.c
index 077fba5..b975193 100644
--- src/hg/cirm/cdw/cdwRunDaemon/cdwRunDaemon.c
+++ src/hg/cirm/cdw/cdwRunDaemon/cdwRunDaemon.c
@@ -1,356 +1,311 @@
-/* cdwRunDaemon - Run jobs on multiple processers in background.  This is done with
- * a combination of infrequent polling of the database, and a unix fifo which can be
- * sent a signal (anything ending with a newline actually) that tells it to go look
- * at database now. */
+/* cdwRunDaemon - Run jobs on multiple processors in background.
+ * This is done by polling the database, pausing -delay seconds between polls. */
 
-/* Copyright (C) 2014 The Regents of the University of California 
+/* Copyright (C) 2024 The Regents of the University of California 
  * See kent/LICENSE or http://genome.ucsc.edu/license/ for licensing information. */
 
 /* Version history -
  *  v2 - making it so daemon only keeps current job in memory consulting db table always
  *       for next job.  This simplifies code and allows the daemon to respond to changes 
  *       the job table at the expense of spinning the DB daemon a little more often.  Also
  *       maintaining a new column, pid, that contains unix process id for the job. 
  *  v3 - put delay to default to 1, since seems to be needed when restarting if there are
- *       jobs in the queue. */
+ *       jobs in the queue. 
+ *  v4 - removed the fifo.  It caused more trouble than it saved, and polling the
+ *       database alone is cheap enough. */
 
 #include <sys/wait.h>
 #include "common.h"
 #include "linefile.h"
 #include "hash.h"
 #include "options.h"
 #include "portable.h"
 #include "obscure.h"
 #include "net.h"
 #include "log.h"
 #include "sqlNum.h"
 #include "cdw.h"
 #include "cdwLib.h"
 
 char *clDatabase, *clTable;
 int clDelay = 1;
 
 void usage()
 /* Explain usage and exit. */
 {
 errAbort(
-  "cdwRunDaemon v3 - Run jobs on multiple processers in background.  This is done with\n"
-  "a combination of infrequent polling of the database, and a unix fifo which can be\n"
-  "sent a signal (anything ending with a newline actually) that tells it to go look\n"
-  "at database now.\n"
+  "cdwRunDaemon v4 - Run jobs on multiple processers in background.\n"  
   "usage:\n"
-  "   cdwRunDaemon database table count fifo\n"
+  "   cdwRunDaemon database table count\n"
   "where:\n"
   "   database - mySQL database where cdwRun table lives\n"
   "   table - table with same six fields as cdwRun table\n"
   "   count - number of simultanious jobs to run\n"
-  "   fifo - named pipe used to notify daemon of new data\n"
   "options:\n"
   "   -debug - don't fork, close stdout, and daemonize self\n"
   "   -log=logFile - send error messages and warnings of daemon itself to logFile\n"
   "        There are not many of these.  Error mesages from jobs daemon runs end up\n"
   "        in errorMessage fields of database.\n"
   "   -logFacility - sends error messages and such to system log facility instead.\n"
-  "   -delay=N - delay this many seconds before starting a job, default %d\n"
+  "   -delay=N - wait this many seconds between database polls and job starts, default %d\n"
   , clDelay
   );
 }
 
 /* Command line validation table. */
 static struct optionSpec options[] = {
    {"log", OPTION_STRING},
    {"logFacility", OPTION_STRING},
    {"debug", OPTION_BOOLEAN},
    {"delay", OPTION_INT},
    {NULL, 0},
 };
 
 struct runner
 /* Keeps track of running process. */
     {
     int pid;	/* Process ID or 0 if none */
     char *errFileName;   /* Standard error file for this process */
     struct cdwJob *job;	 /* The job we are running */
     };
 
 int curThreads = 0, maxThreadCount;
 struct runner *runners;
 
 void finishRun(struct runner *run, int status)
 /* Finish up job. Copy results into database */
 {
 /* Clean up wait status so will show 0 for nice successful jobs */
 if (!WIFEXITED(status))
     status = -1;
 else
     status = WEXITSTATUS(status);
 
 /* Get job and fill in two easy fields. */
 struct cdwJob *job = run->job;
 job->endTime = cdwNow();
 job->returnCode = status;
 
 /* Read in stderr */
 size_t errorMessageSize;
 char *errMessage;
 readInGulp(run->errFileName, &errMessage, &errorMessageSize);
 remove(run->errFileName);
 
 /* Clean up stderr a bit before putting it into SQL - first making sure it is not too big. */
 int maxStderr = 100000;
 if (errorMessageSize >= maxStderr)
     {
     errorMessageSize = maxStderr;
     errMessage[maxStderr] = 0;
     }
 /* And then cleaning up nonprintable characters converting non-ascii to spaces */
 int i;
 for (i=0; i<errorMessageSize; ++i)
     {
     char c = errMessage[i];
     if (c < '\n')
         errMessage[i] = ' ' ;
     }
 stripString(errMessage, "NOSQLINJ");
 job->stderr = errMessage;
 
 
 /* Update database with job results */
 struct dyString *dy = dyStringNew(0);
 sqlDyStringPrintf(dy, "update %s set endTime=%lld, stderr='%s', returnCode=%d where id=%u",
     clTable, job->endTime, trimSpaces(job->stderr), job->returnCode, job->id);
 struct sqlConnection *conn = sqlConnect(clDatabase);
 sqlUpdate(conn, dy->string);
 sqlDisconnect(&conn);
 dyStringFree(&dy);
 
 /* Free up runner resources. */
 freez(&run->errFileName);
 cdwJobFree(&job);
 run->pid = 0;
 --curThreads;
 }
 
 struct runner *checkOnChildRunner(boolean doWait)
 /* See if a child has finished and optionally wait for it.  Return
  * a pointer to slot child has freed if it has finished. */
 {
 if (curThreads > 0)
     {
     int status = 0;
     int waitFlags = (doWait ? 0 : WNOHANG);
     int child = waitpid(-1, &status, waitFlags);
     if (child < 0)
 	errnoAbort("Couldn't wait");
     if (child != 0)
 	{
 	int i;
 	for (i=0; i<maxThreadCount; ++i)
 	    {
 	    struct runner *run = &runners[i];
 	    if (run->pid == child)
 		{
 		finishRun(run, status);
 		return run;
 		}
 	    }
 	internalErr();
 	}
     }
 return NULL;
 }
 
 struct runner *waitOnChildRunner()
 /* Wait for child to finish. */
 {
 return checkOnChildRunner(TRUE);
 }
 
 struct runner *findFreeRunner()
 /* Return free runner if there is one,  otherwise wait until there is. */
 {
 int i;
 if (curThreads >= maxThreadCount)
     {
     return waitOnChildRunner();
     }
 else
     {
     for (i=0; i<maxThreadCount; ++i)
         {
 	struct runner *run = &runners[i];
 	if (run->pid == 0)
 	    return run;
 	}
     }
 internalErr();  // Should not get here
 return NULL;
 }
 
 void runJob(struct runner *runner, struct cdwJob *job)
 /* Fork off and run job. */
 {
 /* Create stderr file for child  as a temp file */
 char tempFileName[PATH_LEN];
 safef(tempFileName, PATH_LEN, "%scdwRunDaemonXXXXXX", cdwTempDir());
 int errFd = mkstemp(tempFileName);
 if (errFd < 0)
     errnoAbort("Couldn't open temp file %s", tempFileName);
 
 ++curThreads;
 job->startTime = cdwNow();
 runner->job = job;
 
 int childId;
 if ((childId = mustFork()) == 0)
     {
     /* We be child side - execute command using system call */
     if (dup2(errFd, STDERR_FILENO) < 0)
         errnoAbort("Can't dup2 stderr to %s", tempFileName);
     int status = system(job->commandLine);
     if (status != 0)
 	{
 	errAbort("Error: status %d from system of %s", status, job->commandLine);
 	}
     else
 	exit(0);
     }
 else
     {
     /* Save start time and process ID to database. */
     struct sqlConnection *conn = sqlConnect(clDatabase);
     char query[256];
     sqlSafef(query, sizeof(query), "update %s set startTime=%lld, pid=%d where id=%lld", 
 	clTable, job->startTime, childId, (long long)job->id);
     sqlUpdate(conn, query);
     sqlDisconnect(&conn);
 
     /* We be parent - just fill in job info */
     close(errFd);
     runner->pid = childId;
     runner->errFileName = cloneString(tempFileName);
     }
 }
 
 
-static void readAndIgnoreBytes(int fd, int byteCount)
-/* Read byteCount from fd, and just throw it away.  We are just using named pipe
- * as a synchronization device is why. */
-{
-/* This is implemented perhaps too fancily - so as to efficiently discard datae
- * if need be by reading into a reasonable fixed sized buffer as opposed to a single
- * character. */
-int bytesLeft = byteCount;
-char buf[128];
-int bufSize = sizeof(buf);
-while (bytesLeft > 0)
-    {
-    int bytesAttempted = (bytesLeft > bufSize ? bufSize : bytesLeft);
-    int bytesRead = read(fd, buf, bytesAttempted);
-    if (bytesRead == -1)
-        errnoAbort("Problem reading from named pipe.");
-    bytesLeft -= bytesRead;
-    }
-}
-
-void syncWithTimeout(int fd, long long microsecs)
-/* Wait for either input from file fd, or timeout.  Will
- * ignore input - just using it as a sign to short-circuit
- * wait part of a polling loop. */
-{
-int bytesReady = netWaitForData(fd, microsecs);
-if (bytesReady > 0)
-    {
-    readAndIgnoreBytes(fd, bytesReady);
-    }
-}
-
 
-void cdwRunDaemon(char *database, char *table, char *countString, char *namedPipe)
+void cdwRunDaemon(char *database, char *table, char *countString)
 /* cdwRunDaemon - Run jobs on multiple processers in background. . */
 {
-warn("Starting cdwRunDaemon v2 on %s.%s with %s jobs synced on %s", 
-    database, table, countString,namedPipe);
+warn("Starting cdwRunDaemon v4 on %s.%s with %s jobs", 
+    database, table, countString);
 clDatabase = database;
 clTable = table;
 
 /* Set up array with a slot for each simultaneous job. */
 maxThreadCount = sqlUnsigned(countString);
 if (maxThreadCount > 100)
     errAbort("%s jobs at once? Really, that seems excessive", countString);
 AllocArray(runners, maxThreadCount);
 
 
-/* Open our file, which really should be a named pipe. */
-int fd = mustOpenFd(namedPipe, O_RDONLY);      // Program waits here until something written to pipe
-int dummyFd = mustOpenFd(namedPipe, O_WRONLY); // Keeps pipe from generating EOF when empty
-
 long long lastId = 0;  // Keep track of lastId in run table that has already been handled.
 int connMiss = 0;
 long long lastMiss = 0;
 
 for (;;)
     {
     /* Finish up processing on any children that have finished */
     while (checkOnChildRunner(FALSE) != NULL)	
         ;
 
     /* Get next bits of work from database. */
     struct sqlConnection *conn = sqlMayConnect(database);
     struct cdwJob *job = NULL;
     if (conn != NULL)
         {
 	/* Connected no problem, let's get next job. */
 	char query[256];
 	sqlSafef(query, sizeof(query), 
 	    "select * from %s where startTime = 0 and id > %lld order by id limit 1", 
 	    table, lastId);
 	job = cdwJobLoadByQuery(conn, query);
 	sqlDisconnect(&conn);
 	}
     else
         {
 	/* Did not connect to database.  Oh well. Don't just abort, this happens. */
 	/* Track error of no connect.  Give it 10 tries every now and again before giving up. */
 	long long now = cdwNow();
 
 	/* Reset max-retry counter every 10 minutes */
 	warn("Missed connection to %s at %lld", database, now);
 	int betweenBursts = 60 * 10;   // Seconds to allow between bursts of noise
 	if (now - lastMiss > betweenBursts)
 	    connMiss = 0;
 
 	/* Allow 12 retries (2 minutes downtime) */
 	int maxTries = 12;
 	if (++connMiss > maxTries)
 	    errAbort("%d misses connecting to %s, aborting", connMiss, database);
 	lastMiss = now;
+	sleep(10);  // Pace retries so maxTries misses span ~2 minutes, not just maxTries*clDelay seconds
 	}
 
     if (job != NULL)
         {
 	lastId = job->id;
 	struct runner *runner = findFreeRunner();
 	runJob(runner, job);
-	sleep(clDelay); // Avoid network storm
-	}
-    else
-        {
-	/* Wait for signal to come on named pipe or 10 seconds to pass */
-	syncWithTimeout(fd, 10*1000000);
 	}
+    sleep(clDelay); // Avoid network storm
     }
-close(dummyFd);
 }
 
 int main(int argc, char *argv[])
 /* Process command line. */
 {
 optionInit(&argc, argv, options);
 clDelay = optionInt("delay", clDelay);
-if (argc != 5)
+if (argc != 4)
     usage();
 logDaemonize(argv[0]);
-cdwRunDaemon(argv[1], argv[2], argv[3], argv[4]);
+cdwRunDaemon(argv[1], argv[2], argv[3]);
 return 0;
 }