42daa4642a781e88f86984a04d826d845679fd1c
markd
  Thu Jul 25 21:40:11 2013 -0700
enable SIGPIPE on linefile gzip subprocesses to allow closing the reading of compressed files before EOF is reached
diff --git src/lib/pipeline.c src/lib/pipeline.c
index 9a0a936..32d3fae 100644
--- src/lib/pipeline.c
+++ src/lib/pipeline.c
@@ -155,36 +155,36 @@
 }
 
 static void plProcStateTrans(struct plProc *proc, enum procState newState)
 /* do state transition for process changing it to a new state  */
 {
 // States must transition in order.  New state must immediately follow the
 // current state.
 if (newState != proc->state+1)
     errAbort("invalid state transition: %d -> %d", proc->state, newState);
 proc->state = newState;
 }
 
 static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd)
 /* setup signal, error handling, and file descriptors after fork */
 {
-/* treat a closed pipe as an EOF rather than getting SIGPIPE */
-struct sigaction sigAct;
-ZeroVar(&sigAct);
-sigAct.sa_handler = SIG_IGN;
-if (sigaction(SIGPIPE, &sigAct, NULL) != 0)
-    errnoAbort("failed to set SIGPIPE to SIG_IGN");
+/* Optionally treat a closed pipe as an EOF rather than getting SIGPIPE */
+if (signal(SIGPIPE, ((proc->pl->options & pipelineSigpipe) ? SIG_DFL : SIG_IGN)) == SIG_ERR)
+    errnoAbort("error ignoring SIGPIPE");
+
+if (setpgid(getpid(), proc->pl->groupLeader) != 0)
+    errnoAbort("error from setpgid(%d, %d)", getpid(), proc->pl->groupLeader);
 
 /* child, first setup stdio files */
 if (stdinFd != STDIN_FILENO)
     {
     if (dup2(stdinFd, STDIN_FILENO) < 0)
         errnoAbort("can't dup2 to stdin");
     }
     
 if (stdoutFd != STDOUT_FILENO)
     {
     if (dup2(stdoutFd, STDOUT_FILENO) < 0)
         errnoAbort("can't dup2 to stdout");
     }
     
 if (stderrFd != STDERR_FILENO)
@@ -209,49 +209,66 @@
 {
 plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd);
 ssize_t wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize);
 if (wrCnt < 0)
     errnoAbort("pipeline input buffer write failed");
 else if (wrCnt != otherEndBufSize)
     errAbort("pipeline input buffer short write %lld, expected %lld",
              (long long)wrCnt, (long long)otherEndBufSize);
 else
     {
     close(STDOUT_FILENO);
     exit(0);
     }
 }
 
-static void plProcHandleTerminate(struct plProc* proc, int status)
-/* handle one of the processes terminating, save exit status */
+static void plProcHandleSignaled(struct plProc* proc, int status)
+/* handle one of the processes terminating on a signal */
+{
+assert(WIFSIGNALED(proc->status));
+if (!((WTERMSIG(proc->status) == SIGPIPE) && (proc->pl->options & pipelineSigpipe)))
 {
-proc->status = status;
-if (WIFSIGNALED(proc->status))
     errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"",
              WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName);
-assert(WIFEXITED(proc->status));
+    }
+}
 
+static void plProcHandleExited(struct plProc* proc, int status)
+/* handle one of the processes terminating on an exit */
+{
+assert(WIFEXITED(proc->status));
 if (WEXITSTATUS(proc->status) != 0)
     {
     // only print an error message if aborting
     if (!(proc->pl->options & pipelineNoAbort))
         fprintf(stderr, "process exited with %d: \"%s\" in pipeline \"%s\"\n",
                 WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName);
     exit(WEXITSTATUS(proc->status));  // pass back exit code
     }
+}
+
+static void plProcHandleTerminate(struct plProc* proc, int status)
+/* handle one of the processes terminating, save exit status */
+{
 proc->pid = -1;
+proc->status = status;
 plProcStateTrans(proc, procStateDone);
+
+if (WIFSIGNALED(proc->status))
+    plProcHandleSignaled(proc, status);
+else
+    plProcHandleExited(proc, status);
 }
 
 static struct pipeline* pipelineNew(char ***cmds, unsigned options)
 /* create a new pipeline object. Doesn't start processes */
 {
 static char *memPseudoCmd[] = {"[mem]", NULL};
 struct pipeline *pl;
 int iCmd;
 
 AllocVar(pl);
 pl->groupLeader = -1;
 pl->pipeFd = -1;
 pl->options = options;
 pl->procName = joinCmds(cmds);
 
@@ -293,35 +310,30 @@
 /* find a plProc by pid */
 {
 struct plProc *proc;
 for (proc = pl->procs; proc != NULL; proc = proc->next)
     if (proc->pid == pid)
         return proc;
 errAbort("pid not found in pipeline: %d", (int)pid);
 return 0; // never reached
 }
 
 static void execProcChild(struct pipeline* pl, struct plProc *proc,
                           int procStdinFd, int procStdoutFd, int stderrFd,
                           void *otherEndBuf, size_t otherEndBufSize)
 /* handle child process setup after fork.  This does not return */
 {
-if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
-    errnoAbort("error ignoring SIGPIPE");
-if (setpgid(getpid(), pl->groupLeader) != 0)
-    errnoAbort("error from setpgid(%d, %d)", getpid(), pl->groupLeader);
-
 if (otherEndBuf != NULL)
     plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize);
 else
     plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd);
 }
 
 static int pipelineExecProc(struct pipeline* pl, struct plProc *proc,
                             int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd,
                             void *otherEndBuf, size_t otherEndBufSize)
 /* start a process in the pipeline, return the stdout fd of the process */
 {
 /* determine stdin/stdout to use */
 int procStdinFd, procStdoutFd;
 if (proc == pl->procs)
     procStdinFd = stdinFd; /* first process in pipeline */