6f949e90b1ba3de976455fbcf9da21897761d134 markd Fri Oct 29 16:11:58 2021 -0700 add timeout option to pipeline to allow kill long-running pipelines, especially ones run from CGIs diff --git src/lib/pipeline.c src/lib/pipeline.c index 3304b26..1ec7ad7 100644 --- src/lib/pipeline.c +++ src/lib/pipeline.c @@ -34,30 +34,31 @@ enum procState state; /* state of process */ int status; /* status from wait */ }; struct pipeline /* Object for a process pipeline and associated open file. Pipeline process * consist of a process group leader and then all of the child process. The * group leader does no work, just wait on processes to complete and report * errors to the top level process. This object is create in the calling * process, and then passed down, but not shared, via forks. */ { struct plProc *procs; /* list of processes */ int numRunning; /* number of processes running */ pid_t groupLeader; /* process group id, or -1 if not set. This is pid of group leader */ + unsigned int timeout; /* timeout, in seconds, or zero */ char *procName; /* name to use in error messages. 
*/ int pipeFd; /* fd of pipe to/from process, -1 if none */ unsigned options; /* options */ FILE* pipeFh; /* optional stdio around pipe */ char* stdioBuf; /* optional stdio buffer */ struct lineFile *pipeLf; /* optional lineFile around pipe */ }; /* file buffer size */ #define FILE_BUF_SIZE 64*1024 static int pipeCreate(int *writeFd) /* create a pipe or die, return readFd */ { int pipeFds[2]; @@ -251,41 +252,42 @@ } static void plProcHandleTerminate(struct plProc* proc, int status) /* handle one of the processes terminating, save exit status */ { proc->pid = -1; proc->status = status; plProcStateTrans(proc, procStateDone); if (WIFSIGNALED(proc->status)) plProcHandleSignaled(proc, status); else plProcHandleExited(proc, status); } -static struct pipeline* pipelineNew(char ***cmds, unsigned options) +static struct pipeline* pipelineNew(char ***cmds, unsigned options, unsigned int timeout) /* create a new pipeline object. Doesn't start processes */ { static char *memPseudoCmd[] = {"[mem]", NULL}; struct pipeline *pl; int iCmd; AllocVar(pl); pl->groupLeader = -1; pl->pipeFd = -1; pl->options = options; +pl->timeout = timeout; pl->procName = joinCmds(cmds); if (cmds[0] == NULL) errAbort("no commands in pipeline"); if (options & pipelineMemInput) { /* add proc for forked process to write memory to pipeline */ slAddTail(&pl->procs, plProcNew(memPseudoCmd, pl)); } for(iCmd = 0; cmds[iCmd] != NULL; iCmd++) slAddTail(&pl->procs, plProcNew(cmds[iCmd], pl)); return pl; @@ -392,37 +394,63 @@ if (pid < 0) errnoAbort("waitpid failed"); plProcHandleTerminate(pipelineFindProc(pl, pid), status); pl->numRunning--; assert(pl->numRunning >= 0); } static void groupWait(struct pipeline *pl) /* Wait for pipeline to complete */ { /* wait on all processes to complete */ while (pl->numRunning > 0) waitOnOne(pl); } +/* used to stash the pipeline object for the group leader process only, which is + * single-threaded; set by setupTimeout() and read by the SIGALRM handler groupApoptosis() */ +static struct pipeline* groupApoptosisPipeline = NULL; + +static void 
groupApoptosis(int signum) +/* signal handler for SIGALRM expiration */ +{ +// hopefully this gets logged or seen by user +fprintf(stderr, "pipeline timeout kill after %d seconds: %s\n", groupApoptosisPipeline->timeout, + pipelineDesc(groupApoptosisPipeline)); +fflush(stderr); +(int)kill(0, SIGKILL); // kill off process group +} + +static void setupTimeout(struct pipeline* pl) +/* setup timeout handling */ +{ +groupApoptosisPipeline = pl; +if (signal(SIGALRM, groupApoptosis) != 0) + errnoAbort("signal failed"); +(void)alarm(pl->timeout); +} + static void groupLeaderRun(struct pipeline* pl, int stdinFd, int stdoutFd, int stderrFd, void *otherEndBuf, size_t otherEndBufSize) /* group leader process */ { pl->groupLeader = getpid(); if (setpgid(pl->groupLeader, pl->groupLeader) != 0) errnoAbort("error from child setpgid(%d, %d)", pl->groupLeader, pl->groupLeader); +if (pl->timeout > 0) + setupTimeout(pl); + pipelineGroupExec(pl, stdinFd, stdoutFd, stderrFd, otherEndBuf, otherEndBufSize); // only keep stderr open close(STDIN_FILENO); close(STDOUT_FILENO); closeNonStdDescriptors(); groupWait(pl); _exit(0); } static int groupLeaderWait(struct pipeline *pl) /* Wait for group leader to complete. If pipelineNoAbort was specified, return * the exit code of the first group process exit non-zero, or zero if none * failed. 
*/ { @@ -501,113 +529,117 @@ pipelineExec(pl, pipeRdFd, stdoutFd, stderrFd, NULL, 0); safeClose(&pipeRdFd); } static void checkOpts(unsigned opts) /* check option set for consistency */ { if (((opts & (pipelineRead|pipelineWrite)) == 0) || ((opts & (pipelineRead|pipelineWrite)) == (pipelineRead|pipelineWrite))) errAbort("must specify one of pipelineRead or pipelineWrite to pipelineOpen"); if ((opts & pipelineAppend) && ((opts & pipelineWrite) == 0)) errAbort("pipelineAppend is valid only in conjunction with pipelineWrite"); } struct pipeline *pipelineOpenFd(char ***cmds, unsigned opts, - int otherEndFd, int stderrFd) + int otherEndFd, int stderrFd, + unsigned int timeout) /* Create a pipeline from an array of commands. See pipeline.h for * full documentation. */ { struct pipeline *pl; checkOpts(opts); -pl = pipelineNew(cmds, opts); +pl = pipelineNew(cmds, opts, timeout); if (opts & pipelineRead) pipelineStartRead(pl, otherEndFd, stderrFd, NULL, 0); else pipelineStartWrite(pl, otherEndFd, stderrFd); return pl; } struct pipeline *pipelineOpen(char ***cmds, unsigned opts, - char *otherEndFile, char *stderrFile) + char *otherEndFile, char *stderrFile, + unsigned int timeout) /* Create a pipeline from an array of commands. See pipeline.h for * full documentation */ { int otherEndFd; int stderrFd = (stderrFile == NULL) ? STDERR_FILENO : openWrite(stderrFile, FALSE); checkOpts(opts); boolean append = ((opts & pipelineAppend) != 0); if (opts & pipelineRead) otherEndFd = (otherEndFile == NULL) ? STDIN_FILENO : openRead(otherEndFile); else otherEndFd = (otherEndFile == NULL) ? 
STDOUT_FILENO : openWrite(otherEndFile, append); -struct pipeline *pl = pipelineOpenFd(cmds, opts, otherEndFd, stderrFd); +struct pipeline *pl = pipelineOpenFd(cmds, opts, otherEndFd, stderrFd, timeout); safeClose(&otherEndFd); if (stderrFile != NULL) safeClose(&stderrFd); return pl; } struct pipeline *pipelineOpenMem(char ***cmds, unsigned opts, void *otherEndBuf, size_t otherEndBufSize, - int stderrFd) + int stderrFd, unsigned int timeout) /* Create a pipeline from an array of commands, with the pipeline input/output * in a memory buffer. See pipeline.h for full documentation. Currently only * input to a read pipeline is supported */ { struct pipeline *pl; checkOpts(opts); if (opts & pipelineWrite) errAbort("pipelineOpenMem only supports read pipelines at this time"); opts |= pipelineMemInput; -pl = pipelineNew(cmds, opts); +pl = pipelineNew(cmds, opts, timeout); pipelineStartRead(pl, STDIN_FILENO, stderrFd, otherEndBuf, otherEndBufSize); return pl; } struct pipeline *pipelineOpenFd1(char **cmd, unsigned opts, - int otherEndFd, int stderrFd) + int otherEndFd, int stderrFd, + unsigned int timeout) /* like pipelineOpenFd(), only takes a single command */ { char **cmds[2]; cmds[0] = cmd; cmds[1] = NULL; -return pipelineOpenFd(cmds, opts, otherEndFd, stderrFd); +return pipelineOpenFd(cmds, opts, otherEndFd, stderrFd, timeout); } struct pipeline *pipelineOpen1(char **cmd, unsigned opts, - char *otherEndFile, char *stderrFile) + char *otherEndFile, char *stderrFile, + unsigned int timeout) /* like pipelineOpen(), only takes a single command */ { char **cmds[2]; cmds[0] = cmd; cmds[1] = NULL; -return pipelineOpen(cmds, opts, otherEndFile, stderrFile); +return pipelineOpen(cmds, opts, otherEndFile, stderrFile, timeout); } struct pipeline *pipelineOpenMem1(char **cmd, unsigned opts, void *otherEndBuf, size_t otherEndBufSize, - int stderrFd) + int stderrFd, unsigned int timeout) /* like pipelineOpenMem(), only takes a single command */ { char **cmds[2]; cmds[0] = cmd; 
cmds[1] = NULL; -return pipelineOpenMem(cmds, opts, otherEndBuf, otherEndBufSize, stderrFd); +return pipelineOpenMem(cmds, opts, otherEndBuf, otherEndBufSize, stderrFd, timeout); } char *pipelineDesc(struct pipeline *pl) /* Get the description of a pipeline for use in error messages */ { return pl->procName; } int pipelineFd(struct pipeline *pl) /* Get the file descriptor for a pipeline */ { return pl->pipeFd; } FILE *pipelineFile(struct pipeline *pl)