src/lib/pipeline.c 1.23

1.23 2010/03/20 14:22:17 markd
wait on process group rather than individual process to prevent deadlocks in certain cases
Index: src/lib/pipeline.c
===================================================================
RCS file: /projects/compbio/cvsroot/kent/src/lib/pipeline.c,v
retrieving revision 1.22
retrieving revision 1.23
diff -b -B -U 4 -r1.22 -r1.23
--- src/lib/pipeline.c	5 Apr 2009 18:10:00 -0000	1.22
+++ src/lib/pipeline.c	20 Mar 2010 14:22:17 -0000	1.23
@@ -11,25 +11,35 @@
 #include <unistd.h>
 #include <sys/wait.h>
 #include <signal.h>
 
-/* FIXME: add close-on-exec startup error reporting */
+enum procState
+/* Life-cycle state of a pipeline process.  Values are declared in transition order (New -> Run -> Done); plProcStateTrans relies on that order. */
+{
+    procStateNew,  // plProc object created, not yet forked
+    procStateRun,  // process running
+    procStateDone  // process finished (ok or failed)
+};
 
 struct plProc
 /* A single process in a pipeline */
 {
     struct plProc *next;   /* order list of processes */
     struct pipeline *pl;   /* pipeline we are associated with */
     char **cmd;            /* null-terminated command for this process */
     pid_t  pid;            /* pid for process, -1 if not running */
+    enum procState state;  /* life-cycle state, see enum procState */
     int status;            /* status from wait */
+    int execPipe[2];       /* pipe used to detect exec: [0] is read by the parent, [1] is the child's end */
 };
 
 struct pipeline
 /* Object for a process pipeline and associated open file */
 {
     struct pipeline *next;
     struct plProc *procs;      /* list of processes */
+    int numRunning;            /* number of processes running */
+    pid_t pgid;                /* process group id, or -1 if not set. */
     char *procName;            /* name to use in error messages. */
     int pipeFd;                /* fd of pipe to/from process, -1 if none */
     unsigned options;          /* options */
     FILE* pipeFh;              /* optional stdio around pipe */
@@ -109,8 +119,13 @@
 
 for (i = 0; i < cmdLen; i++)
     proc->cmd[i] = cloneString(cmd[i]);
 proc->cmd[cmdLen] = NULL;
+proc->state = procStateNew;
+if (pipe(proc->execPipe) != 0)  // parent later reads execPipe[0] to learn when the child has exec'ed
+    errnoAbort("pipe open failed");
+if (fcntl(proc->execPipe[1], F_SETFD, FD_CLOEXEC) == -1)  // F_SETFD, not F_SETFL: close-on-exec is a descriptor flag
+    errnoAbort("fcntl set cloexec failed");
 return proc;
 }
 
 static void plProcFree(struct plProc *proc)
@@ -122,10 +137,18 @@
 freeMem(proc->cmd);
 freeMem(proc);
 }
 
+static void plProcStateTrans(struct plProc *proc, enum procState newState)
+/* Advance proc to newState.  Only a step to the immediately following enum procState value is accepted; anything else aborts. */
+{
+if (newState != proc->state+1)  // states must be visited strictly in declaration order
+    errAbort("invalid state transition: %d -> %d", proc->state, newState);
+proc->state = newState;
+}
+
 static void childAbortHandler()
-/* abort handler that just exts */
+/* abort handler that just exits the process, with status 100 */
 {
 exit(100);
 }
 
@@ -181,11 +204,12 @@
 static void plProcMemWrite(struct plProc* proc, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize)
 /* implements child process to write memory buffer to pipeline after
  * fork */
 {
-ssize_t wrCnt;
+safeClose(&proc->execPipe[1]);  // don't exec, so explicitly close
+
 plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd);
-wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize);
+ssize_t wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize);
 if (wrCnt < 0)
     errnoAbort("pipeline input buffer write failed");
 else if (wrCnt != otherEndBufSize)
     errAbort("pipeline input buffer short write %lld, expected %lld",
@@ -196,14 +220,12 @@
     exit(0);
     }
 }
 
-static void plProcWait(struct plProc* proc)
+static void plProcWait(struct plProc* proc, int status)
 /* wait for a process in a pipeline */
 {
-if (waitpid(proc->pid, &proc->status, 0) < 0)
-    errnoAbort("process lost for: \"%s\" in pipeline \"%s\"", joinCmd(proc->cmd),
-               proc->pl->procName);
+proc->status = status;
 if (WIFSIGNALED(proc->status))
     errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"",
              WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName);
 assert(WIFEXITED(proc->status));
@@ -211,8 +233,9 @@
 if ((WEXITSTATUS(proc->status) != 0) && !(proc->pl->options & pipelineNoAbort))
     errAbort("process exited with %d: \"%s\" in pipeline \"%s\"",
              WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName);
 proc->pid = -1;
+plProcStateTrans(proc, procStateDone);
 }
 
 static struct pipeline* pipelineNew(char ***cmds, unsigned options)
 /* create a new pipeline object. Doesn't start processes */
@@ -221,8 +244,9 @@
 struct pipeline *pl;
 int iCmd;
 
 AllocVar(pl);
+pl->pgid = -1;
 pl->pipeFd = -1;
 pl->options = options;
 pl->procName = joinCmds(cmds);
 
@@ -259,8 +283,25 @@
     freez(plPtr);
     }
 }
 
