42daa4642a781e88f86984a04d826d845679fd1c markd Thu Jul 25 21:40:11 2013 -0700 enable SIGPIPE on linefile gzip subprocesses to allow closing the reading of compressed files before EOF is reached diff --git src/lib/pipeline.c src/lib/pipeline.c index 9a0a936..32d3fae 100644 --- src/lib/pipeline.c +++ src/lib/pipeline.c @@ -155,36 +155,36 @@ } static void plProcStateTrans(struct plProc *proc, enum procState newState) /* do state transition for process changing it to a new state */ { // States must transition in order. New state must immediately follow the // current state. if (newState != proc->state+1) errAbort("invalid state transition: %d -> %d", proc->state, newState); proc->state = newState; } static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd) /* setup signal, error handling, and file descriptors after fork */ { -/* treat a closed pipe as an EOF rather than getting SIGPIPE */ -struct sigaction sigAct; -ZeroVar(&sigAct); -sigAct.sa_handler = SIG_IGN; -if (sigaction(SIGPIPE, &sigAct, NULL) != 0) - errnoAbort("failed to set SIGPIPE to SIG_IGN"); +/* Optionally treat a closed pipe as an EOF rather than getting SIGPIPE */ +if (signal(SIGPIPE, ((proc->pl->options & pipelineSigpipe) ? SIG_DFL : SIG_IGN)) == SIG_ERR) + errnoAbort("error ignoring SIGPIPE"); + +if (setpgid(getpid(), proc->pl->groupLeader) != 0) + errnoAbort("error from setpgid(%d, %d)", getpid(), proc->pl->groupLeader); /* child, first setup stdio files */ if (stdinFd != STDIN_FILENO) { if (dup2(stdinFd, STDIN_FILENO) < 0) errnoAbort("can't dup2 to stdin"); } if (stdoutFd != STDOUT_FILENO) { if (dup2(stdoutFd, STDOUT_FILENO) < 0) errnoAbort("can't dup2 to stdout"); } if (stderrFd != STDERR_FILENO) @@ -209,49 +209,66 @@ { plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd); ssize_t wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize); if (wrCnt < 0) errnoAbort("pipeline input buffer write failed"); else if (wrCnt != otherEndBufSize) errAbort("pipeline input buffer short write %lld, expected %lld", (long long)wrCnt, (long long)otherEndBufSize); else { close(STDOUT_FILENO); exit(0); } } -static void plProcHandleTerminate(struct plProc* proc, int status) -/* handle one of the processes terminating, save exit status */ +static void plProcHandleSignaled(struct plProc* proc, int status) +/* handle one of the processes terminating on a signal */ +{ +assert(WIFSIGNALED(proc->status)); +if (!((WTERMSIG(proc->status) == SIGPIPE) && (proc->pl->options & pipelineSigpipe))) { -proc->status = status; -if (WIFSIGNALED(proc->status)) errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"", WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName); -assert(WIFEXITED(proc->status)); + } +} +static void plProcHandleExited(struct plProc* proc, int status) +/* handle one of the processes terminating on an exit */ +{ +assert(WIFEXITED(proc->status)); if (WEXITSTATUS(proc->status) != 0) { // only print an error message if aborting if (!(proc->pl->options & pipelineNoAbort)) fprintf(stderr, "process exited with %d: \"%s\" in pipeline \"%s\"\n", WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName); exit(WEXITSTATUS(proc->status)); // pass back exit code } +} + +static void plProcHandleTerminate(struct plProc* proc, int status) +/* handle one of the processes terminating, save exit status */ +{ proc->pid = -1; +proc->status = status; plProcStateTrans(proc, procStateDone); + +if (WIFSIGNALED(proc->status)) + plProcHandleSignaled(proc, status); +else + plProcHandleExited(proc, status); } static struct pipeline* pipelineNew(char ***cmds, unsigned options) /* create a new pipeline object. Doesn't start processes */ { static char *memPseudoCmd[] = {"[mem]", NULL}; struct pipeline *pl; int iCmd; AllocVar(pl); pl->groupLeader = -1; pl->pipeFd = -1; pl->options = options; pl->procName = joinCmds(cmds); @@ -293,35 +310,30 @@ /* find a plProc by pid */ { struct plProc *proc; for (proc = pl->procs; proc != NULL; proc = proc->next) if (proc->pid == pid) return proc; errAbort("pid not found in pipeline: %d", (int)pid); return 0; // never reached } static void execProcChild(struct pipeline* pl, struct plProc *proc, int procStdinFd, int procStdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* handle child process setup after fork. This does not return */ { -if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) - errnoAbort("error ignoring SIGPIPE"); -if (setpgid(getpid(), pl->groupLeader) != 0) - errnoAbort("error from setpgid(%d, %d)", getpid(), pl->groupLeader); - if (otherEndBuf != NULL) plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize); else plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd); } static int pipelineExecProc(struct pipeline* pl, struct plProc *proc, int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* start a process in the pipeline, return the stdout fd of the process */ { /* determine stdin/stdout to use */ int procStdinFd, procStdoutFd; if (proc == pl->procs) procStdinFd = stdinFd; /* first process in pipeline */