902dd7f624e16a660ad582533f264b720ffcdb41
markd
  Thu Oct 28 12:07:31 2021 -0700
don't create a process group on pipelineOpen, stay in the parent process group

diff --git src/lib/pipeline.c src/lib/pipeline.c
index 3304b26..f65615d 100644
--- src/lib/pipeline.c
+++ src/lib/pipeline.c
@@ -24,40 +24,35 @@
     procStateDone  // process finished (ok or failed)
 };
 
 struct plProc
 /* A single process in a pipeline */
 {
     struct plProc *next;   /* order list of processes */
     struct pipeline *pl;   /* pipeline we are associated with */
     char **cmd;            /* null-terminated command for this process */
     pid_t  pid;            /* pid for process, -1 if not running */
     enum procState state;  /* state of process */
     int status;            /* status from wait */
 };
 
 struct pipeline
-/* Object for a process pipeline and associated open file.  Pipeline process
- * consist of a process group leader and then all of the child process.  The
- * group leader does no work, just wait on processes to complete and report
- * errors to the top level process.  This object is create in the calling
- * process, and then passed down, but not shared, via forks.
+/* Object for a process pipeline and associated open files.
  */
 {
     struct plProc *procs;      /* list of processes */
     int numRunning;            /* number of processes running */
-    pid_t groupLeader;         /* process group id, or -1 if not set. This is pid of group leader */
     char *procName;            /* name to use in error messages. */
     int pipeFd;                /* fd of pipe to/from process, -1 if none */
     unsigned options;          /* options */
     FILE* pipeFh;              /* optional stdio around pipe */
     char* stdioBuf;            /* optional stdio buffer */
     struct lineFile *pipeLf;   /* optional lineFile around pipe */
 };
 
 /* file buffer size */
 #define FILE_BUF_SIZE 64*1024
 
 static int pipeCreate(int *writeFd)
 /* create a pipe or die, return readFd */
 {
 int pipeFds[2];
@@ -163,33 +158,30 @@
 {
 // States must transition in order.  New state must immediately follow the
 // current state.
 if (newState != proc->state+1)
     errAbort("invalid state transition: %d -> %d", proc->state, newState);
 proc->state = newState;
 }
 
 static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd)
 /* setup signal, error handling, and file descriptors after fork */
 {
 /* Optionally treat a closed pipe as an EOF rather than getting SIGPIPE */
 if (signal(SIGPIPE, ((proc->pl->options & pipelineSigpipe) ? SIG_DFL : SIG_IGN)) == SIG_ERR)
     errnoAbort("error ignoring SIGPIPE");
 
-if (setpgid(getpid(), proc->pl->groupLeader) != 0)
-    errnoAbort("error from setpgid(%d, %d)", getpid(), proc->pl->groupLeader);
-
 /* child, first setup stdio files */
 if (stdinFd != STDIN_FILENO)
     {
     if (dup2(stdinFd, STDIN_FILENO) < 0)
         errnoAbort("can't dup2 to stdin");
     }
     
 if (stdoutFd != STDOUT_FILENO)
     {
     if (dup2(stdoutFd, STDOUT_FILENO) < 0)
         errnoAbort("can't dup2 to stdout");
     }
     
 if (stderrFd != STDERR_FILENO)
     {
@@ -213,77 +205,77 @@
 {
 plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd);
 ssize_t wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize);
 if (wrCnt < 0)
     errnoAbort("pipeline input buffer write failed");
 else if (wrCnt != otherEndBufSize)
     errAbort("pipeline input buffer short write %lld, expected %lld",
              (long long)wrCnt, (long long)otherEndBufSize);
 else
     {
     close(STDOUT_FILENO);
     exit(0);
     }
 }
 
-static void plProcHandleSignaled(struct plProc* proc, int status)
-/* handle one of the processes terminating on a signal */
+static boolean isIgnoredSigPipe(struct plProc* proc)
+/* did this process terminate on SIGPIPE while the pipelineSigpipe option is set */
+{
+return (WTERMSIG(proc->status) == SIGPIPE) && (proc->pl->options & pipelineSigpipe);
+}
+
+static void plProcHandleSignaled(struct plProc* proc)
+/* handle one of the processes terminating on a signal; aborts unless it
+ * was SIGPIPE and the pipelineSigpipe option is set */
 {
 assert(WIFSIGNALED(proc->status));
-if (!((WTERMSIG(proc->status) == SIGPIPE) && (proc->pl->options & pipelineSigpipe)))
+if (!isIgnoredSigPipe(proc))
     {
     errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"",
              WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName);
     }
 }
 
