86bb9f0fdcc838887b2d4a84154eea0e0f7b8631
markd
  Fri Oct 29 14:57:24 2021 -0700
Revert "don't create a process group on pipelineOpen, stay in the parent process group"
Another approach, one that requires the process group, is going to be tried to kill runaway processes.

This reverts commit 902dd7f624e16a660ad582533f264b720ffcdb41.

diff --git src/lib/pipeline.c src/lib/pipeline.c
index f65615d..3304b26 100644
--- src/lib/pipeline.c
+++ src/lib/pipeline.c
@@ -24,35 +24,40 @@
     procStateDone  // process finished (ok or failed)
 };
 
 struct plProc
 /* A single process in a pipeline */
 {
     struct plProc *next;   /* order list of processes */
     struct pipeline *pl;   /* pipeline we are associated with */
     char **cmd;            /* null-terminated command for this process */
     pid_t  pid;            /* pid for process, -1 if not running */
     enum procState state;  /* state of process */
     int status;            /* status from wait */
 };
 
 struct pipeline
-/* Object for a process pipeline and associated open files.
+/* Object for a process pipeline and associated open file.  Pipeline processes
+ * consist of a process group leader and then all of the child processes.  The
+ * group leader does no work; it just waits on processes to complete and reports
+ * errors to the top level process.  This object is created in the calling
+ * process, and then passed down, but not shared, via forks.
  */
 {
     struct plProc *procs;      /* list of processes */
     int numRunning;            /* number of processes running */
+    pid_t groupLeader;         /* process group id, or -1 if not set. This is pid of group leader */
     char *procName;            /* name to use in error messages. */
     int pipeFd;                /* fd of pipe to/from process, -1 if none */
     unsigned options;          /* options */
     FILE* pipeFh;              /* optional stdio around pipe */
     char* stdioBuf;            /* optional stdio buffer */
     struct lineFile *pipeLf;   /* optional lineFile around pipe */
 };
 
 /* file buffer size */
 #define FILE_BUF_SIZE 64*1024
 
 static int pipeCreate(int *writeFd)
 /* create a pipe or die, return readFd */
 {
 int pipeFds[2];
@@ -158,30 +163,33 @@
 {
 // States must transition in order.  New state must immediately follow the
 // current state.
 if (newState != proc->state+1)
     errAbort("invalid state transition: %d -> %d", proc->state, newState);
 proc->state = newState;
 }
 
 static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd)
 /* setup signal, error handling, and file descriptors after fork */
 {
 /* Optionally treat a closed pipe as an EOF rather than getting SIGPIPE */
 if (signal(SIGPIPE, ((proc->pl->options & pipelineSigpipe) ? SIG_DFL : SIG_IGN)) == SIG_ERR)
     errnoAbort("error ignoring SIGPIPE");
 
+if (setpgid(getpid(), proc->pl->groupLeader) != 0)
+    errnoAbort("error from setpgid(%d, %d)", getpid(), proc->pl->groupLeader);
+
 /* child, first setup stdio files */
 if (stdinFd != STDIN_FILENO)
     {
     if (dup2(stdinFd, STDIN_FILENO) < 0)
         errnoAbort("can't dup2 to stdin");
     }
     
 if (stdoutFd != STDOUT_FILENO)
     {
     if (dup2(stdoutFd, STDOUT_FILENO) < 0)
         errnoAbort("can't dup2 to stdout");
     }
     
 if (stderrFd != STDERR_FILENO)
     {
@@ -205,77 +213,77 @@
 {
 plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd);
 ssize_t wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize);
 if (wrCnt < 0)
     errnoAbort("pipeline input buffer write failed");
 else if (wrCnt != otherEndBufSize)
     errAbort("pipeline input buffer short write %lld, expected %lld",
              (long long)wrCnt, (long long)otherEndBufSize);
 else
     {
     close(STDOUT_FILENO);
     exit(0);
     }
 }
 
-static boolean isIgnoredSigPipe(struct plProc* proc)
-/* did this process exit with SIGPIPE and we are ignoring it */
-{
-return (WTERMSIG(proc->status) == SIGPIPE) && (proc->pl->options & pipelineSigpipe);
-}
-
-static void plProcHandleSignaled(struct plProc* proc)
-/* handle one of the processes terminating on a signal,
- * return status*/
+static void plProcHandleSignaled(struct plProc* proc, int status)
+/* handle one of the processes terminating on a signal */
 {
 assert(WIFSIGNALED(proc->status));
-if (!isIgnoredSigPipe(proc))
+if (!((WTERMSIG(proc->status) == SIGPIPE) && (proc->pl->options & pipelineSigpipe)))
     {
     errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"",
              WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName);
     }
 }
 
