6f949e90b1ba3de976455fbcf9da21897761d134
markd
  Fri Oct 29 16:11:58 2021 -0700
add timeout option to pipeline to allow kill long-running pipelines, especially ones run from CGIs

diff --git src/lib/pipeline.c src/lib/pipeline.c
index 3304b26..1ec7ad7 100644
--- src/lib/pipeline.c
+++ src/lib/pipeline.c
@@ -34,30 +34,31 @@
     enum procState state;  /* state of process */
     int status;            /* status from wait */
 };
 
 struct pipeline
 /* Object for a process pipeline and associated open file.  Pipeline process
  * consist of a process group leader and then all of the child process.  The
  * group leader does no work, just wait on processes to complete and report
  * errors to the top level process.  This object is create in the calling
  * process, and then passed down, but not shared, via forks.
  */
 {
     struct plProc *procs;      /* list of processes */
     int numRunning;            /* number of processes running */
     pid_t groupLeader;         /* process group id, or -1 if not set. This is pid of group leader */
+    unsigned int timeout;      /* timeout, in seconds, or zero */
     char *procName;            /* name to use in error messages. */
     int pipeFd;                /* fd of pipe to/from process, -1 if none */
     unsigned options;          /* options */
     FILE* pipeFh;              /* optional stdio around pipe */
     char* stdioBuf;            /* optional stdio buffer */
     struct lineFile *pipeLf;   /* optional lineFile around pipe */
 };
 
 /* file buffer size */
 #define FILE_BUF_SIZE 64*1024
 
 static int pipeCreate(int *writeFd)
 /* create a pipe or die, return readFd */
 {
 int pipeFds[2];
@@ -251,41 +252,42 @@
 }
 
 static void plProcHandleTerminate(struct plProc* proc, int status)
 /* handle one of the processes terminating, save exit status */
 {
 proc->pid = -1;
 proc->status = status;
 plProcStateTrans(proc, procStateDone);
 
 if (WIFSIGNALED(proc->status))
     plProcHandleSignaled(proc, status);
 else
     plProcHandleExited(proc, status);
 }
 
-static struct pipeline* pipelineNew(char ***cmds, unsigned options)
+static struct pipeline* pipelineNew(char ***cmds, unsigned options, unsigned int timeout)
 /* create a new pipeline object. Doesn't start processes */
 {
 static char *memPseudoCmd[] = {"[mem]", NULL};
 struct pipeline *pl;
 int iCmd;
 
 AllocVar(pl);
 pl->groupLeader = -1;
 pl->pipeFd = -1;
 pl->options = options;
+pl->timeout = timeout;
 pl->procName = joinCmds(cmds);
 
 if (cmds[0] == NULL)
     errAbort("no commands in pipeline");
 
 if (options & pipelineMemInput)
     {
     /* add proc for forked process to write memory to pipeline */
     slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl));
     }
 
 for(iCmd = 0; cmds[iCmd] != NULL; iCmd++)
     slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl));
 
 return pl;
@@ -392,37 +394,63 @@
 if (pid < 0)
     errnoAbort("waitpid failed");
 plProcHandleTerminate(pipelineFindProc(pl, pid), status);
 pl->numRunning--;
 assert(pl->numRunning >= 0);
 }
 
 static void groupWait(struct pipeline *pl)
 /* Wait for pipeline to complete */
 {
 /* wait on all processes to complete */
 while (pl->numRunning > 0)
     waitOnOne(pl);
 }
 
