902dd7f624e16a660ad582533f264b720ffcdb41 markd Thu Oct 28 12:07:31 2021 -0700 don't create a process group on pipelineOpen, stay in the parent process group diff --git src/lib/pipeline.c src/lib/pipeline.c index 3304b26..f65615d 100644 --- src/lib/pipeline.c +++ src/lib/pipeline.c @@ -24,40 +24,35 @@ procStateDone // process finished (ok or failed) }; struct plProc /* A single process in a pipeline */ { struct plProc *next; /* order list of processes */ struct pipeline *pl; /* pipeline we are associated with */ char **cmd; /* null-terminated command for this process */ pid_t pid; /* pid for process, -1 if not running */ enum procState state; /* state of process */ int status; /* status from wait */ }; struct pipeline -/* Object for a process pipeline and associated open file. Pipeline process - * consist of a process group leader and then all of the child process. The - * group leader does no work, just wait on processes to complete and report - * errors to the top level process. This object is create in the calling - * process, and then passed down, but not shared, via forks. +/* Object for a process pipeline and associated open files. */ { struct plProc *procs; /* list of processes */ int numRunning; /* number of processes running */ - pid_t groupLeader; /* process group id, or -1 if not set. This is pid of group leader */ char *procName; /* name to use in error messages. */ int pipeFd; /* fd of pipe to/from process, -1 if none */ unsigned options; /* options */ FILE* pipeFh; /* optional stdio around pipe */ char* stdioBuf; /* optional stdio buffer */ struct lineFile *pipeLf; /* optional lineFile around pipe */ }; /* file buffer size */ #define FILE_BUF_SIZE 64*1024 static int pipeCreate(int *writeFd) /* create a pipe or die, return readFd */ { int pipeFds[2]; @@ -163,33 +158,30 @@ { // States must transition in order. New state must immediately follow the // current state. if (newState != proc->state+1) errAbort("invalid state transition: %d -> %d", proc->state, newState); proc->state = newState; } static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd) /* setup signal, error handling, and file descriptors after fork */ { /* Optionally treat a closed pipe as an EOF rather than getting SIGPIPE */ if (signal(SIGPIPE, ((proc->pl->options & pipelineSigpipe) ? SIG_DFL : SIG_IGN)) == SIG_ERR) errnoAbort("error ignoring SIGPIPE"); -if (setpgid(getpid(), proc->pl->groupLeader) != 0) - errnoAbort("error from setpgid(%d, %d)", getpid(), proc->pl->groupLeader); - /* child, first setup stdio files */ if (stdinFd != STDIN_FILENO) { if (dup2(stdinFd, STDIN_FILENO) < 0) errnoAbort("can't dup2 to stdin"); } if (stdoutFd != STDOUT_FILENO) { if (dup2(stdoutFd, STDOUT_FILENO) < 0) errnoAbort("can't dup2 to stdout"); } if (stderrFd != STDERR_FILENO) { @@ -213,77 +205,77 @@ { plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd); ssize_t wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize); if (wrCnt < 0) errnoAbort("pipeline input buffer write failed"); else if (wrCnt != otherEndBufSize) errAbort("pipeline input buffer short write %lld, expected %lld", (long long)wrCnt, (long long)otherEndBufSize); else { close(STDOUT_FILENO); exit(0); } } -static void plProcHandleSignaled(struct plProc* proc, int status) -/* handle one of the processes terminating on a signal */ +static boolean isIgnoredSigPipe(struct plProc* proc) +/* did this process exit with SIGPIPE and we are ignoring it */ +{ +return (WTERMSIG(proc->status) == SIGPIPE) && (proc->pl->options & pipelineSigpipe); +} + +static void plProcHandleSignaled(struct plProc* proc) +/* handle one of the processes terminating on a signal, + * return status*/ { assert(WIFSIGNALED(proc->status)); -if (!((WTERMSIG(proc->status) == SIGPIPE) && (proc->pl->options & pipelineSigpipe))) +if (!isIgnoredSigPipe(proc)) { errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"", WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName); } } -static void plProcHandleExited(struct plProc* proc, int status) +static void plProcHandleExited(struct plProc* proc) /* handle one of the processes terminating on an exit */ { assert(WIFEXITED(proc->status)); -if (WEXITSTATUS(proc->status) != 0) - { // only print an error message if aborting - if (!(proc->pl->options & pipelineNoAbort)) +if ((WEXITSTATUS(proc->status) != 0) && !(proc->pl->options & pipelineNoAbort)) + { fprintf(stderr, "process exited with %d: \"%s\" in pipeline \"%s\"\n", WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName); - exit(WEXITSTATUS(proc->status)); // pass back exit code } } -static void plProcHandleTerminate(struct plProc* proc, int status) +static void plProcHandleTerminate(struct plProc* proc) /* handle one of the processes terminating, save exit status */ { -proc->pid = -1; -proc->status = status; -plProcStateTrans(proc, procStateDone); - if (WIFSIGNALED(proc->status)) - plProcHandleSignaled(proc, status); + plProcHandleSignaled(proc); else - plProcHandleExited(proc, status); + plProcHandleExited(proc); } static struct pipeline* pipelineNew(char ***cmds, unsigned options) /* create a new pipeline object. Doesn't start processes */ { static char *memPseudoCmd[] = {"[mem]", NULL}; struct pipeline *pl; int iCmd; AllocVar(pl); -pl->groupLeader = -1; pl->pipeFd = -1; pl->options = options; pl->procName = joinCmds(cmds); if (cmds[0] == NULL) errAbort("no commands in pipeline"); if (options & pipelineMemInput) { /* add proc for forked process to write memory to pipeline */ slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl)); } for(iCmd = 0; cmds[iCmd] != NULL; iCmd++) slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl)); @@ -298,41 +290,30 @@ if (pl != NULL) { struct plProc *proc = pl->procs; while (proc != NULL) { struct plProc *delProc = proc; proc = proc->next; plProcFree(delProc); } freez(&pl->procName); freez(&pl->stdioBuf); freez(plPtr); } } -static struct plProc *pipelineFindProc(struct pipeline *pl, pid_t pid) -/* find a plProc by pid */ -{ -struct plProc *proc; -for (proc = pl->procs; proc != NULL; proc = proc->next) - if (proc->pid == pid) - return proc; -errAbort("pid not found in pipeline: %d", (int)pid); -return 0; // never reached -} - static void execProcChild(struct pipeline* pl, struct plProc *proc, int procStdinFd, int procStdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* handle child process setup after fork. This does not return */ { if (otherEndBuf != NULL) plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize); else plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd); } static int pipelineExecProc(struct pipeline* pl, struct plProc *proc, int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* start a process in the pipeline, return the stdout fd of the process */ @@ -355,122 +336,78 @@ execProcChild(pl, proc, procStdinFd, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize); /* record that we did this */ plProcStateTrans(proc, procStateRun); pl->numRunning++; /* don't leave intermediate pipes open in parent */ if (proc != pl->procs) safeClose(&procStdinFd); if (proc->next != NULL) safeClose(&procStdoutFd); return prevStdoutFd; } -static void pipelineGroupExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, +static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* Start all processes in a pipeline, stdinFd and stdoutFd are the ends of * the pipeline, stderrFd is applied to all processed */ { struct plProc *proc; int prevStdoutFd = -1; for (proc = pl->procs; proc != NULL; proc = proc->next) { prevStdoutFd = pipelineExecProc(pl, proc, prevStdoutFd, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); otherEndBuf = NULL; /* only for first process (read pipes) */ otherEndBufSize = 0; } } -static void waitOnOne(struct pipeline *pl) +static void waitOnOne(struct pipeline *pl, struct plProc* proc) /* wait on one process to finish */ { int status; -pid_t pid = waitpid(-pl->groupLeader, &status, 0); +pid_t pid = waitpid(proc->pid, &status, 0); if (pid < 0) errnoAbort("waitpid failed"); -plProcHandleTerminate(pipelineFindProc(pl, pid), status); +proc->pid = -1; +proc->status = status; +plProcStateTrans(proc, procStateDone); +plProcHandleTerminate(proc); pl->numRunning--; assert(pl->numRunning >= 0); } -static void groupWait(struct pipeline *pl) +static void waitForProcs(struct pipeline *pl) /* Wait for pipeline to complete */ { -/* wait on all processes to complete */ -while (pl->numRunning > 0) - waitOnOne(pl); +for (struct plProc *proc = pl->procs; proc != NULL; proc = proc->next) + waitOnOne(pl, proc); } -static void groupLeaderRun(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, - void *otherEndBuf, size_t otherEndBufSize) -/* group leader process */ +static int waitStatus(struct pipeline *pl) +/* return exit status of pipeline */ { -pl->groupLeader = getpid(); -if (setpgid(pl->groupLeader, pl->groupLeader) != 0) - errnoAbort("error from child setpgid(%d, %d)", pl->groupLeader, pl->groupLeader); -pipelineGroupExec(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); - -// only keep stderr open -close(STDIN_FILENO); -close(STDOUT_FILENO); -closeNonStdDescriptors(); -groupWait(pl); -_exit(0); -} - -static int groupLeaderWait(struct pipeline *pl) -/* Wait for group leader to complete. If pipelineNoAbort was specified, return - * the exit code of the first group process exit non-zero, or zero if none - * failed. */ -{ -int status; -pid_t pid = waitpid(-pl->groupLeader, &status, 0); -if (pid < 0) - errnoAbort("waitpid failed"); -if (WIFSIGNALED(status)) - errAbort("process pipeline terminated on signal %d", WTERMSIG(status)); -assert(WIFEXITED(status)); - -if ((WEXITSTATUS(status) != 0) && !(pl->options & pipelineNoAbort)) - errAbort("pipeline exited with %d", WEXITSTATUS(status)); -return WEXITSTATUS(status); -} - -static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, - void *otherEndBuf, size_t otherEndBufSize) -/* Fork the group leader, which then launches all processes in a pipeline, - * stdinFd and stdoutFd are the ends of the pipeline, stderrFd is applied to - * all processes, including group leader */ -{ -assert(pl->groupLeader < 0); // should not be set -if ((pl->groupLeader = fork()) < 0) - errnoAbort("can't fork"); -if (pl->groupLeader == 0) +for (struct plProc *proc = pl->procs; proc != NULL; proc = proc->next) { - groupLeaderRun(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); - exit(1); // doesn't return to here - } -else - { - // parent also must also setpgid to prevent race condition - if (setpgid(pl->groupLeader, pl->groupLeader) != 0) - errnoAbort("error from parent setpgid(%d, %d)", pl->groupLeader, pl->groupLeader); + if (WIFEXITED(proc->status) && (WEXITSTATUS(proc->status) != 0)) + return WEXITSTATUS(proc->status); } +return 0; } static int openRead(char *fname) /* open a file for reading */ { int fd = open(fname, O_RDONLY); if (fd < 0) errnoAbort("can't open for read access: %s", fname); return fd; } static int openWrite(char *fname, boolean append) /* open a file for write access */ { int flags = O_WRONLY|O_CREAT; @@ -669,36 +606,38 @@ { if (pl->pipeFh != NULL) closePipelineFile(pl); else if (pl->pipeLf != NULL) lineFileClose(&pl->pipeLf); else { if (close(pl->pipeFd) < 0) errAbort("close failed on pipeline: %s ", pl->procName); } pl->pipeFd = -1; } int pipelineWait(struct pipeline *pl) /* Wait for processes in a pipeline to complete; normally aborts if any - * process exists non-zero. If pipelineNoAbort was specified, return the exit - * code of the first process exit non-zero, or zero if none failed. */ + * process exits non-zero or signals. If pipelineNoAbort was specified, + * return the exit code of the first process exit non-zero. + */ { /* must close before waiting to so processes get pipe EOF */ closePipeline(pl); -return groupLeaderWait(pl); +waitForProcs(pl); +return waitStatus(pl); } int pipelineClose(struct pipeline **pPl) /* Wait for pipeline to finish and free it. Same as pipelineWait then pipelineClose. * Returns pipelineWait result (normally 0). */ { struct pipeline *pl = *pPl; int ret = 0; if (pl != NULL) { ret = pipelineWait(pl); pipelineFree(pPl); } return ret; }