2ded47d85dc1ea9e5cecda7504cfdbb103bef6fe kent Fri Dec 27 19:50:59 2013 -0800 Adding -wait option to parasol add job to wait for job to finish. diff --git src/parasol/parasol/parasol.c src/parasol/parasol/parasol.c index 331e919..cfbed1f 100644 --- src/parasol/parasol/parasol.c +++ src/parasol/parasol/parasol.c @@ -1,25 +1,26 @@ /* parasol - Parasol program - for launching programs in parallel on a computer cluster. */ #include "paraCommon.h" #include "net.h" #include "linefile.h" #include "errabort.h" #include "dystring.h" #include "hash.h" #include "options.h" #include "paraLib.h" #include "paraMessage.h" +#include "jobResult.h" char *version = PARA_VERSION; /* Version number. */ static char *paraHubHost = "localhost"; // hub host static float cpuUsage = 0; static long long ramUsage = 0; struct rudp *hubRudp; /* Network connection to paraHub. */ char *userName; /* Name of user. */ void mustBeRoot() /* Abort if we're not root. */ { if (!sameString(userName, "root")) @@ -32,31 +33,33 @@ errAbort( "Parasol version %s\n" "Parasol is the name given to the overall system for managing jobs on\n" "a computer cluster and to this specific command. This command is\n" "intended primarily for system administrators. The 'para' command\n" "is the primary command for users.\n" "Usage in brief:\n" " parasol add machine machineFullHostName localTempDir - Add new machine to pool.\n" " or \n" " parasol add machine machineFullHostName cpus ramSizeMB localTempDir localDir localSizeMB switchName\n" " parasol remove machine machineFullHostName \"reason why\" - Remove machine from pool.\n" " parasol check dead - Check machines marked dead ASAP, some have been fixed.\n" " parasol add spoke - Add a new spoke daemon.\n" " parasol [options] add job command-line - Add job to list.\n" " options:\n" - " -out=out -in=in -dir=dir -verbose\n" + " -wait - If set wait for job to finish to return and return with job status code\n" + " -err=path -out=out -in=in - Where to put stderr, stdin, stdout output\n" + " -verbose=N - set verbosity level, default level is 1\n" " -results=resultFile fully qualified path to the results file, \n" " or `results' in the current directory if not specified.\n" " -cpu=N Number of CPUs used by the jobs, default 1.\n" " -ram=N Number of bytes of RAM used by the jobs.\n" " Default is RAM on node divided by number of cpus on node.\n" " Shorthand expressions allow t,g,m,k for tera, giga, mega, kilo.\n" " e.g. 4g = 4 Gigabytes.\n" " parasol [options] clear sick - Clear sick stats on a batch.\n" " options:\n" " -results=resultFile fully qualified path to the results file, \n" " or `results' in the current directory if not specified.\n" " parasol remove job id - Remove job of given ID.\n" " parasol ping [count] - Ping hub server to make sure it's alive.\n" " parasol remove jobs userName [jobPattern] - Remove jobs submitted by user that\n" " match jobPattern (which may include ? and * escaped for shell).\n" @@ -189,65 +192,103 @@ } void addMachine(int argc, char *argv[]) /* Tell hub about a new machine. */ { char buf[1024]; mustBeRoot(); if (argc == 3) safef(buf, sizeof(buf), "addMachine %s %s", argv[1], argv[2]); else /* argc == 7 */ safef(buf, sizeof(buf), "addMachine %s %s %s %s %s %s %s", argv[1], argv[2], argv[3], argv[4], argv[5], argv[6], argv[7] ); commandHub(buf); } +void waitAndExit(char *resultsFile, char *jobIdString, char *err) +/* Read results file until jobId appears in it, and then if necessary + * copy over stderr to err, and finally exit with the same result + * as command did. */ +{ +off_t resultsPos = 0; +int pollTime = 5; // Poll every 5 seconds for file to open. +char *row[JOBRESULT_NUM_COLS]; +for (;;) + { + verbose(1, "waiting for %d\n", pollTime); + sleep(pollTime); + if (!fileExists(resultsFile)) + continue; + struct lineFile *lf = lineFileOpen(resultsFile, TRUE); + lineFileSeek(lf, resultsPos, SEEK_SET); + while (lineFileRow(lf, row)) + { + resultsPos = lineFileTell(lf); + struct jobResult jr; + jobResultStaticLoad(row, &jr); + if (sameString(jr.jobId, jobIdString)) + { + if (err != NULL) + pmFetchFile(jr.host, jr.errFile, err); + exit(jr.status); + } + } + lineFileClose(&lf); + } +} + void addJob(int argc, char *argv[], boolean printId) /* Tell hub about a new job. */ { struct dyString *dy = newDyString(1024); char *in = optionVal("in", "/dev/null"); char *out = optionVal("out", "/dev/null"); +char *err = optionVal("err", NULL); char *jobIdString; int i; char curDir[PATH_LEN]; getcwd(curDir, sizeof(curDir)); char *dir = optionVal("dir", curDir); char results[PATH_LEN]; getResultsFile(results); dyStringPrintf(dy, "addJob2 %s %s %s %s %s %f %lld", userName, dir, in, out, results, cpuUsage, ramUsage); for (i=0; istringSize > rudpMaxSize) errAbort("The following string has %d bytes, but can only be %d:\n%s\n" "Please either shorten the current directory or the command line\n" "possibly by making a shell script that encapsulates a long command.\n" , dy->stringSize, (int)rudpMaxSize, dy->string); jobIdString = hubCommandGetReciept(dy->string); dyStringFree(&dy); if (sameString(jobIdString, "0")) errAbort("sick batch?: hub returned jobId==%s", jobIdString); if (printId) { printf("your job %s (\"%s", jobIdString, argv[0]); for (i=1; istring); if (!sameString(response, "0"))