-static void plProcHandleExited(struct plProc* proc)
+static void plProcHandleExited(struct plProc* proc, int status)
 /* handle one of the processes terminating on an exit */
 {
 assert(WIFEXITED(proc->status));
-// only print an error message if aborting
-if ((WEXITSTATUS(proc->status) != 0) && !(proc->pl->options & pipelineNoAbort))
+if (WEXITSTATUS(proc->status) != 0)
     {
+    // only print an error message if aborting
+    if (!(proc->pl->options & pipelineNoAbort))
         fprintf(stderr, "process exited with %d: \"%s\" in pipeline \"%s\"\n",
                 WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName);
+    exit(WEXITSTATUS(proc->status));  // pass back exit code
     }
 }
 
-static void plProcHandleTerminate(struct plProc* proc)
+static void plProcHandleTerminate(struct plProc* proc, int status)
 /* handle one of the processes terminating, save exit status */
 {
+proc->pid = -1;
+proc->status = status;
+plProcStateTrans(proc, procStateDone);
+
 if (WIFSIGNALED(proc->status))
-    plProcHandleSignaled(proc);
+    plProcHandleSignaled(proc, status);
 else
-    plProcHandleExited(proc);
+    plProcHandleExited(proc, status);
 }
 
 static struct pipeline* pipelineNew(char ***cmds, unsigned options)
 /* create a new pipeline object. Doesn't start processes */
 {
 static char *memPseudoCmd[] = {"[mem]", NULL};
 struct pipeline *pl;
 int iCmd;
 
 AllocVar(pl);
+pl->groupLeader = -1;
 pl->pipeFd = -1;
 pl->options = options;
 pl->procName = joinCmds(cmds);
 
 if (cmds[0] == NULL)
     errAbort("no commands in pipeline");
 
 if (options & pipelineMemInput)
     {
     /* add proc for forked process to write memory to pipeline */
     slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl));
     }
 
 for(iCmd = 0; cmds[iCmd] != NULL; iCmd++)
     slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl));
@@ -290,30 +298,41 @@
 if (pl != NULL)
     {
     struct plProc *proc = pl->procs;
     while (proc != NULL)
         {
         struct plProc *delProc = proc;
         proc = proc->next;
         plProcFree(delProc);
         }
     freez(&pl->procName);
     freez(&pl->stdioBuf);
     freez(plPtr);
     }
 }
 
+static struct plProc *pipelineFindProc(struct pipeline *pl, pid_t pid)
+/* find a plProc by pid */
+{
+struct plProc *proc;
+for (proc = pl->procs; proc != NULL; proc = proc->next)
+    if (proc->pid == pid)
+        return proc;
+errAbort("pid not found in pipeline: %d", (int)pid);
+return 0; // never reached
+}
+
 static void execProcChild(struct pipeline* pl, struct plProc *proc,
                           int procStdinFd, int procStdoutFd, int stderrFd,
                           void *otherEndBuf, size_t otherEndBufSize)
 /* handle child process setup after fork.  This does not return */
 {
 if (otherEndBuf != NULL)
     plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
 else
     plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd);
 }
 
 static int pipelineExecProc(struct pipeline* pl, struct plProc *proc,
                             int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd,
                             void *otherEndBuf, size_t otherEndBufSize)
 /* start a process in the pipeline, return the stdout fd of the process */
@@ -336,78 +355,122 @@
     execProcChild(pl, proc, procStdinFd, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
 
 /* record that we did this */
 plProcStateTrans(proc, procStateRun);
 pl->numRunning++;
 
 /* don't leave intermediate pipes open in parent */
 if (proc != pl->procs)
     safeClose(&procStdinFd);
 if (proc->next != NULL)
     safeClose(&procStdoutFd);
 
 return prevStdoutFd;
 }
 
-static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
+static void pipelineGroupExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
                               void *otherEndBuf, size_t otherEndBufSize)
 /* Start all processes in a pipeline, stdinFd and stdoutFd are the ends of
  * the pipeline, stderrFd is applied to all processed */
 {
 struct plProc *proc;
 int prevStdoutFd = -1;
 for (proc = pl->procs; proc != NULL; proc = proc->next)
     {
     prevStdoutFd = pipelineExecProc(pl, proc, prevStdoutFd,
                                     stdinFd, stdoutFd, stderrFd,
                                     otherEndBuf, otherEndBufSize);
     otherEndBuf = NULL;  /* only for first process (read pipes) */
     otherEndBufSize = 0;
     }
 }
 
-static void waitOnOne(struct pipeline *pl, struct plProc* proc)
+static void waitOnOne(struct pipeline *pl)
 /* wait on one process to finish */
 {
 int status;
-pid_t pid = waitpid(proc->pid, &status, 0);
+pid_t pid = waitpid(-pl->groupLeader, &status, 0);
 if (pid < 0)
     errnoAbort("waitpid failed");
-proc->pid = -1;
-proc->status = status;
-plProcStateTrans(proc, procStateDone);
-plProcHandleTerminate(proc);
+plProcHandleTerminate(pipelineFindProc(pl, pid), status);
 pl->numRunning--;
 assert(pl->numRunning >= 0);
 }
 