-static void plProcHandleExited(struct plProc* proc, int status)
+static void plProcHandleExited(struct plProc* proc)
 /* handle one of the processes terminating on an exit */
 {
 assert(WIFEXITED(proc->status));
-if (WEXITSTATUS(proc->status) != 0)
-    {
 // only print an error message if aborting
-    if (!(proc->pl->options & pipelineNoAbort))
+if ((WEXITSTATUS(proc->status) != 0) && !(proc->pl->options & pipelineNoAbort))
+    {
     fprintf(stderr, "process exited with %d: \"%s\" in pipeline \"%s\"\n",
             WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName);
-    exit(WEXITSTATUS(proc->status));  // pass back exit code
     }
 }
 
-static void plProcHandleTerminate(struct plProc* proc, int status)
+static void plProcHandleTerminate(struct plProc* proc)
 /* handle one of the processes terminating, save exit status */
 {
-proc->pid = -1;
-proc->status = status;
-plProcStateTrans(proc, procStateDone);
-
 if (WIFSIGNALED(proc->status))
-    plProcHandleSignaled(proc, status);
+    plProcHandleSignaled(proc);
 else
-    plProcHandleExited(proc, status);
+    plProcHandleExited(proc);
 }
 
 static struct pipeline* pipelineNew(char ***cmds, unsigned options)
 /* create a new pipeline object. Doesn't start processes */
 {
 static char *memPseudoCmd[] = {"[mem]", NULL};
 struct pipeline *pl;
 int iCmd;
 
 AllocVar(pl);
-pl->groupLeader = -1;
 pl->pipeFd = -1;
 pl->options = options;
 pl->procName = joinCmds(cmds);
 
 if (cmds[0] == NULL)
     errAbort("no commands in pipeline");
 
 if (options & pipelineMemInput)
     {
     /* add proc for forked process to write memory to pipeline */
     slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl));
     }
 
 for(iCmd = 0; cmds[iCmd] != NULL; iCmd++)
     slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl));
@@ -298,41 +290,30 @@
 if (pl != NULL)
     {
     struct plProc *proc = pl->procs;
     while (proc != NULL)
         {
         struct plProc *delProc = proc;
         proc = proc->next;
         plProcFree(delProc);
         }
     freez(&pl->procName);
     freez(&pl->stdioBuf);
     freez(plPtr);
     }
 }
 
-static struct plProc *pipelineFindProc(struct pipeline *pl, pid_t pid)
-/* find a plProc by pid */
-{
-struct plProc *proc;
-for (proc = pl->procs; proc != NULL; proc = proc->next)
-    if (proc->pid == pid)
-        return proc;
-errAbort("pid not found in pipeline: %d", (int)pid);
-return 0; // never reached
-}
-
 static void execProcChild(struct pipeline* pl, struct plProc *proc,
                           int procStdinFd, int procStdoutFd, int stderrFd,
                           void *otherEndBuf, size_t otherEndBufSize)
 /* handle child process setup after fork.  This does not return */
 {
 if (otherEndBuf != NULL)
     plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
 else
     plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd);
 }
 
 static int pipelineExecProc(struct pipeline* pl, struct plProc *proc,
                             int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd,
                             void *otherEndBuf, size_t otherEndBufSize)
 /* start a process in the pipeline, return the stdout fd of the process */
@@ -355,122 +336,78 @@
     execProcChild(pl, proc, procStdinFd, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
 
 /* record that we did this */
 plProcStateTrans(proc, procStateRun);
 pl->numRunning++;
 
 /* don't leave intermediate pipes open in parent */
 if (proc != pl->procs)
     safeClose(&procStdinFd);
 if (proc->next != NULL)
     safeClose(&procStdoutFd);
 
 return prevStdoutFd;
 }
 
-static void pipelineGroupExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
+static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
                          void *otherEndBuf, size_t otherEndBufSize)
 /* Start all processes in a pipeline, stdinFd and stdoutFd are the ends of
  * the pipeline, stderrFd is applied to all processed */
 {
 struct plProc *proc;
 int prevStdoutFd = -1;
 for (proc = pl->procs; proc != NULL; proc = proc->next)
     {
     prevStdoutFd = pipelineExecProc(pl, proc, prevStdoutFd,
                                     stdinFd, stdoutFd, stderrFd,
                                     otherEndBuf, otherEndBufSize);
     otherEndBuf = NULL;  /* only for first process (read pipes) */
     otherEndBufSize = 0;
     }
 }
 
-static void waitOnOne(struct pipeline *pl)
+static void waitOnOne(struct pipeline *pl, struct plProc* proc)
 /* wait on one process to finish */
 {
 int status;
-pid_t pid = waitpid(-pl->groupLeader, &status, 0);
+pid_t pid = waitpid(proc->pid, &status, 0);
 if (pid < 0)
     errnoAbort("waitpid failed");
-plProcHandleTerminate(pipelineFindProc(pl, pid), status);
+proc->pid = -1;
+proc->status = status;
+plProcStateTrans(proc, procStateDone);
+plProcHandleTerminate(proc);
 pl->numRunning--;
 assert(pl->numRunning >= 0);
 }
 