+/* uses to stash pipeline object for group leader process only, which is
+ * single-threaded */
+static struct pipeline* groupApoptosisPipeline = NULL;
+
+static void groupApoptosis(int signum)
+/* signal handler for SIGALRM expiration */
+{
+// hopefully this gets logged or seen by user
+fprintf(stderr, "pipeline timeout kill after %d seconds: %s\n", groupApoptosisPipeline->timeout,
+        pipelineDesc(groupApoptosisPipeline));
+fflush(stderr);
+(int)kill(0, SIGKILL); // kill off process group
+}
+
+static void setupTimeout(struct pipeline* pl)
+/* setup timeout handling */
+{
+groupApoptosisPipeline = pl;
+if (signal(SIGALRM, groupApoptosis) != 0)
+    errnoAbort("signal failed");
+(void)alarm(pl->timeout);
+}
+
 static void groupLeaderRun(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
                            void *otherEndBuf, size_t otherEndBufSize)
 /* group leader process */
 {
 pl->groupLeader = getpid();
 if (setpgid(pl->groupLeader, pl->groupLeader) != 0)
     errnoAbort("error from child setpgid(%d, %d)", pl->groupLeader, pl->groupLeader);
+if (pl->timeout > 0)
+    setupTimeout(pl);
+    
 pipelineGroupExec(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
 
 // only keep stderr open
 close(STDIN_FILENO);
 close(STDOUT_FILENO);
 closeNonStdDescriptors();
 groupWait(pl);
 _exit(0);
 }
 
 static int groupLeaderWait(struct pipeline *pl)
 /* Wait for group leader to complete. If pipelineNoAbort was specified, return
  * the exit code of the first group process exit non-zero, or zero if none
  * failed. */
 {
@@ -501,113 +529,117 @@
 pipelineExec(pl, pipeRdFd, stdoutFd, stderrFd, NULL, 0);
 safeClose(&pipeRdFd);
 }
 
 static void checkOpts(unsigned opts)
 /* check option set for consistency */
 {
 if (((opts & (pipelineRead|pipelineWrite)) == 0)
     || ((opts & (pipelineRead|pipelineWrite)) == (pipelineRead|pipelineWrite)))
     errAbort("must specify one of pipelineRead or pipelineWrite to pipelineOpen");
 if ((opts & pipelineAppend) && ((opts & pipelineWrite) == 0))
     errAbort("pipelineAppend is valid only in conjunction with pipelineWrite");
 }
 
 struct pipeline *pipelineOpenFd(char ***cmds, unsigned opts,
-                                int otherEndFd, int stderrFd)
+                                int otherEndFd, int stderrFd,
+                                unsigned int timeout)
 /* Create a pipeline from an array of commands.  See pipeline.h for
  * full documentation. */
 {
 struct pipeline *pl;
 
 checkOpts(opts);
-pl = pipelineNew(cmds, opts);
+pl = pipelineNew(cmds, opts, timeout);
 if (opts & pipelineRead)
     pipelineStartRead(pl, otherEndFd, stderrFd, NULL, 0);
 else
     pipelineStartWrite(pl, otherEndFd, stderrFd);
 return pl;
 }
 
 struct pipeline *pipelineOpen(char ***cmds, unsigned opts,
-                              char *otherEndFile, char *stderrFile)
+                              char *otherEndFile, char *stderrFile,
+                              unsigned int timeout)
 /* Create a pipeline from an array of commands.  See pipeline.h for
  * full documentation */
 {
 int otherEndFd;
 int stderrFd = (stderrFile == NULL) ? STDERR_FILENO : openWrite(stderrFile, FALSE);
 
 checkOpts(opts);
 boolean append = ((opts & pipelineAppend) != 0);
 if (opts & pipelineRead)
     otherEndFd = (otherEndFile == NULL) ? STDIN_FILENO : openRead(otherEndFile);
 else
     otherEndFd = (otherEndFile == NULL) ? STDOUT_FILENO : openWrite(otherEndFile, append);
-struct pipeline *pl = pipelineOpenFd(cmds, opts, otherEndFd, stderrFd);
+struct pipeline *pl = pipelineOpenFd(cmds, opts, otherEndFd, stderrFd, timeout);
 safeClose(&otherEndFd);
 if (stderrFile != NULL)
     safeClose(&stderrFd);
 return pl;
 }
 
 struct pipeline *pipelineOpenMem(char ***cmds, unsigned opts,
                                  void *otherEndBuf, size_t otherEndBufSize,
-                                 int stderrFd)
+                                 int stderrFd, unsigned int timeout)
 /* Create a pipeline from an array of commands, with the pipeline input/output
  * in a memory buffer.  See pipeline.h for full documentation.  Currently only
  * input to a read pipeline is supported  */
 {
 struct pipeline *pl;
 checkOpts(opts);
 if (opts & pipelineWrite)
     errAbort("pipelineOpenMem only supports read pipelines at this time");
 opts |= pipelineMemInput;
 
-pl = pipelineNew(cmds, opts);
+pl = pipelineNew(cmds, opts, timeout);
 pipelineStartRead(pl, STDIN_FILENO, stderrFd, otherEndBuf, otherEndBufSize);
 return pl;
 }
 
 struct pipeline *pipelineOpenFd1(char **cmd, unsigned opts,
-                                 int otherEndFd, int stderrFd)
+                                 int otherEndFd, int stderrFd,
+                                 unsigned int timeout)
 /* like pipelineOpenFd(), only takes a single command */
 {
 char **cmds[2];
 cmds[0] = cmd;
 cmds[1] = NULL;
-return pipelineOpenFd(cmds, opts, otherEndFd, stderrFd);
+return pipelineOpenFd(cmds, opts, otherEndFd, stderrFd, timeout);
 }
 
 struct pipeline *pipelineOpen1(char **cmd, unsigned opts,
-                               char *otherEndFile, char *stderrFile)
+                               char *otherEndFile, char *stderrFile,
+                               unsigned int timeout)
 /* like pipelineOpen(), only takes a single command */
 {
 char **cmds[2];
 cmds[0] = cmd;
 cmds[1] = NULL;
-return pipelineOpen(cmds, opts, otherEndFile, stderrFile);
+return pipelineOpen(cmds, opts, otherEndFile, stderrFile, timeout);
 }
 
 struct pipeline *pipelineOpenMem1(char **cmd, unsigned opts,
                                   void *otherEndBuf, size_t otherEndBufSize,
-                                  int stderrFd)
+                                  int stderrFd, unsigned int timeout)
 /* like pipelineOpenMem(), only takes a single command */
 {
 char **cmds[2];
 cmds[0] = cmd;
 cmds[1] = NULL;
-return pipelineOpenMem(cmds, opts, otherEndBuf, otherEndBufSize, stderrFd);
+return pipelineOpenMem(cmds, opts, otherEndBuf, otherEndBufSize, stderrFd, timeout);
 }
 
 char *pipelineDesc(struct pipeline *pl)
 /* Get the description of a pipeline for use in error messages */
 {
 return pl->procName;
 }
 
 int pipelineFd(struct pipeline *pl)
 /* Get the file descriptor for a pipeline */
 {
 return pl->pipeFd;
 }
 
 FILE *pipelineFile(struct pipeline *pl)