841d2e3e8372c1901e8ad7a5bfd27a6746250329 braney Mon Mar 26 11:33:25 2012 -0700 Revert "fixed timing problem that caused intermitent errors setting up process group by create a group leader process" This reverts commit c7424a546c4a83220fde16217b502bea2e4daff7. braney says, "we decided to do this commit after the v264 build" diff --git src/lib/pipeline.c src/lib/pipeline.c index 2b97e88..3a671f4 100644 --- src/lib/pipeline.c +++ src/lib/pipeline.c @@ -17,87 +17,74 @@ { procStateNew, // plProc object created procStateRun, // proccess running procStateDone // process finished (ok or failed) }; struct plProc /* A single process in a pipeline */ { struct plProc *next; /* order list of processes */ struct pipeline *pl; /* pipeline we are associated with */ char **cmd; /* null-terminated command for this process */ pid_t pid; /* pid for process, -1 if not running */ enum procState state; /* state of process */ int status; /* status from wait */ + int execPipeParent; /* pipe to wait on for exec */ + int execPipeChild; /* write side is close-on-exec */ }; struct pipeline -/* Object for a process pipeline and associated open file. Pipeline process - * consist of a process group leader and then all of the child process. The - * group leader does no work, just wait on processes to complete and report - * errors to the top level process. This object is create in the calling - * process, and then passed down, but not shared, via forks. - */ +/* Object for a process pipeline and associated open file */ { + struct pipeline *next; struct plProc *procs; /* list of processes */ int numRunning; /* number of processes running */ - pid_t groupLeader; /* process group id, or -1 if not set. This is pid of group leader */ + pid_t pgid; /* process group id, or -1 if not set. */ char *procName; /* name to use in error messages. */ int pipeFd; /* fd of pipe to/from process, -1 if none */ unsigned options; /* options */ FILE* pipeFh; /* optional stdio around pipe */ char* stdioBuf; /* optional stdio buffer */ struct lineFile *pipeLf; /* optional lineFile around pipe */ }; /* file buffer size */ #define FILE_BUF_SIZE 64*1024 static int pipeCreate(int *writeFd) -/* create a pipe or die, return readFd */ +/* create a pipe of die, return readFd */ { int pipeFds[2]; if (pipe(pipeFds) < 0) errnoAbort("can't create pipe"); *writeFd = pipeFds[1]; return pipeFds[0]; } static void safeClose(int *fdPtr) /* Close with error checking. *fdPtr == -1 indicated already closed */ { int fd = *fdPtr; if (fd != -1) { if (close(fd) < 0) errnoAbort("close failed on fd %d", fd); *fdPtr = -1; } } -static void closeNonStdDescriptors(void) -/* close non-standard file descriptors */ -{ -long maxFd = sysconf(_SC_OPEN_MAX); -if (maxFd < 0) - maxFd = 4096; // shouldn't really happen -int fd; -for (fd = STDERR_FILENO+1; fd < maxFd; fd++) - close(fd); -} - static char* joinCmd(char **cmd) /* join an cmd vector into a space separated string */ { struct dyString *str = dyStringNew(512); int i; for (i = 0; cmd[i] != NULL; i++) { if (i > 0) dyStringAppend(str, " "); dyStringAppend(str, cmd[i]); } return dyStringCannibalize(&str); } static char* joinCmds(char ***cmds) @@ -107,162 +94,172 @@ int i, j; for (i = 0; cmds[i] != NULL; i++) { if (i > 0) dyStringAppend(str, " | "); for (j = 0; cmds[i][j] != NULL; j++) { if (j > 0) dyStringAppend(str, " "); dyStringAppend(str, cmds[i][j]); } } return dyStringCannibalize(&str); } -static char** cloneCmdVector(char **cmd) -/* make a copy of the vector */ -{ -int i, cmdLen = 0; -for (i = 0; cmd[i] != NULL; i++) - cmdLen++; -char **cmd2 = needMem((cmdLen+1)*sizeof(char*)); - -for (i = 0; i < cmdLen; i++) - cmd2[i] = cloneString(cmd[i]); -cmd2[cmdLen] = NULL; -return cmd2; -} - static struct plProc* plProcNew(char **cmd, struct pipeline *pl) /* create a new plProc object for a command. */ { +int i, cmdLen = 0; struct plProc* proc; AllocVar(proc); proc->pl = pl; -proc->cmd = cloneCmdVector(cmd); + +for (i = 0; cmd[i] != NULL; i++) + cmdLen++; +proc->cmd = needMem((cmdLen+1)*sizeof(char*)); + +for (i = 0; i < cmdLen; i++) + proc->cmd[i] = cloneString(cmd[i]); +proc->cmd[cmdLen] = NULL; proc->state = procStateNew; +proc->execPipeParent = pipeCreate(&proc->execPipeChild); +if (fcntl(proc->execPipeChild, F_SETFL, FD_CLOEXEC) != 0) + errnoAbort("fcntl set FD_cloexec failed"); return proc; } static void plProcFree(struct plProc *proc) /* free a plProc object. */ { int i; for (i = 0; proc->cmd[i] != NULL; i++) freeMem(proc->cmd[i]); freeMem(proc->cmd); freeMem(proc); } static void plProcStateTrans(struct plProc *proc, enum procState newState) /* do state transition for process changing it to a new state */ { // States must transition in order. New state must immediately follow the // current state. if (newState != proc->state+1) errAbort("invalid state transition: %d -> %d", proc->state, newState); proc->state = newState; } +static void childAbortHandler() +/* abort handler that just exits */ +{ +exit(100); +} + static void plProcSetup(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd) /* setup signal, error handling, and file descriptors after fork */ { -/* treat a closed pipe as an EOF rather than getting SIGPIPE */ +int fd; struct sigaction sigAct; + +/* make sure abort handler exits */ +pushWarnAbort(); +pushAbortHandler(childAbortHandler); + +/* treat a closed pipe as an EOF rather than getting SIGPIPE */ ZeroVar(&sigAct); sigAct.sa_handler = SIG_IGN; if (sigaction(SIGPIPE, &sigAct, NULL) != 0) errnoAbort("failed to set SIGPIPE to SIG_IGN"); /* child, first setup stdio files */ if (stdinFd != STDIN_FILENO) { if (dup2(stdinFd, STDIN_FILENO) < 0) errnoAbort("can't dup2 to stdin"); } if (stdoutFd != STDOUT_FILENO) { if (dup2(stdoutFd, STDOUT_FILENO) < 0) errnoAbort("can't dup2 to stdout"); } if (stderrFd != STDERR_FILENO) { if (dup2(stderrFd, STDERR_FILENO) < 0) errnoAbort("can't dup2 to stderr"); } -closeNonStdDescriptors(); + +/* close other file descriptors */ +for (fd = STDERR_FILENO+1; fd < 64; fd++) + close(fd); } static void plProcExecChild(struct plProc* proc, int stdinFd, int stdoutFd, int stderrFd) /* child part of process startup. */ { plProcSetup(proc, stdinFd, stdoutFd, stderrFd); +/* FIXME: add close-on-exec startup error reporting here */ execvp(proc->cmd[0], proc->cmd); errnoAbort("exec failed: %s", proc->cmd[0]); } static void plProcMemWrite(struct plProc* proc, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* implements child process to write memory buffer to pipeline after * fork */ { +safeClose(&proc->execPipeChild); // memWriter proc doesn't exec, so explicitly close + plProcSetup(proc, STDIN_FILENO, stdoutFd, stderrFd); ssize_t wrCnt = write(STDOUT_FILENO, otherEndBuf, otherEndBufSize); if (wrCnt < 0) errnoAbort("pipeline input buffer write failed"); else if (wrCnt != otherEndBufSize) errAbort("pipeline input buffer short write %lld, expected %lld", (long long)wrCnt, (long long)otherEndBufSize); else { close(STDOUT_FILENO); exit(0); } } -static void plProcHandleTerminate(struct plProc* proc, int status) -/* handle one of the processes terminating, save exit status */ +static void plProcWait(struct plProc* proc, int status) +/* wait for a process in a pipeline */ { proc->status = status; if (WIFSIGNALED(proc->status)) errAbort("process terminated on signal %d: \"%s\" in pipeline \"%s\"", WTERMSIG(proc->status), joinCmd(proc->cmd), proc->pl->procName); assert(WIFEXITED(proc->status)); -if (WEXITSTATUS(proc->status) != 0) - { - // only print an error message if aborting - if (!(proc->pl->options & pipelineNoAbort)) - fprintf(stderr, "process exited with %d: \"%s\" in pipeline \"%s\"\n", +if ((WEXITSTATUS(proc->status) != 0) && !(proc->pl->options & pipelineNoAbort)) + errAbort("process exited with %d: \"%s\" in pipeline \"%s\"", WEXITSTATUS(proc->status), joinCmd(proc->cmd), proc->pl->procName); - exit(WEXITSTATUS(proc->status)); // pass back exit code - } proc->pid = -1; plProcStateTrans(proc, procStateDone); } static struct pipeline* pipelineNew(char ***cmds, unsigned options) /* create a new pipeline object. Doesn't start processes */ { static char *memPseudoCmd[] = {"[mem]", NULL}; struct pipeline *pl; int iCmd; AllocVar(pl); -pl->groupLeader = -1; +pl->pgid = -1; pl->pipeFd = -1; pl->options = options; pl->procName = joinCmds(cmds); if (cmds[0] == NULL) errAbort("no commands in pipeline"); if (options & pipelineMemInput) { /* add proc for forked process to write memory to pipeline */ slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl)); } for(iCmd = 0; cmds[iCmd] != NULL; iCmd++) slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl)); @@ -277,185 +274,119 @@ if (pl != NULL) { struct plProc *proc = pl->procs; while (proc != NULL) { struct plProc *delProc = proc; proc = proc->next; plProcFree(delProc); } freez(&pl->procName); freez(&pl->stdioBuf); freez(plPtr); } } -static struct plProc *pipelineFindProc(struct pipeline *pl, pid_t pid) -/* find a plProc by pid */ -{ -struct plProc *proc; -for (proc = pl->procs; proc != NULL; proc = proc->next) - if (proc->pid == pid) - return proc; -errAbort("pid not found in pipeline: %d", (int)pid); -return 0; // never reached -} - static void execProcChild(struct pipeline* pl, struct plProc *proc, int procStdinFd, int procStdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* handle child process setup after fork. This does not return */ { if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) errnoAbort("error ignoring SIGPIPE"); -if (setpgid(getpid(), pl->groupLeader) != 0) - errnoAbort("error from setpgid(%d, %d)", getpid(), pl->groupLeader); +// set process group to first subprocess id, which might be us +pid_t pgid = (pl->pgid < 0) ? getpid() : pl->pgid; +if (setpgid(getpid(), pgid) != 0) + errnoAbort("error from setpgid(%d, %d)", getpid(), pgid); if (otherEndBuf != NULL) plProcMemWrite(proc, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize); else plProcExecChild(proc, procStdinFd, procStdoutFd, stderrFd); } static int pipelineExecProc(struct pipeline* pl, struct plProc *proc, int prevStdoutFd, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* start a process in the pipeline, return the stdout fd of the process */ { /* determine stdin/stdout to use */ int procStdinFd, procStdoutFd; if (proc == pl->procs) procStdinFd = stdinFd; /* first process in pipeline */ else procStdinFd = prevStdoutFd; if (proc->next == NULL) procStdoutFd = stdoutFd; /* last process in pipeline */ else prevStdoutFd = pipeCreate(&procStdoutFd); /* start process */ if ((proc->pid = fork()) < 0) errnoAbort("can't fork"); if (proc->pid == 0) execProcChild(pl, proc, procStdinFd, procStdoutFd, stderrFd, otherEndBuf, otherEndBufSize); +/* parent only */ +if (pl->pgid < 0) + pl->pgid = proc->pid; // first process defines pgid + /* record that we did this */ plProcStateTrans(proc, procStateRun); pl->numRunning++; /* don't leave intermediate pipes open in parent */ if (proc != pl->procs) safeClose(&procStdinFd); if (proc->next != NULL) safeClose(&procStdoutFd); +safeClose(&proc->execPipeChild); // child end of execPipe return prevStdoutFd; } -static void pipelineGroupExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, +static void waitOnExec(struct plProc *proc) +/* wait on exec to happen on this process */ +{ +// execPipeChild will get EOF when exec happens +char buf[1]; +// even with (void) cast, so compilers on some systems complained +// about the result of read not being used. Hack to save unused result. +ssize_t l = read(proc->execPipeParent, buf, sizeof(buf)); +l++; +safeClose(&proc->execPipeParent); +} + +static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* Start all processes in a pipeline, stdinFd and stdoutFd are the ends of * the pipeline, stderrFd is applied to all processed */ { struct plProc *proc; int prevStdoutFd = -1; for (proc = pl->procs; proc != NULL; proc = proc->next) { prevStdoutFd = pipelineExecProc(pl, proc, prevStdoutFd, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); otherEndBuf = NULL; /* only for first process (read pipes) */ otherEndBufSize = 0; } -} -static void waitOnOne(struct pipeline *pl) -/* wait on one process to finish */ -{ -int status; -pid_t pid = waitpid(-pl->groupLeader, &status, 0); -if (pid < 0) - errnoAbort("waitpid failed"); -plProcHandleTerminate(pipelineFindProc(pl, pid), status); -pl->numRunning--; -assert(pl->numRunning >= 0); -} - -static void groupWait(struct pipeline *pl) -/* Wait for pipeline to complete */ -{ -/* wait on all processes to complete */ -while (pl->numRunning > 0) - waitOnOne(pl); -} - -static void groupLeaderRun(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, - void *otherEndBuf, size_t otherEndBufSize) -/* group leader process */ -{ -pl->groupLeader = getpid(); -if (setpgid(pl->groupLeader, pl->groupLeader) != 0) - errnoAbort("error from child setpgid(%d, %d)", pl->groupLeader, pl->groupLeader); -pipelineGroupExec(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); - -// only keep stderr open -close(STDIN_FILENO); -close(STDOUT_FILENO); -closeNonStdDescriptors(); -groupWait(pl); -exit(0); -} - -static int groupLeaderWait(struct pipeline *pl) -/* Wait for group leader to complete. If pipelineNoAbort was specified, return - * the exit code of the first group process exit non-zero, or zero if none - * failed. */ -{ -// FIXME: get error back, can't send back status in a exit code -int status; -pid_t pid = waitpid(-pl->groupLeader, &status, 0); -if (pid < 0) - errnoAbort("waitpid failed"); -if (WIFSIGNALED(status)) - errAbort("process pipeline terminated on signal %d", WTERMSIG(status)); -assert(WIFEXITED(status)); - -if ((WEXITSTATUS(status) != 0) && !(pl->options & pipelineNoAbort)) - errAbort("pipeline exited with %d", WEXITSTATUS(status)); -return WEXITSTATUS(status); -} - -static void pipelineExec(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, - void *otherEndBuf, size_t otherEndBufSize) -/* Fork the group leader, which then launches all all processes in a pipeline, - * stdinFd and stdoutFd are the ends of the pipeline, stderrFd is applied to - * all processes, including group leader */ -{ -assert(pl->groupLeader < 0); // should not be set -if ((pl->groupLeader = fork()) < 0) - errnoAbort("can't fork"); -if (pl->groupLeader == 0) - { - groupLeaderRun(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); - exit(1); // doesn't return to here - } -else - { - // parent also must also setpgid to prevent race condition - if (setpgid(pl->groupLeader, pl->groupLeader) != 0) - errnoAbort("error from parent setpgid(%d, %d)", pl->groupLeader, pl->groupLeader); - } +// wait on execs to happen so we know that setpgid has happened +for (proc = pl->procs; proc != NULL; proc = proc->next) + waitOnExec(proc); } static int openRead(char *fname) /* open a file for reading */ { int fd = open(fname, O_RDONLY); if (fd < 0) errnoAbort("can't open for read access: %s", fname); return fd; } static int openWrite(char *fname, boolean append) /* open a file for write access */ { int flags = O_WRONLY|O_CREAT; @@ -652,38 +583,79 @@ static void closePipeline(struct pipeline *pl) /* Close the pipe file */ { if (pl->pipeFh != NULL) closePipelineFile(pl); else if (pl->pipeLf != NULL) lineFileClose(&pl->pipeLf); else { if (close(pl->pipeFd) < 0) errAbort("close failed on pipeline: %s ", pl->procName); } pl->pipeFd = -1; } +static struct plProc *pipelineFindProc(struct pipeline *pl, pid_t pid) +/* find a plProc by pid */ +{ +struct plProc *proc; +for (proc = pl->procs; proc != NULL; proc = proc->next) + if (proc->pid == pid) + return proc; +errAbort("pid not found in pipeline: %d", (int)pid); +return 0; // never reached +} + +static int pipelineFindStatus(struct pipeline *pl) +/* find the status of the pipeline, which is the first failed, or 0 */ +{ +// n.b. can't get here if signaled (see plProcWait) +struct plProc *proc; +for (proc = pl->procs; proc != NULL; proc = proc->next) + { + assert(WIFEXITED(proc->status)); + if (WEXITSTATUS(proc->status) != 0) + return WEXITSTATUS(proc->status); + } +return 0; // all ok +} + +static void waitOnOne(struct pipeline *pl) +/* wait on one process to finish */ +{ +int status; +pid_t pid = waitpid(-pl->pgid, &status, 0); +if (pid < 0) + errnoAbort("waitpid failed"); +plProcWait(pipelineFindProc(pl, pid), status); +pl->numRunning--; +assert(pl->numRunning >= 0); +} + int pipelineWait(struct pipeline *pl) /* Wait for processes in a pipeline to complete; normally aborts if any * process exists non-zero. If pipelineNoAbort was specified, return the exit * code of the first process exit non-zero, or zero if none failed. */ { /* must close before waiting to so processes get pipe EOF */ closePipeline(pl); -return groupLeaderWait(pl); + +/* wait on each process in order */ +while (pl->numRunning > 0) + waitOnOne(pl); +return pipelineFindStatus(pl); } void pipelineDumpCmds(char ***cmds) /* Dump out pipeline-formatted commands to stdout for debugging. */ { char **cmd; boolean first = TRUE; while ((cmd = *cmds++) != NULL) { char *word; if (first) first = FALSE; else printf("| "); while ((word = *cmd++) != NULL)