fd7300195f18fae0c96a61d4c7ae5b5aaed549c7
kent
  Mon Sep 30 19:06:56 2024 -0700
Removing a fifo that worked better in theory than practice.

diff --git src/hg/cirm/cdw/cdwRunDaemon/cdwRunDaemon.c src/hg/cirm/cdw/cdwRunDaemon/cdwRunDaemon.c
index 077fba5..b975193 100644
--- src/hg/cirm/cdw/cdwRunDaemon/cdwRunDaemon.c
+++ src/hg/cirm/cdw/cdwRunDaemon/cdwRunDaemon.c
@@ -1,62 +1,58 @@
-/* cdwRunDaemon - Run jobs on multiple processers in background.  This is done with
- * a combination of infrequent polling of the database, and a unix fifo which can be
- * sent a signal (anything ending with a newline actually) that tells it to go look
- * at database now. */
+/* cdwRunDaemon - Run jobs on multiple processors in background.
+ * This is done by polling the database. */
 
-/* Copyright (C) 2014 The Regents of the University of California 
+/* Copyright (C) 2024 The Regents of the University of California 
  * See kent/LICENSE or http://genome.ucsc.edu/license/ for licensing information. */
 
 /* Version history -
  *  v2 - making it so daemon only keeps current job in memory consulting db table always
  *       for next job.  This simplifies code and allows the daemon to respond to changes 
  *       the job table at the expense of spinning the DB daemon a little more often.  Also
  *       maintaining a new column, pid, that contains unix process id for the job. 
  *  v3 - put delay to default to 1, since seems to be needed when restarting if there are
- *       jobs in the queue. */
+ *       jobs in the queue.
+ *  v4 - the fifo proved problematic enough, and the polling is lightweight enough,
+ *       that the fifo was removed. */
 
 #include <sys/wait.h>
 #include "common.h"
 #include "linefile.h"
 #include "hash.h"
 #include "options.h"
 #include "portable.h"
 #include "obscure.h"
 #include "net.h"
 #include "log.h"
 #include "sqlNum.h"
 #include "cdw.h"
 #include "cdwLib.h"
 
 char *clDatabase, *clTable;
 int clDelay = 1;
 
 void usage()
 /* Explain usage and exit. */
 {
 errAbort(
-  "cdwRunDaemon v3 - Run jobs on multiple processers in background.  This is done with\n"
-  "a combination of infrequent polling of the database, and a unix fifo which can be\n"
-  "sent a signal (anything ending with a newline actually) that tells it to go look\n"
-  "at database now.\n"
+  "cdwRunDaemon v4 - Run jobs on multiple processers in background.\n"  
   "usage:\n"
-  "   cdwRunDaemon database table count fifo\n"
+  "   cdwRunDaemon database table count\n"
   "where:\n"
   "   database - mySQL database where cdwRun table lives\n"
   "   table - table with same six fields as cdwRun table\n"
   "   count - number of simultanious jobs to run\n"
-  "   fifo - named pipe used to notify daemon of new data\n"
   "options:\n"
   "   -debug - don't fork, close stdout, and daemonize self\n"
   "   -log=logFile - send error messages and warnings of daemon itself to logFile\n"
   "        There are not many of these.  Error mesages from jobs daemon runs end up\n"
   "        in errorMessage fields of database.\n"
   "   -logFacility - sends error messages and such to system log facility instead.\n"
   "   -delay=N - delay this many seconds before starting a job, default %d\n"
   , clDelay
   );
 }
 
 /* Command line validation table. */
 static struct optionSpec options[] = {
    {"log", OPTION_STRING},
    {"logFacility", OPTION_STRING},
@@ -221,82 +217,46 @@
     struct sqlConnection *conn = sqlConnect(clDatabase);
     char query[256];
     sqlSafef(query, sizeof(query), "update %s set startTime=%lld, pid=%d where id=%lld", 
 	clTable, job->startTime, childId, (long long)job->id);
     sqlUpdate(conn, query);
     sqlDisconnect(&conn);
 
     /* We be parent - just fill in job info */
     close(errFd);
     runner->pid = childId;
     runner->errFileName = cloneString(tempFileName);
     }
 }
 
 