-static void waitForProcs(struct pipeline *pl)
+static void groupWait(struct pipeline *pl)
 /* Wait for pipeline to complete */
 {
-for (struct plProc *proc = pl->procs; proc != NULL; proc = proc->next)
-    waitOnOne(pl, proc);
+/* wait on all processes to complete */
+while (pl->numRunning > 0)
+    waitOnOne(pl);
 }
 
-static int waitStatus(struct pipeline *pl)
-/* return exit status of pipeline */
+static void groupLeaderRun(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
+                           void *otherEndBuf, size_t otherEndBufSize)
+/* group leader process */
 {
-for (struct plProc *proc = pl->procs; proc != NULL; proc = proc->next)
+pl->groupLeader = getpid();
+if (setpgid(pl->groupLeader, pl->groupLeader) != 0)
+    errnoAbort("error from child setpgid(%d, %d)", pl->groupLeader, pl->groupLeader);
+pipelineGroupExec(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
+
+// only keep stderr open
+close(STDIN_FILENO);
+close(STDOUT_FILENO);
+closeNonStdDescriptors();
+groupWait(pl);
+_exit(0);
+}
+
+static int groupLeaderWait(struct pipeline *pl)
+/* Wait for group leader to complete. If pipelineNoAbort was specified, return
+ * the exit code of the first group process to exit non-zero, or zero if none
+ * failed. */
+{
+int status;
+pid_t pid = waitpid(-pl->groupLeader, &status, 0);
+if (pid < 0)
+    errnoAbort("waitpid failed");
+if (WIFSIGNALED(status))
+    errAbort("process pipeline terminated on signal %d", WTERMSIG(status));
+assert(WIFEXITED(status));
+
+if ((WEXITSTATUS(status) != 0) && !(pl->options & pipelineNoAbort))
+    errAbort("pipeline exited with %d", WEXITSTATUS(status));
+return WEXITSTATUS(status);
+}
+
+static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
+                         void *otherEndBuf, size_t otherEndBufSize)
+/* Fork the group leader, which then launches all processes in a pipeline,
+ * stdinFd and stdoutFd are the ends of the pipeline, stderrFd is applied to
+ * all processes, including group leader */
+{
+assert(pl->groupLeader < 0);  // should not be set
+if ((pl->groupLeader = fork()) < 0)
+    errnoAbort("can't fork");
+if (pl->groupLeader == 0)
     {
-    if (WIFEXITED(proc->status) && (WEXITSTATUS(proc->status) != 0))
-        return WEXITSTATUS(proc->status);
+    groupLeaderRun(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
+    exit(1); // doesn't return to here
+    }
+else
+    {
+    // parent must also setpgid to prevent race condition
+    if (setpgid(pl->groupLeader, pl->groupLeader) != 0)
+        errnoAbort("error from parent setpgid(%d, %d)", pl->groupLeader, pl->groupLeader);
     }
-return 0;
 } 
 
 static int openRead(char *fname)
 /* open a file for reading */
 {
 int fd = open(fname, O_RDONLY);
 if (fd < 0)
     errnoAbort("can't open for read access: %s", fname);
 return fd;
 }
 
 static int openWrite(char *fname, boolean append)
 /* open a file for write access */
 {
 int flags = O_WRONLY|O_CREAT;
@@ -606,38 +669,36 @@
 {
 if (pl->pipeFh != NULL)
     closePipelineFile(pl);
 else if (pl->pipeLf != NULL)
     lineFileClose(&pl->pipeLf);
 else
     {
     if (close(pl->pipeFd) < 0)
         errAbort("close failed on pipeline: %s ", pl->procName);
     }
 pl->pipeFd = -1;
 }
 
 int pipelineWait(struct pipeline *pl)
 /* Wait for processes in a pipeline to complete; normally aborts if any
- * process exits non-zero or signals.  If pipelineNoAbort was specified,
- * return the exit code of the first process exit non-zero.
- */
+ * process exits non-zero.  If pipelineNoAbort was specified, return the exit
+ * code of the first process to exit non-zero, or zero if none failed. */
 {
 /* must close before waiting to so processes get pipe EOF */
 closePipeline(pl);
-waitForProcs(pl);
-return waitStatus(pl);
+return groupLeaderWait(pl);
 }
 
 int pipelineClose(struct pipeline **pPl)
 /* Wait for pipeline to finish and free it. Same as pipelineWait then pipelineClose.
  * Returns pipelineWait result (normally 0). */
 {
 struct pipeline *pl = *pPl;
 int ret = 0;
 if (pl != NULL)
     {
     ret = pipelineWait(pl);
     pipelineFree(pPl);
     }
 return ret;
 }