+static void execProcChild(struct pipeline* pl, struct plProc *proc,
+                          int procStdinFd, int procStdoutFd, int stderrFd,
+                          void *otherEndBuf, size_t otherEndBufSize)
+/* Child-side setup after fork: ignore SIGPIPE, join the pipeline's process group, then either write otherEndBuf or run the command.  Intended not to return. */
+{
+if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+    errnoAbort("error ignoring SIGPIPE");
+// pl->pgid is still -1 in the first child, which therefore becomes the group leader; later children join that group
+if (setpgid(getpid(), ((pl->pgid < 0) ? getpid() : pl->pgid)))
+    errnoAbort("error from setpgid");
+
+if (otherEndBuf != NULL)
+    plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);  // no exec: this child only writes the buffer
+else
+    plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd);
+}
+
 static int pipelineExecProc(struct pipeline* pl, struct plProc *proc,
                             int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd,
                             void *otherEndBuf, size_t otherEndBufSize)
 /* start a process in the pipeline, return the stdout fd of the process */
@@ -279,26 +320,38 @@
 /* start process */
 if ((proc->pid = fork()) < 0)
     errnoAbort("can't fork");
 if (proc->pid == 0)
-    {
-    // child
-    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
-        errnoAbort("error ignoring SIGPIPE");
-    if (otherEndBuf != NULL)
-        plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
-    else
-        plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd);
-    }
+    execProcChild(pl, proc, procStdinFd, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
+
+/* parent only */
+if (pl->pgid < 0)
+    pl->pgid = proc->pid; // first process defines pgid
+
+/* record that we did this */
+plProcStateTrans(proc, procStateRun);
+pl->numRunning++;
 
 /* don't leave intermediate pipes open in parent */
 if (proc != pl->procs)
     safeClose(&procStdinFd);
 if (proc->next != NULL)
     safeClose(&procStdoutFd);
+
+/* child end of execPipe */
+safeClose(&proc->execPipe[1]);
 return prevStdoutFd;
 }
 
+static void waitOnExec(struct plProc *proc)
+/* Block until the child's end of execPipe has been closed, then close the parent's read end. */
+{
+// the write end is meant to be closed when exec happens (or explicitly by a child that never execs), at which point read returns
+char buf[1];
+read(proc->execPipe[0], buf, sizeof(buf));  // NOTE(review): result is ignored, so an error or EINTR is treated the same as "exec happened"
+safeClose(&proc->execPipe[0]);
+}
+
 static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd,
                          void *otherEndBuf, size_t otherEndBufSize)
 /* Start all processes in a pipeline, stdinFd and stdoutFd are the ends of
  * the pipeline, stderrFd is applied to all processed */
@@ -312,8 +365,12 @@
                                     otherEndBuf, otherEndBufSize);
     otherEndBuf = NULL;  /* only for first process (read pipes) */
     otherEndBufSize = 0;
     }
+
+// wait on execs to happen so we know that setpgid has happened
+for (proc = pl->procs; proc != NULL; proc = proc->next)
+    waitOnExec(proc);
 }
 
 static int openRead(char *fname)
 /* open a file for reading */
@@ -532,33 +589,57 @@
     }
 pl->pipeFd = -1;
 }
 
-int pipelineWait(struct pipeline *pl)
-/* Wait for processes in a pipeline to complete; normally aborts if any
- * process exists non-zero.  If pipelineNoAbort was specified, return the exit
- * code of the first process exit non-zero, or zero if none failed. */
+static struct plProc *pipelineFindProc(struct pipeline *pl, pid_t pid)
+/* Return the plProc in pl whose pid equals pid; aborts if there is none. */
 {
 struct plProc *proc;
-int exitCode = 0;
-
-/* must close before waits for output pipeline */
-if (pl->options & pipelineWrite)
-    closePipeline(pl);
+for (proc = pl->procs; proc != NULL; proc = proc->next)
+    if (proc->pid == pid)
+        return proc;
+errAbort("pid not found in pipeline: %d", pid);
+return 0; // never reached; present only to satisfy the return type
+}
 
-/* wait on each process in order */
+static int pipelineFindStatus(struct pipeline *pl)
+/* Return the exit code of the first process, in pipeline order, that exited non-zero, or 0 if every process exited with 0. */
+{
+// n.b. can't get here if a process was signaled; plProcWait aborts in that case
+struct plProc *proc;
 for (proc = pl->procs; proc != NULL; proc = proc->next)
     {
-    plProcWait(proc);
-    if ((WEXITSTATUS(proc->status) != 0) && (exitCode == 0))
-        exitCode = WEXITSTATUS(proc->status);
-    }
+    assert(WIFEXITED(proc->status));
+    if (WEXITSTATUS(proc->status) != 0)
+        return WEXITSTATUS(proc->status);
+    }
+return 0; // all ok
+}
+
+static void waitOnOne(struct pipeline *pl)
+/* Wait for any one process in the pipeline's process group to finish and record its status. */
+{
+int status;
+pid_t pid = waitpid(-pl->pgid, &status, 0);  // negative pid argument: any child whose process group id is pl->pgid
+if (pid < 0)
+    errnoAbort("waitpid failed");
+plProcWait(pipelineFindProc(pl, pid), status);
+pl->numRunning--;
+assert(pl->numRunning >= 0);
+}
 
-/* must close after waits for input pipeline */
-if (pl->options & pipelineRead)
-    closePipeline(pl);
+int pipelineWait(struct pipeline *pl)
+/* Wait for processes in a pipeline to complete; normally aborts if any
+ * process exits non-zero.  If pipelineNoAbort was specified, return the exit
+ * code of the first process that exited non-zero, or zero if none failed. */
+{
+/* must close before waiting so processes get pipe EOF */
+closePipeline(pl);
 
-return exitCode;
+/* reap processes in whatever order they finish until none are left running */
+while (pl->numRunning > 0)
+    waitOnOne(pl);
+return pipelineFindStatus(pl);
 }
 
 void pipelineDumpCmds(char ***cmds)
 /* Dump out pipeline-formatted commands to stdout for debugging. */