-static void readAndIgnoreBytes(int fd, int byteCount)
-/* Read byteCount from fd, and just throw it away.  We are just using named pipe
- * as a synchronization device is why. */
-{
-/* This is implemented perhaps too fancily - so as to efficiently discard datae
- * if need be by reading into a reasonable fixed sized buffer as opposed to a single
- * character. */
-int bytesLeft = byteCount;
-char buf[128];
-int bufSize = sizeof(buf);
-while (bytesLeft > 0)
-    {
-    int bytesAttempted = (bytesLeft > bufSize ? bufSize : bytesLeft);
-    int bytesRead = read(fd, buf, bytesAttempted);
-    if (bytesRead == -1)
-        errnoAbort("Problem reading from named pipe.");
-    bytesLeft -= bytesRead;
-    }
-}
-
-void syncWithTimeout(int fd, long long microsecs)
-/* Wait for either input from file fd, or timeout.  Will
- * ignore input - just using it as a sign to short-circuit
- * wait part of a polling loop. */
-{
-int bytesReady = netWaitForData(fd, microsecs);
-if (bytesReady > 0)
-    {
-    readAndIgnoreBytes(fd, bytesReady);
-    }
-}
-
 
-void cdwRunDaemon(char *database, char *table, char *countString, char *namedPipe)
+void cdwRunDaemon(char *database, char *table, char *countString)
 /* cdwRunDaemon - Run jobs on multiple processers in background. . */
 {
-warn("Starting cdwRunDaemon v2 on %s.%s with %s jobs synced on %s", 
-    database, table, countString,namedPipe);
+warn("Starting cdwRunDaemon v4 on %s.%s with %s jobs", 
+    database, table, countString);
 clDatabase = database;
 clTable = table;
 
 /* Set up array with a slot for each simultaneous job. */
 maxThreadCount = sqlUnsigned(countString);
 if (maxThreadCount > 100)
     errAbort("%s jobs at once? Really, that seems excessive", countString);
 AllocArray(runners, maxThreadCount);
 
 
-/* Open our file, which really should be a named pipe. */
-int fd = mustOpenFd(namedPipe, O_RDONLY);      // Program waits here until something written to pipe
-int dummyFd = mustOpenFd(namedPipe, O_WRONLY); // Keeps pipe from generating EOF when empty
-
 long long lastId = 0;  // Keep track of lastId in run table that has already been handled.
 int connMiss = 0;
 long long lastMiss = 0;
 
 for (;;)
     {
     /* Finish up processing on any children that have finished */
     while (checkOnChildRunner(FALSE) != NULL)	
         ;
 
     /* Get next bits of work from database. */
     struct sqlConnection *conn = sqlMayConnect(database);
     struct cdwJob *job = NULL;
     if (conn != NULL)
         {
@@ -320,37 +280,31 @@
 	if (now - lastMiss > betweenBursts)
 	    connMiss = 0;
 
 	/* Allow 12 retries (2 minutes downtime) */
 	int maxTries = 12;
 	if (++connMiss > maxTries)
 	    errAbort("%d misses connecting to %s, aborting", connMiss, database);
 	lastMiss = now;
 	}
 
     if (job != NULL)
         {
 	lastId = job->id;
 	struct runner *runner = findFreeRunner();
 	runJob(runner, job);
-	sleep(clDelay); // Avoid network storm
-	}
-    else
-        {
-	/* Wait for signal to come on named pipe or 10 seconds to pass */
-	syncWithTimeout(fd, 10*1000000);
 	}
+    sleep(clDelay); // Pause between database polls; also avoids network storm
     }
-close(dummyFd);
 }
 
 int main(int argc, char *argv[])
 /* Process command line. */
 {
 optionInit(&argc, argv, options);
 clDelay = optionInt("delay", clDelay);
-if (argc != 5)
+if (argc != 4)
     usage();
 logDaemonize(argv[0]);
-cdwRunDaemon(argv[1], argv[2], argv[3], argv[4]);
+cdwRunDaemon(argv[1], argv[2], argv[3]);
 return 0;
 }