-static void groupWait(struct pipeline *pl)
+static void waitForProcs(struct pipeline *pl)
 /* Wait for pipeline to complete */
 {
-/* wait on all processes to complete */
-while (pl->numRunning > 0)
-    waitOnOne(pl);
+for (struct plProc *proc = pl->procs; proc != NULL; proc = proc->next)
+    waitOnOne(pl, proc);
 }
 
-static void groupLeaderRun(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
-                           void *otherEndBuf, size_t otherEndBufSize)
-/* group leader process */
+static int waitStatus(struct pipeline *pl)
+/* return exit code of the first process that exited non-zero, or 0 if none did */
 {
-pl->groupLeader = getpid();
-if (setpgid(pl->groupLeader, pl->groupLeader) != 0)
-    errnoAbort("error from child setpgid(%d, %d)", pl->groupLeader, pl->groupLeader);
-pipelineGroupExec(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
-
-// only keep stderr open
-close(STDIN_FILENO);
-close(STDOUT_FILENO);
-closeNonStdDescriptors();
-groupWait(pl);
-_exit(0);
-}
-
-static int groupLeaderWait(struct pipeline *pl)
-/* Wait for group leader to complete. If pipelineNoAbort was specified, return
- * the exit code of the first group process exit non-zero, or zero if none
- * failed. */
-{
-int status;
-pid_t pid = waitpid(-pl->groupLeader, &status, 0);
-if (pid < 0)
-    errnoAbort("waitpid failed");
-if (WIFSIGNALED(status))
-    errAbort("process pipeline terminated on signal %d", WTERMSIG(status));
-assert(WIFEXITED(status));
-
-if ((WEXITSTATUS(status) != 0) && !(pl->options & pipelineNoAbort))
-    errAbort("pipeline exited with %d", WEXITSTATUS(status));
-return WEXITSTATUS(status);
-}
-
-static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
-                         void *otherEndBuf, size_t otherEndBufSize)
-/* Fork the group leader, which then launches all processes in a pipeline,
- * stdinFd and stdoutFd are the ends of the pipeline, stderrFd is applied to
- * all processes, including group leader */
-{
-assert(pl->groupLeader < 0);  // should not be set
-if ((pl->groupLeader = fork()) < 0)
-    errnoAbort("can't fork");
-if (pl->groupLeader == 0)
+for (struct plProc *proc = pl->procs; proc != NULL; proc = proc->next)
     {
-    groupLeaderRun(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
-    exit(1); // doesn't return to here
-    }
-else
-    {
-    // parent also must also setpgid to prevent race condition
-    if (setpgid(pl->groupLeader, pl->groupLeader) != 0)
-        errnoAbort("error from parent setpgid(%d, %d)", pl->groupLeader, pl->groupLeader);
+    if (WIFEXITED(proc->status) && (WEXITSTATUS(proc->status) != 0))
+        return WEXITSTATUS(proc->status);
     }
+return 0;
 }
 
 static int openRead(char *fname)
 /* open a file for reading */
 {
 int fd = open(fname, O_RDONLY);
 if (fd < 0)
     errnoAbort("can't open for read access: %s", fname);
 return fd;
 }
 
 static int openWrite(char *fname, boolean append)
 /* open a file for write access */
 {
 int flags = O_WRONLY|O_CREAT;
@@ -669,36 +606,38 @@
 {
 if (pl->pipeFh != NULL)
     closePipelineFile(pl);
 else if (pl->pipeLf != NULL)
     lineFileClose(&pl->pipeLf);
 else
     {
     if (close(pl->pipeFd) < 0)
         errAbort("close failed on pipeline: %s ", pl->procName);
     }
 pl->pipeFd = -1;
 }
 
 int pipelineWait(struct pipeline *pl)
 /* Wait for processes in a pipeline to complete; normally aborts if any
- * process exists non-zero.  If pipelineNoAbort was specified, return the exit
- * code of the first process exit non-zero, or zero if none failed. */
+ * process exits non-zero or terminates on a signal.  If pipelineNoAbort was
+ * specified, return the exit code of the first process that exited non-zero,
+ * or zero if none did. */
 {
 /* must close before waiting to so processes get pipe EOF */
 closePipeline(pl);
-return groupLeaderWait(pl);
+waitForProcs(pl);
+return waitStatus(pl);
 }
 
 int pipelineClose(struct pipeline **pPl)
 /* Wait for pipeline to finish and free it. Same as pipelineWait then pipelineClose.
  * Returns pipelineWait result (normally 0). */
 {
 struct pipeline *pl = *pPl;
 int ret = 0;
 if (pl != NULL)
     {
     ret = pipelineWait(pl);
     pipelineFree(pPl);
     }
 return ret;
 }