6e5ee11ca95cd971984038cf65bae00d9c898707
galt
  Wed Jun 4 15:40:02 2014 -0700
Since we have git, it is easy to rename errabort.c to errAbort.c without losing any history.
diff --git src/parasol/para/para.c src/parasol/para/para.c
index 765f468..2f13a30 100644
--- src/parasol/para/para.c
+++ src/parasol/para/para.c
@@ -1,2271 +1,2271 @@
 /* para - para - manage a batch of jobs in parallel on a compute cluster.. */
 #include "paraCommon.h"
-#include "errabort.h"
+#include "errAbort.h"
 #include "linefile.h"
 #include "options.h"
 #include "hash.h"
 #include "localmem.h"
 #include "dystring.h"
 #include "obscure.h"
 #include "portable.h"
 #include "net.h"
 #include "paraLib.h"
 #include "paraMessage.h"
 #include "jobDb.h"
 #include "jobResult.h"
 #include "verbose.h"
 #include "sqlNum.h"
 
 
 /* command line option specifications */
 static struct optionSpec optionSpecs[] = {
     {"retries"  , OPTION_INT},
     {"maxQueue" , OPTION_INT},
     {"minPush"  , OPTION_INT},
     {"maxPush"  , OPTION_INT},
     {"warnTime" , OPTION_INT},
     {"killTime" , OPTION_INT},
     {"delayTime", OPTION_INT},
     {"eta"      , OPTION_BOOLEAN},  /* default, no longer used */
     {"pri"      , OPTION_STRING},
     {"priority" , OPTION_STRING},
     {"maxJob"   , OPTION_STRING},
     {"cpu"      , OPTION_FLOAT},
     {"ram"      , OPTION_STRING},
     {"batch"    , OPTION_STRING},
     {"jobCwd"   , OPTION_STRING},
     {NULL, 0}
 };
 
 char *version = PARA_VERSION;   /* Version number. */
 
 static int numHappyDots;       /* number of happy dots written */
 
 void usage()
 /* Explain usage and exit. */
 {
 errAbort(
   "para - version %s\n"
   "Manage a batch of jobs in parallel on a compute cluster.\n"
   "Normal usage is to do a 'para create' followed by 'para push' until\n"
   "job is done.  Use 'para check' to check status.\n"
   "usage:\n\n"
   "   para [options] command [command-specific arguments]\n\n"
   "The commands are:\n"
   "\n"
   "para create jobList\n"
   "   This makes the job-tracking database from a text file with the\n"
   "   command line for each job on a separate line.\n"
   "   options:\n"
   "      -cpu=N  Number of CPUs used by the jobs, default 1.\n"
   "      -ram=N  Number of bytes of RAM used by the jobs.\n"
   "         Default is RAM on node divided by number of cpus on node.\n"
   "         Shorthand expressions allow t,g,m,k for tera, giga, mega, kilo.\n"
   "         e.g. 4g = 4 Gigabytes.\n"
   "      -batch=batchDir - specify the directory path that is used to store the\n"
   "       batch control files.  The batchDir can be an absolute path or a path\n"
   "       relative to the current directory.  The resulting path is use as the\n"
   "       batch name.  The directory is created if it doesn't exist.  When\n"
   "       creating a new batch, batchDir should not have been previously used as\n"
   "       a batch name.  The batchDir must be writable by the paraHub process.\n"
   "       This does not affect the working directory assigned to jobs.  It defaults\n"
   "       to the directory where para is run.  If used, this option must be specified\n"
   "       on all para commands for the  batch.  For example to run two batches in the\n"
   "       same directory:\n"
   "          para -batch=b1 make jobs1\n"
   "          para -batch=b2 make jobs2\n"
   "para push \n"
   "   This pushes forward the batch of jobs by submitting jobs to parasol\n"
   "   It will limit parasol queue size to something not too big and\n"
   "   retry failed jobs.\n"
   "   options:\n"
   "      -retries=N  Number of retries per job - default 4.\n"
   "      -maxQueue=N  Number of jobs to allow on parasol queue. \n"
   "         Default 2000000.\n"
   "      -minPush=N  Minimum number of jobs to queue. \n"
   "         Default 1.  Overrides maxQueue.\n"
   "      -maxPush=N  Maximum number of jobs to queue - default 100000.\n"
   "      -warnTime=N  Number of minutes job runs before hang warning. \n"
   "         Default 4320 (3 days).\n"
   "      -killTime=N  Number of minutes hung job runs before push kills it.\n"
   "         By default kill off for backwards compatibility.\n" /* originally to be 2 weeks */
   "      -delayTime=N  Number of seconds to delay before submitting next job \n"
   "         to minimize i/o load at startup - default 0.\n"
   "      -priority=x  Set batch priority to high, medium, or low.\n"
   "         Default medium (use high only with approval).\n"
   "         If needed, use with make, push, create, shove, or try.\n"
   "         Or, set batch priority to a specific numeric value - default %d.\n"
   "         1 is emergency high priority, \n"
   "         %d is normal medium, \n"
   "         %d is low for bottomfeeders.\n"
   "         Setting priority higher than normal (1-%d) will be logged.\n"
   "         Please keep low priority jobs short, they won't be pre-empted.\n"
   "      -maxJob=x  Limit the number of jobs the batch can run.\n"
   "         Specify number of jobs, for example 10 or 'unlimited'.\n"
   "         Default unlimited displays as -1.\n"
   "      -jobCwd=dir - specify the directory path to use as the current working\n"
   "       directory for each job.  The dir can be an absolute path or a path\n"
   "       relative to the current directory. It defaults to the directory where\n"
   "       para is run.\n"
   "para try \n"
   "   This is like para push, but only submits up to 10 jobs.\n"
   "para shove\n"
   "   Push jobs in this database until all are done or one fails after N retries.\n"
   "para make jobList\n"
   "   Create database and run all jobs in it if possible.  If one job\n"
   "   fails repeatedly this will fail.  Suitable for inclusion in makefiles.\n"
   "   Same as a 'create' followed by a 'shove'.\n"
   "para check\n"
   "   This checks on the progress of the jobs.\n"
   "para stop\n"
   "   This stops all the jobs in the batch.\n"
   "para chill\n"
   "   Tells system to not launch more jobs in this batch, but\n"
   "   does not stop jobs that are already running.\n"
   "para finished\n"
   "   List jobs that have finished.\n"
   "para hung\n"
   "   List hung jobs in the batch (running > killTime).\n"
   "para slow\n"
   "   List slow jobs in the batch (running > warnTime).\n"
   "para crashed\n"
   "   List jobs that crashed or failed output checks the last time they were run.\n"
   "para failed\n"
   "   List jobs that crashed after repeated restarts.\n"
   "para status\n"
   "   List individual job status, including times.\n"
   "para problems\n"
   "   List jobs that had problems (even if successfully rerun).\n"
   "   Includes host info.\n"
   "para running\n"
   "   Print info on currently running jobs.\n"
   "para hippos time\n"
   "   Print info on currently running jobs taking > 'time' (minutes) to run.\n"
   "para time\n"
   "   List timing information.\n"
   "para recover jobList newJobList\n"
   "   Generate a job list by selecting jobs from an existing list where\n"
   "   the `check out' tests fail.\n"
   "para priority 999\n"
   "   Set batch priority. Values explained under 'push' options above.\n"
   "para maxJob 999\n"
   "   Set batch maxJob. Values explained under 'push' options above.\n"
   "para ram 999\n"
   "   Set batch ram usage. Values explained under 'push' options above.\n"
   "para cpu 999\n"
   "   Set batch cpu usage. Values explained under 'push' options above.\n"
   "para resetCounts\n"
   "   Set batch done and crash counters to 0.\n"
   "para flushResults\n"
   "   Flush results file.  Warns if batch has jobs queued or running.\n"
   "para freeBatch\n"
   "   Free all batch info on hub.  Works only if batch has nothing queued or running.\n"
   "para showSickNodes\n"
   "   Show sick nodes which have failed when running this batch.\n"
   "para clearSickNodes\n"
   "   Clear sick nodes statistics and consecutive crash counts of batch.\n"
   "\n"
   "Common options\n"
   "   -verbose=1 - set verbosity level.\n",
   version,
   NORMAL_PRIORITY,
   NORMAL_PRIORITY,
   NORMAL_PRIORITY * NORMAL_PRIORITY,
   NORMAL_PRIORITY-1
   );
 }
 
 /* Variables that can be set from command line. */
 
 int retries = 4;
 int maxQueue = 2000000;
 int minPush = 1;
 int maxPush = 100000;
 int warnTime = 3*24*60;
 int killTime = 0;  /* 0 = off, was originally going to be 2 weeks or 14*24*60; */
 int sleepTime = 5*60;
 int delayTime = 0;
 int priority = NORMAL_PRIORITY;
 int maxJob  = -1;
 float cpuUsage = 0;
 long long ramUsage = 0;
 char *batchDir = NULL;
 char *jobCwd = NULL;
 char resultsName[PATH_LEN];
 char errDir[PATH_LEN];
 
 /* Some variable we might want to move to a config file someday. */
 char *resultsFileName = "para.results"; /* Name of results file. */
 char *statusCommand = "parasol pstat2";
 char *killCommand = "parasol remove job";
 
 boolean sickBatch = FALSE;
 
 char *bookMarkFileName = "para.bookmark";  /* name of bookmark file */
 char bookMarkName[PATH_LEN];  /* path to bookmark file */
 off_t bookMark = 0;  /* faster to resume from bookmark only reading new para.results */ 
 off_t resultsSize = 0;  /* where to stop reading results for the current cycle */
 
 void beginHappy()
 /* Call before a loop where happy dots maybe written */
 {
 numHappyDots = 0;
 }
 
 void endHappy()
 /* Call after a loop where happy dots may have been written.  There maybe more
  * than one call to endHappy() for a beginHappy(). It's a good idea to call
  * this before outputing low-level error messages to keep messages readable
  * if dots are being written.
  */
 {
 if (numHappyDots > 0)
     {
     verbose(1, "\n");
     fflush(stderr);
     fflush(stdout);
     numHappyDots = 0;
     }
 }
 
 void vpprintf(FILE *f, char *format, va_list args)
 /* para printf; ensures happy dots are flushed */
 {
 endHappy();
 vfprintf(f, format, args);
 }
 
 void pprintf(FILE *f, char *format, ...)
 /* para printf; ensures happy dots are flushed */
 {
 va_list args;
 va_start(args, format);
 vpprintf(f, format, args);
 va_end(args);
 }
 
 void paraVaWarn(char *format, va_list args)
 /* warn handler, flushes happyDots if needed. */
 {
 endHappy();
 if (format != NULL) {
     fflush(stdout);
     vfprintf(stderr, format, args);
     fprintf(stderr, "\n");
     fflush(stderr);  /* forces to log files */
     }
 }
 
 enum jaState 
 /* A job is in one of these states. */
  {
  jaUnsubmitted,
  jaQueued,
  jaRunning,
  jaHung,
  jaCrashed,
  jaFinished,
  };
 
 char *jaStateShortDesc[] =
 /* short description of job state; indexed by enum jaState */
     {
     "unsub",
     "queue",
     "run",
     "hung",
     "crash",
     "done"
     };
 
 enum jaState figureState(struct job *job)
 /* Figure out state of job. */
 {
 struct submission *sub;
 if ((sub = job->submissionList) == NULL)
     return jaUnsubmitted;
 if (sub->inQueue)
     return jaQueued;
 if (sub->running)
     return jaRunning;
 if (sub->hung)
     return jaHung;
 if (sub->ranOk)
     return jaFinished;
 else
     return jaCrashed;
 }
 
 
 char *useWhats[] = {"cpu", "ram"};
 
 void parseUsage(struct job* job, struct lineFile *lf)
 /* Parse out and keep CPU and RAM usage from the job command. */
 {
 /* allow command-line options to be overridden by spec */
 if (cpuUsage != 0)
     job->cpusUsed = cpuUsage;
 if (ramUsage != 0)
     job->ramUsed = ramUsage;
 char *pattern = "{use";
 char *s, *e, *z;
 char *line = job->command;
 struct dyString *dy = dyStringNew(1024);
 s = line;
 for (;;)
     {
     e = stringIn(pattern, s);
     if (e == NULL)
 	{
 	dyStringAppend(dy, s);
 	break;
 	}
     else
         {
 	char *parts[4];
 	int partCount;
 	dyStringAppendN(dy, s, e-s);
 	z = strchr(e, '}');
 	if (z == NULL)
 	    errAbort("{use without } line %d of %s", lf->lineIx, lf->fileName);
 	*z = 0;
 	partCount = chopLine(e, parts);
 	if (partCount != 3)
 	    errAbort("Badly formatted 'use' clause in line %d of %s", lf->lineIx, lf->fileName);
 	if (stringIx(parts[1], useWhats) < 0)
 	    errAbort("Unrecognized word '%s' in 'use' clause line %d of %s", 
 	    	parts[1], lf->lineIx, lf->fileName);
 	if (sameString(parts[1], "cpu"))
 	    {
     	    job->cpusUsed = sqlFloat(parts[2]);
 	    }
 	if (sameString(parts[1], "ram"))
 	    {
     	    job->ramUsed = paraParseRam(parts[2]);
 	    if (job->ramUsed == -1)
 		errAbort("Invalid RAM expression '%s' in 'use' clause line %d of %s", 
 		    parts[2], lf->lineIx, lf->fileName);
 	    }
 	s = z+1;
 	}
     }
 freeMem(job->command);
 job->command = cloneString(dy->string);
 dyStringFree(&dy);
 }
 
 
 /* Places that can be checked. */
 char *checkWhens[] = {"in", "out"};
 
 /* Types of checks. */
 char *checkTypes[] = {"exists", "exists+", "line", "line+"};
 
 struct job *jobFromLine(struct lineFile *lf, char *line)
 /* Parse out the beginnings of a job from input line. 
  * Parse out and keep checks. */
 {
 struct check *check;
 char *pattern = "{check";
 char *s, *e, *z;
 struct dyString *dy = dyStringNew(1024);
 struct job *job;
 
 AllocVar(job);
 job->spec = cloneString(line);
 s = line;
 for (;;)
     {
     e = stringIn(pattern, s);
     if (e == NULL)
 	{
 	dyStringAppend(dy, s);
 	break;
 	}
     else
         {
 	char *parts[5];
 	int partCount;
 	dyStringAppendN(dy, s, e-s);
 	z = strchr(e, '}');
 	if (z == NULL)
 	    errAbort("{check without } line %d of %s", lf->lineIx, lf->fileName);
 	*z = 0;
 	partCount = chopLine(e, parts);
 	if (partCount != 4)
 	    errAbort("Badly formatted check line %d of %s", lf->lineIx, lf->fileName);
 	AllocVar(check);
 	slAddHead(&job->checkList, check);
 	job->checkCount += 1;
 	if (stringIx(parts[1], checkWhens) < 0)
 	    errAbort("Unrecognized word '%s' in check line %d of %s", 
 	    	parts[1], lf->lineIx, lf->fileName);
 	check->when = cloneString(parts[1]);
 	if (stringIx(parts[2], checkTypes) < 0)
 	    errAbort("Unrecognized word '%s' in check line %d of %s", 
 	    	parts[2], lf->lineIx, lf->fileName);
 	check->what = cloneString(parts[2]);
 	check->file = cloneString(parts[3]);
 	dyStringAppend(dy, check->file);
 	s = z+1;
 	}
     }
 job->command = cloneString(dy->string);
 slReverse(&job->checkList);
 dyStringFree(&dy);
 parseUsage(job, lf);
 return job;
 }
 
 struct fileStatus
 /* Some info on a file. */
     {
     bool exists;	/* TRUE if file exists. */
     bool hasData;	/* TRUE if nonempty. */
     bool completeLastLine; /* TRUE if last line ends with <lf> */
     bool checkedLastLine;   /* TRUE if done completeLastLine check */
     bool reported;	/* TRUE if reported error. */
     };
 
 struct fileStatus *basicFileCheck(char *file, struct hash *hash)
 /* Do quick file checks caching result */
 {
 struct fileStatus *fi = hashFindVal(hash, file);
 struct stat statBuf;
 if (fi != NULL)
     return fi;
 lmAllocVar(hash->lm, fi);
 if (stat(file, &statBuf) == 0)
     {
     fi->exists = TRUE;
     fi->hasData = (statBuf.st_size > 0);
     }
 return fi;
 }
 
 struct fileStatus *completeLastLineCheck(char *file, struct hash *hash)
 /* Do slower checks for complete last line, caching result */
 {
 struct fileStatus *fi = hashFindVal(hash, file);
 FILE *f;
 
 if ((fi != NULL) && (fi->checkedLastLine))
     return fi;
 if (fi == NULL)
     lmAllocVar(hash->lm, fi);
 if ((f = fopen(file, "rb")) != NULL)
     {
     fi->exists = TRUE;
     if (fseek(f, -1, SEEK_END) == 0)
         {
 	int c = fgetc(f);
 	if (c >= 0)
 	    {
 	    fi->hasData = TRUE;
 	    if (c == '\n')
 	        fi->completeLastLine = TRUE;
 	    }
 	}
     fclose(f);
     }
 fi->checkedLastLine = TRUE;
 return fi;
 }
 
 int doOneCheck(struct check *check, struct hash *hash, FILE *f)
 /* Do one check.  Return error count from check. f maybe NULL to discard
  * errors. */
 {
 struct fileStatus *fi;
 char *file = check->file;
 char *what = check->what;
 
 /* only do slower completeLastLine check if needed */
 if (startsWith("line", what))
     fi = completeLastLineCheck(file, hash);
 else
     fi = basicFileCheck(file, hash);
 
 if (!fi->reported)
     {
     if (!fi->exists)
 	{
         if (f != NULL)
             pprintf(f, "%s does not exist\n", file);
 	fi->reported = TRUE;
 	return 1;
 	}
     if (sameWord(what, "exists+"))
 	{
 	if (!fi->hasData)
 	    {
             if (f != NULL)
                 pprintf(f, "%s is empty\n", file);
 	    fi->reported = TRUE;
 	    return 1;
 	    }
 	}
     else if (sameWord(what, "line"))
 	{
 	if (fi->hasData && !fi->completeLastLine)
 	    {
             if (f != NULL)
                 pprintf(f, "%s has an incomplete last line\n", file);
 	    fi->reported = TRUE;
 	    return 1;
 	    }
 	}
     else if (sameWord(what, "line+"))
 	{
 	if (!fi->hasData)
 	    {
             if (f != NULL)
                 pprintf(f, "%s is empty\n", file);
 	    fi->reported = TRUE;
 	    return 1;
 	    }
 	else if (!fi->completeLastLine)
 	    {
             if (f != NULL)
                 pprintf(f, "%s has an incomplete last line\n", file);
 	    fi->reported = TRUE;
 	    return 1;
 	    }
 	}
     else if (sameString(what, "exists"))
 	{
 	/* Check already made. */
 	}
     else
 	{
 	warn("Unknown check '%s'", what);
 	}
     }
 return fi->reported ? 1 : 0;
 }
 
 void occassionalDot()
 /* Write out a dot every 20 times this is called. */
 {
 static int dotMod = 20;
 static int dot = 20;
 if ((--dot <= 0) && verboseDotsEnabled())
     {
     verboseDot();
     dot = dotMod;
     numHappyDots++;
     }
 }
 
 void occassionalSleep()
 /* Sleep every 1000 times this is called. */
 {
 static int dotMod = 600;
 static int dot = 600;
 if (--dot <= 0)
     {
     fflush(stdout);
     fflush(stderr);
     sleep(1);
     dot = dotMod;
     }
 }
 
 
 int checkOneJob(struct job *job, char *when, struct hash *hash, FILE *f)
 /* Perform checks on one job if checks not already in hash. 
  * Returns number of errors. */
 {
 int errCount = 0;
 struct check *check;
 
 for (check = job->checkList; check != NULL; check = check->next)
     {
     if (sameWord(when, check->when))
 	{
 	errCount += doOneCheck(check, hash, f);
 	occassionalDot();
 	}
     }
 return errCount;
 }
 
 void doChecks(struct jobDb *db, char *when)
 /* Do checks on files where check->when matches when. */
 {
 int errCount = 0;
 struct job *job;
 struct hash *hash = newHash(19);
 
 verbose(1, "Checking %sput files\n", when);
 beginHappy();
 for (job = db->jobList; job != NULL; job = job->next)
     errCount += checkOneJob(job, when, hash, stderr);
 endHappy();
 if (errCount > 0)
     errAbort("%d total errors in file check", errCount);
 freeHashAndVals(&hash);
 }
 
 void writeBatch(struct jobDb *db, char *fileName)
 /* Write out batch file. */
 {
 FILE *f = mustOpen(fileName, "w");
 struct job *job;
 for (job = db->jobList; job != NULL; job = job->next)
     {
     jobCommaOut(job, f);
     fprintf(f, "\n");
     }
 if (ferror(f))
     errAbort("error writing %s", fileName);
 carefulClose(&f);
 }
 
 void readBookMark()
 /* Read the value of the bookMark, an offset into the results file.
  * This should be called before reading the results file. */
 {
 /* a bookmark should not exist if results file does not exist */
 if ((!fileExists(resultsName)) && fileExists(bookMarkName))
     unlink(bookMarkName);
 /* read bookmark position */
 struct lineFile *lf = lineFileMayOpen(bookMarkName, TRUE);
 char *line;
 if (!lf)
     return;
 if (lineFileNext(lf, &line, NULL))
     {
     bookMark = sqlLongLong(line);
     }
 lineFileClose(&lf);
 }
 
 void writeBookMark()
 /* Save the value of the bookMark, an offset into the results file.
  * This should be called after the batch has been safely updated. */
 {
 FILE *f = mustOpen(bookMarkName, "w");
 fprintf(f, "%llu\n", (unsigned long long) bookMark);
 carefulClose(&f);
 }
 
 void atomicWriteBatch(struct jobDb *db, char *fileName)
 /* Wrapper to avoid corruption file by two para process being run in the same
  * directory. */
 {
 char hostName[128];
 char tmpName[PATH_LEN];
 long time = clock1000();
 
 /* generate a unique name for tmp file */
 if (gethostname(hostName, sizeof(hostName)) < 0)
     errnoAbort("can't get host name");
 safef(tmpName, sizeof(tmpName), "%s.%d.%s.tmp", fileName, getpid(), 
       hostName);
 
 writeBatch(db, tmpName);
 
 /* now rename (which is attomic) */
 if (rename(tmpName, fileName) < 0)
     errnoAbort("can't rename %s to %s", tmpName, fileName);
 
 writeBookMark();
 verbose(2, "atomicWriteBatch time: %.2f seconds\n", (clock1000() - time) / 1000.0);
 }
 
 struct jobDb *readBatch(char *batch)
 /* Read a batch file. */
 {
 struct jobDb *db;
 struct job *job;
 struct lineFile *lf = lineFileOpen(batch, TRUE);
 char *line;
 long time = clock1000();
 
 AllocVar(db);
 while (lineFileNext(lf, &line, NULL))
     {
     line = skipLeadingSpaces(line);
     if (line[0] == '#' || line[0] == 0)
        continue;
     job = jobCommaIn(&line, NULL);
     slAddHead(&db->jobList, job);
     ++db->jobCount;
     }
 lineFileClose(&lf);
 slReverse(&db->jobList);
 verbose(1, "%d jobs in batch\n", db->jobCount);
 verbose(2, "readBatch time: %.2f seconds\n", (clock1000() - time) / 1000.0);
 return db;
 }
 
 char *hubSingleLineQuery(char *query)
 /* Send message to hub and get single line response.
  * This should be freeMem'd when done. */
 {
 return pmHubSingleLineQuery(query, "localhost");
 }
 
 struct slName *hubMultilineQuery(char *query)
 /* Send a command with a multiline response to hub,
  * and return response as a list of strings. */
 {
 return pmHubMultilineQuery(query, "localhost");
 }
 
 boolean batchRunning(char *batchName)
 /* Return TRUE if a batch is running. */
 {
 #define NUMLISTBATCHCOLUMNS 12
 struct slName *lineList = hubMultilineQuery("listBatches"), *lineEl;
 boolean ret = FALSE;
 for (lineEl = lineList; lineEl != NULL; lineEl = lineEl->next)
     {
     int wordCount;
     char *line = lineEl->name;
     char *row[NUMLISTBATCHCOLUMNS];
     if (line[0] != '#')
 	{
 	char *b;
 	wordCount = chopLine(line, row);
 	b = row[NUMLISTBATCHCOLUMNS-1];
 	if (wordCount < NUMLISTBATCHCOLUMNS || b[0] != '/')
 	    errAbort("paraHub and para out of sync on listBatches");
 	if (sameString(b, batchName))
 	    ret = TRUE;
 	}
     }
 slFreeList(&lineList);
 return ret;
 }
 
 boolean thisBatchRunning()
 /* Return true if this batch is running */
 {
 return batchRunning(batchDir);
 }
 
 struct jobDb *parseJobList(char *jobList)
 /* parse a job list */
 {
 struct lineFile *lf = lineFileOpen(jobList, TRUE);
 char *line;
 struct jobDb *db;
 struct job *job;
 AllocVar(db);
 while (lineFileNext(lf, &line, NULL))
     {
     line = trimSpaces(line);
     if (line == NULL || line[0] == '#' || line[0] == 0)
         continue;
     ++db->jobCount;
     job = jobFromLine(lf, line);
     slAddHead(&db->jobList, job);
     }
 lineFileClose(&lf);
 slReverse(&db->jobList);
 return db;
 }
 
 void sendSetPriorityMessage(int priority)
 /* Tell hub to change priority on batch */
 {
 struct dyString *dy = newDyString(1024);
 char *result;
 if ((priority < 1) || (priority > MAX_PRIORITY))
     errAbort("Priority %d out of range, should be 1 to %d",priority,MAX_PRIORITY);
 dyStringPrintf(dy, "setPriority %s %s %d", getUser(), resultsName, priority);
 result = hubSingleLineQuery(dy->string);
 dyStringFree(&dy);
 if (result == NULL || sameString(result, "0"))
     errAbort("Couldn't set priority for %s\n", batchDir);
 freez(&result);
 verbose(1, "Told hub to set priority %d\n",priority);
 }
 
 
 void paraPriority(char *val)
 /* Tell hub to change priority on batch */
 {
 if (sameWord(val,"high"))
     priority = 1;
 else if (sameWord(val,"medium"))
     priority = NORMAL_PRIORITY;
 else if (sameWord(val,"low"))
     priority = NORMAL_PRIORITY * NORMAL_PRIORITY;
 else
     priority = atoi(val);
 sendSetPriorityMessage(priority);
 }
 
 
 void checkPrioritySetting()
 /* see if we can and need to set the priority */
 {
 if (optionVal("pri",NULL)!=NULL)
     paraPriority(optionVal("pri","medium"));
 if (optionVal("priority",NULL)!=NULL)
     paraPriority(optionVal("priority","medium"));
 }   
 
 void sendSetMaxJobMessage(int maxJob)
 /* Tell hub to change maxJob on batch */
 {
 struct dyString *dy = newDyString(1024);
 char *result;
 if (maxJob <-1) 
     errAbort("maxJob %d out of range, should be >=-1", maxJob);
 dyStringPrintf(dy, "setMaxJob %s %s %d", getUser(), resultsName, maxJob);
 result = hubSingleLineQuery(dy->string);
 dyStringFree(&dy);
 if (result == NULL || sameString(result, "-2"))
     errAbort("Couldn't set maxJob %d for %s", maxJob, batchDir);
 freez(&result);
 verbose(1, "Told hub to set maxJob %d\n",maxJob);
 }
 
 void paraMaxJob(char *val)
 /* Tell hub to change maxJob on batch */
 {
 if (sameWord(val,"unlimited"))
     maxJob = -1;
 else
     maxJob = atoi(val);
 sendSetMaxJobMessage(maxJob);
 }
 
 void checkMaxJobSetting()
 /* see if we can and need to set maxJob */
 {
 if (optionVal("maxJob",NULL)!=NULL)
     paraMaxJob(optionVal("maxJob","unlimited"));
 /* backwards compatibility */
 if (optionVal("maxNode",NULL)!=NULL)
     paraMaxJob(optionVal("maxNode","unlimited"));
 }   
 
 
 void paraCreate(char *batch, char *jobList)
 /* Create a batch database from a job list. */
 {
 char backup[PATH_LEN];
 struct jobDb *db;
 
 if (thisBatchRunning())
     errAbort("This batch is currently running.  Please para stop first.");
 makeDir(batchDir);
 makeDir(errDir);
 db = parseJobList(jobList);
 
 doChecks(db, "in");
 safef(backup, sizeof(backup), "%s.bak", batch);
 atomicWriteBatch(db, backup);
 atomicWriteBatch(db, batch);
 verbose(1, "%d jobs written to %s\n", db->jobCount, batch);
 checkPrioritySetting();
 checkMaxJobSetting();
 }
 
 void paraRecover(char *batch, char *jobList, char *newJobList)
 /* Create a new job list from an existing list based on check outs that
  * failed. */
 {
 struct jobDb *db;
 struct job *job;
 struct hash *hash = newHash(19);
 FILE *newFh = mustOpen(newJobList, "w");
 db = parseJobList(jobList);
 
 beginHappy();
 for (job = db->jobList; job != NULL; job = job->next)
     {
     if (checkOneJob(job, "out", hash, NULL))
         fprintf(newFh, "%s\n", job->spec);
     }
 endHappy();
 if (ferror(newFh))
     errAbort("write failed: %s", newJobList);
 carefulClose(&newFh);
 jobDbFree(&db);
 freeHash(&hash);
 }
 
 boolean submitJob(struct job *job)
 /* Attempt to submit job. */
 {
 struct dyString *cmd = dyStringNew(1024);
 struct submission *sub;
 char *jobId = NULL;
 
 dyStringPrintf(cmd, "addJob2 %s %s /dev/null /dev/null %s %f %lld %s",
                getUser(), jobCwd, resultsName, job->cpusUsed, job->ramUsed, job->command);
 pmCheckCommandSize(cmd->string, cmd->stringSize);
 jobId = hubSingleLineQuery(cmd->string);
 if (sameString(jobId,"0"))
     {
     warn("addJob failed - if batch is bad, correct problem and run para clearSickNodes.");
     freez(&jobId);
     }
 if (jobId != NULL)
     {
     AllocVar(sub);
     slAddHead(&job->submissionList, sub);
     job->submissionCount += 1;
     sub->submitTime = time(NULL);
     sub->host = cloneString("n/a");
     sub->id = jobId;
     sub->inQueue = TRUE;
     sub->errFile = cloneString("n/a");
     }
 dyStringFree(&cmd);
 return jobId != NULL;
 }
 
 boolean killJob(char *jobId)
 /* Tell hub to kill a job.  Return TRUE on
  * success. */
 {
 char buf[256];
 char *result = NULL;
 boolean ok;
 snprintf(buf, sizeof(buf), "removeJob %s", jobId);
 result = hubSingleLineQuery(buf);
 ok = (result != NULL && sameString(result, "ok"));
 freez(&result);
 return ok;
 }
 
 
 void statusOutputChanged()
 /* Complain about status output format change and die. */
 {
 errAbort("\n%s output format changed, please update markQueuedJobs in para.c", 
 	statusCommand);
 }
 
 int markQueuedJobs(struct jobDb *db)
 /* Mark jobs that are queued up. Return total number of jobs in queue. */
 {
 struct hash *hash = newHash(max(12,digitsBaseTwo(db->jobCount)-3));
 
 struct job *job;
 struct submission *sub;
 int queueSize = 0;
 
 long killSeconds = killTime*60;
 long warnSeconds = warnTime*60;
 long duration;
 time_t now = time(NULL);
 
 long queryTime = clock1000();
 
 /* Get job list from paraHub. */
 struct dyString *dy = newDyString(1024);
 dyStringPrintf(dy, "pstat2 %s %s", getUser(), resultsName);
 struct slName *lineList = hubMultilineQuery(dy->string), *lineEl;
 dyStringFree(&dy);
 now = time(NULL);  /* need to refresh this after we get the info */
 
 verbose(2, "pstat2 time: %.2f seconds\n", (clock1000() - queryTime) / 1000.0);
 
 long hashTime = clock1000();
 
 verbose(2, "submission hash size: %d\n", hash->size);
 
 /* Make hash of submissions based on id and clear flags. */
 for (job = db->jobList; job != NULL; job = job->next)
     {
     for (sub = job->submissionList; sub != NULL; sub = sub->next)
         {
 	hashAdd(hash, sub->id, sub);
 	sub->running = FALSE;
 	sub->inQueue = FALSE;
 	}
     }
 
 verbose(2, "submission hash time: %.2f seconds\n", (clock1000() - hashTime) / 1000.0);
 
 long pstatListTime = clock1000();
 
 /* Read status output. */
 for (lineEl = lineList; lineEl != NULL; lineEl = lineEl->next)
     {
     int wordCount;
     char *line = lineEl->name;
     char *row[6];
     if (startsWith("Total Jobs:", line))
 	{
         wordCount = chopLine(line, row);
 	queueSize = sqlSigned(row[2]);
 	verbose(1, "%d jobs (including everybody's) in Parasol queue or running.\n", queueSize);
 	continue;
 	}
     if (startsWith("Sick Batch:", line))
 	{
 	sickBatch = TRUE;
 	warn("%s", line);
 	continue;
 	}
     if (startsWith("Results Size:", line))
 	{
         wordCount = chopLine(line, row);
 	resultsSize = sqlLongLong(row[2]);
 	continue;
 	}
     wordCount = chopLine(line, row);
     if (wordCount < 6 && wordCount != 2)
 	{
 	warn("Expecting at least 6 words in pstat2 output,"
 	     " found %d. paraHub and para out of sync.", wordCount);
 	statusOutputChanged();
 	}
     else
 	{
 	char *state = row[0], *jobId = row[1], *ticks = NULL, *host = NULL;
 	time_t t = 0;
 	if (wordCount > 2)
 	    {
     	    ticks = row[4], host = row[5];
     	    t = atol(ticks);
 	    }
 	if ((sub = hashFindVal(hash, jobId)) != NULL)
 	    {
 	    if (sameString(state, "r"))
 		{
 		sub->running = TRUE;
 		sub->startTime = t;
 		if (sub->host)
 		    freeMem(sub->host);
 		sub->host = cloneString(host);
 		duration = now - sub->startTime;
                 if (duration < 0)
 		    warn("Strange start time in jobId %s: %u (now=%u)", jobId, sub->startTime, (unsigned int) now);
                 else
 		    {
 		    if (duration > killSeconds && killSeconds > 0)
 			{
     			sub->hung = TRUE;
 			sub->slow = FALSE;
 			sub->endTime = now;
 			killJob(sub->id);
                         verbose(1, "killed hung jobId: %s\n", sub->id);
 			}
 		    else if (duration > warnSeconds)
 			sub->slow = TRUE;
 		    }
 		}
 	    else
 		{
 		sub->inQueue = TRUE;
 		}
 	    }
 	}
     }
 verbose(2, "pstat list time: %.2f seconds\n", (clock1000() - pstatListTime) / 1000.0);
 
 slFreeList(&lineList);
 
 freeHash(&hash);
 verbose(2, "markQueuedJobs time (includes pstat2, hash, list): %.2f seconds\n", (clock1000() - queryTime) / 1000.0);
 return queueSize;
 }
 
 struct hash *hashResults(char *fileName)
 /* Return hash of job results keyed on id as string. */
 {
 long time = clock1000();
 struct hash *hash = newHash(0);
 if (fileExists(fileName))
     {
     struct jobResult *el, *list = jobResultLoadAll(fileName, &bookMark, resultsSize);
     for (el = list; el != NULL; el = el->next)
 	hashAdd(hash, el->jobId, el);
     }
 verbose(2, "hashResults time: %.2f seconds\n", (clock1000() - time) / 1000.0);
 return hash;
 }
 
 void freeResults(struct hash **pHash)
 /* Free up results hash and elements in it. */
 {
 struct hash *hash = *pHash;
 struct hashEl *list, *el;
 struct jobResult *jr;
 if ((hash = *pHash) != NULL)
     {
     list = hashElListHash(hash);
     for (el = list; el != NULL; el = el->next)
 	{
 	jr = el->val;
 	jobResultFree(&jr);
 	}
     hashElFreeList(&list);
     hashFree(pHash);
     }
 }
 
 
 boolean isStatusOk(int status)
 /* Convert wait() return status to return value. */
 {
 return WIFEXITED(status) && (WEXITSTATUS(status) == 0);
 }
 
 double jrCpuTime(struct jobResult *jr)
 /* Get CPU time in seconds for job. */
 {
 return 0.01 * (jr->usrTicks + jr->sysTicks);
 }
 
 double subRealTime(struct submission *sub)
 /* Get real time in seconds for job. */
 {
 /* note sub->*Time are unsigned, so we need to convert to double
  * before subtracting or time moving backwards is not detected
  */
 return ((double)sub->endTime) - ((double)sub->startTime);
 }
 
 void showSickNodes(boolean showSummary)
 /* Tell hub to show sick nodes on batch. */
 {
 int count = 0;
 struct dyString *dy = newDyString(1024);
 dyStringPrintf(dy, "showSickNodes %s %s", getUser(), resultsName);
 struct slName *lineList = hubMultilineQuery(dy->string), *lineEl;
 for (lineEl = lineList; lineEl != NULL; lineEl = lineEl->next)
     {
     ++count;
     char *line = lineEl->name;
     /* In show summary mode, only print the last line, 
      * which contains the totals.  Only print this one
      * if there's more than one line (the total is greater than zero). */
     if (!showSummary || !(lineEl->next || count == 1))
 	printf("%s\n", line);
     }
 slFreeList(&lineList);
 dyStringFree(&dy);
 }
 
 
 void markRunJobStatus(struct jobDb *db)
 /* Mark jobs based on results output file. 
  * Returns hash of results. */
 {
 struct job *job;
 struct submission *sub;
 struct hash *checkHash = newHash(0);
 struct hash *resultsHash = hashResults(resultsName);
 long time = clock1000();
 
 verbose(1, "Checking finished jobs\n");
 beginHappy();
 for (job=db->jobList; job != NULL; job = job->next)
     {
     if ((sub = job->submissionList) != NULL)
         {
 	/* Look for hitherto unclassified jobs that are either running or
 	 * possibly finished. */
 	if (!sub->queueError && !sub->inQueue && !sub->crashed && 
 		!sub->hung && !sub->running && !sub->ranOk)
 	    {
 	    struct jobResult *jr = hashFindVal(resultsHash, sub->id);
 	    if (jr == NULL)
 	        {
 		if (!sickBatch)
     		    sub->trackingError = 3;
 		}
 	    else
 	        {
 		sub->startTime = jr->startTime;
 		sub->endTime = jr->endTime;
 		sub->cpuTime = jrCpuTime(jr);
 		sub->status = jr->status;
 		sub->gotStatus = TRUE;
 		if (sub->host)
 		    freeMem(sub->host);
 		sub->host = cloneString(jr->host);
 		sub->inQueue = FALSE;
 		sub->running = FALSE;
 		sub->hung = FALSE;
 		sub->slow = FALSE;
 		if (sub->errFile)
 		    freeMem(sub->errFile);
 		sub->errFile = cloneString(jr->errFile);
 		if (isStatusOk(sub->status) && checkOneJob(job, "out", checkHash, stderr) == 0)
 		    sub->ranOk = TRUE;
 		else
 		    sub->crashed = TRUE;
 		}
 	    }
 	}
     }
 endHappy();
 freeHash(&checkHash);
 freeResults(&resultsHash);
 verbose(2, "markRunJobStatus time (includes hashResults): %.2f seconds\n", (clock1000() - time) / 1000.0);
 }
 
 boolean needsRerun(struct submission *sub)
 /* Return TRUE if submission needs to be rerun. */
 {
 if (sub == NULL)
     return TRUE;
 return sub->submitError || sub->queueError || sub->crashed || sub->trackingError || sub->hung;
 }
 
 struct jobDb *paraCycle(char *batch)
 /* Cycle forward through batch.  Return database. */
 {
 
 struct jobDb *db = readBatch(batch);
 struct job *job;
 int queueSize;
 int pushCount = 0, retryCount = 0;
 int tryCount;
 boolean finished = FALSE;
 long time = clock1000();
 
 queueSize = markQueuedJobs(db);
 
 markRunJobStatus(db);
 
 beginHappy();
 if (!sickBatch)
     for (tryCount=1; tryCount<=retries && !finished; ++tryCount)
 	{
 	for (job = db->jobList; job != NULL; job = job->next)
 	    {
 	    if (job->submissionCount < tryCount && 
 	       (job->submissionList == NULL || needsRerun(job->submissionList)))
 		{
 		if (!submitJob(job))
 		    {
 		    finished = TRUE;
 		    break;
 		    }
 		occassionalDot();
 		// occassionalSleep();
 		if (delayTime > 0)
 		    {
 		    atomicWriteBatch(db, batch);
 		    sleep(delayTime);
 		    }
 		++pushCount;
 		if (tryCount > 1)
 		    ++retryCount;
 		if (pushCount >= maxPush)
 		    {
 		    finished = TRUE;
 		    break;
 		    }
 		if (pushCount + queueSize >= maxQueue && pushCount >= minPush)
 		    {
 		    finished = TRUE;
 		    break;
 		    }
 		}
 	    }
 	}
 endHappy();
 atomicWriteBatch(db, batch);
 verbose(1, "updated job database on disk\n");
 if (pushCount > 0)
     verbose(1, "Pushed Jobs: %d\n", pushCount);
 if (retryCount > 0)
     verbose(1, "Retried jobs: %d\n", retryCount);
 verbose(2, "paraCycle time: %.2f seconds\n", (clock1000() - time) / 1000.0);
 return db;
 }
 
 void paraPush(char *batch)
 /* Push forward batch one time. */
 {
 struct jobDb *db = paraCycle(batch);
 jobDbFree(&db);
 }
 
 
 void clearSickNodes()
 /* Tell hub to clear sick nodes on batch */
 {
 struct dyString *dy = newDyString(1024);
 char *result;
 dyStringPrintf(dy, "clearSickNodes %s %s", getUser(), resultsName);
 result = hubSingleLineQuery(dy->string);
 dyStringFree(&dy);
 if (!sameString(result, "0"))
     errAbort("Couldn't clear sick nodes for %s", batchDir);
 freez(&result);
 verbose(1, "Told hub to clear sick nodes\n");
 }
 
 
 void paraShove(char *batch)
 /* Push batch of jobs and keep pushing until it's finished, polling
  * parasol every 5 minutes. */
 {
 struct jobDb *db;
 struct job *job;
 struct submission *sub;
 int maxSleep = 5*60;
 int curSleep = 15;
 time_t start = time(NULL), now;
 int sickBatchTries = 0;
 #define MAXSICKBATCHTRIES 3
 #define SICKBATCHSLEEP 10*60
 
 for (;;)
     {
     boolean anyUnfinished = FALSE;
     db = paraCycle(batch);
     for (job = db->jobList; job != NULL; job = job->next)
         {
 	if ((sub = job->submissionList) == NULL)
 	    anyUnfinished = TRUE;
 	else
 	    {
 	    enum jaState state = figureState(job);
 	    if (job->submissionCount >= retries)
 	        {
 		if (state != jaUnsubmitted && state != jaQueued && state != jaRunning)
 		    errAbort("Batch failed after %d tries on %s", retries, job->command);
 		}
 	    if (state != jaFinished)
 	        anyUnfinished = TRUE;
 	    }
 	}
     jobDbFree(&db);
     if (!anyUnfinished)
         break;
     showSickNodes(TRUE);
     fflush(stdout);
     fflush(stderr);
     if (sickBatch)
 	{
 	++sickBatchTries;
 	if (sickBatchTries >= MAXSICKBATCHTRIES)
     	    break;
 	warn("Sick batch! will sleep %d minutes, clear sick nodes and retry", SICKBATCHSLEEP/60);
 	sleep(SICKBATCHSLEEP);
 	sickBatch = FALSE;
         clearSickNodes(); 
 	}
     else
 	{
 	sickBatchTries = 0;
 	sleep(curSleep);
 	if (curSleep < maxSleep)
 	    curSleep += 15;
 	}
     now = time(NULL);
     verbose(1, "================\n");
     verbose(1, "Checking job status %d minutes after launch\n",  round((now-start)/60.0));
     }
 if (sickBatch)
     errAbort("Sick batch! Correct problem and then run para clearSickNodes.");
 else
     verbose(1, "Successful batch!\n");
 }
 
 void paraMake(char *batch, char *spec)
 /* Do a create and then a shove. */
 {
 paraCreate(batch, spec);
 paraShove(batch);
 }
 
 void reportOnJobs(struct jobDb *db)
 /* Report on status of jobs. */
 {
 int submitError = 0, inQueue = 0, queueError = 0, trackingError = 0, running = 0, crashed = 0,
     slow = 0, hung = 0, ranOk = 0, unsubmitted = 0, total = 0, failed = 0;
 struct job *job;
 struct submission *sub;
 
 for (job = db->jobList; job != NULL; job = job->next)
     {
     if ((sub = job->submissionList) != NULL)	/* Get most recent submission if any. */
         {
 	if (sub->submitError) ++submitError;
 	if (sub->queueError) ++queueError;
 	if (sub->trackingError) ++trackingError;
 	if (sub->inQueue) ++inQueue;
 	if (sub->crashed) ++crashed;
 	if (sub->slow) ++slow;
 	if (sub->hung) ++hung;
 	if (sub->running) ++running;
 	if (sub->ranOk) ++ranOk;
 	if (job->submissionCount >= retries && needsRerun(sub))
 	     ++failed;
 	}
     else
         ++unsubmitted;
     ++total;
     }
 if (unsubmitted > 0)
    printf("unsubmitted jobs: %d\n", unsubmitted);
 if (submitError > 0)
    printf("submission errors: %d\n", submitError);
 if (queueError > 0)
    printf("queue errors: %d\n", queueError);
 if (trackingError > 0)
    {
    if (!fileExists(resultsName))
        printf("%s: file not found.  paraHub can't write to this dir?\n", resultsName);
    else
        printf("tracking errors: %d\n", trackingError);
    }
 if (inQueue > 0)
    printf("queued and waiting: %d\n", inQueue);
 if (crashed > 0)
    printf("crashed: %d\n", crashed);
 if (slow > 0)
    printf("slow (> %d minutes): %d\n", warnTime, slow);
 if (hung > 0)
    printf("hung (> %d minutes): %d\n", killTime, hung);
 if (running > 0)
    printf("running: %d\n", running);
 if (ranOk > 0)
    printf("ranOk: %d\n", ranOk);
 if (failed > 0)
    printf("failed %d times: %d\n", retries, failed);
 printf("total jobs in batch: %d\n", total);
 }
 
 void paraListFailed(char *batch)
 /* List all jobs that failed. */
 {
 struct jobDb *db = readBatch(batch);
 struct job *job;
 struct submission *sub;
 
 markQueuedJobs(db);
 markRunJobStatus(db);
 for (job = db->jobList; job != NULL; job = job->next)
     {
     sub = job->submissionList;
     if (sub != NULL)
         {
 	if (job->submissionCount >= retries && needsRerun(sub))
 	    printf("%s\n", job->spec);
 	}
     }
 }
 
 void paraListState(char *batch, enum jaState targetState)
 /* List all jobs that match target state. */
 {
 struct jobDb *db = readBatch(batch);
 struct job *job;
 markQueuedJobs(db);
 markRunJobStatus(db);
 for (job = db->jobList; job != NULL; job = job->next)
     {
     enum jaState state = figureState(job);
     if (state == targetState)
 	printf("%s\n", job->spec);
     }
 }
 
 struct submission* getLastSubmission(struct job *job)
 /* get the last submission for a job, or NULL if not submitted */
 {
 struct submission* sub = job->submissionList;
 while ((sub != NULL) && (sub->next != NULL))
     sub = sub->next;
 return sub;
 }
 
 /* header for job status output */
 static char *jobStatusHdr =
     "#state\ttries\treal\tcpu\thost\tjobid\tcmd\n";
 
 void paraJobStatus(struct job *job, time_t now)
 /* Print status of a job. */
 {
 enum jaState state = figureState(job);
 char *stateStr = jaStateShortDesc[state];
 double realTime = 0.0;
 double cpuTime = 0.0;
 struct submission *sub = getLastSubmission(job);
 char *jobId = "";
 char *host = "";
 
 if (sub != NULL)
     {
     if (sub->trackingError)
         stateStr = "track";
     if (state == jaRunning)
         realTime = (now - sub->startTime);
     else
         {
         realTime = subRealTime(sub);
         cpuTime = sub->cpuTime;
         jobId = sub->id;
         }
     host = sub->host;
     }
 
 subChar(job->command, '\t', ' ');  /* tabs not allowed in column */
 
 printf("%s\t%d\t%0.2f\t%0.2f\t%s\t%s\t%s\n",
        stateStr,
        job->submissionCount,
        realTime, cpuTime, host, jobId,
        job->command);
 }
 
 void paraStatus(char *batch)
 /* Print status of all jobs. */
 {
 struct jobDb *db = readBatch(batch);
 struct job *job;
 time_t now = time(NULL);
 
 markQueuedJobs(db);
 markRunJobStatus(db);
 printf("%s", jobStatusHdr);
 for (job = db->jobList; job != NULL; job = job->next)
     paraJobStatus(job, now);
 }
 
 void printErrFile(struct submission *sub)
 /* Print error file if it exists. */
 {
 char localName[PATH_LEN];
 safef(localName, sizeof(localName), "%s/%s", errDir, sub->id);
 if (!fileExists(localName))
     {
     pmFetchFile(sub->host, sub->errFile, localName);
     }
 if (fileExists(localName))
     {
     char *buf;
     size_t size;
     printf("stderr:\n");
     readInGulp(localName, &buf, &size);
     mustWrite(stdout, buf, size);
     freez(&buf);
     }
 }
 
 void problemReport(struct job *job, struct submission *sub, char *type)
 /* Print report on one problem. */
 {
 struct check *check;
 struct hash *hash = newHash(0);
 
 printf("job: %s\n", job->command);
 printf("id: %s\n", sub->id);
 printf("failure type: %s\n", type);
 if (sameString(type, "crash"))
     {
     time_t startTime = sub->startTime;
     printf("host: %s\n", sub->host);
     printf("start time: %s", ctime(&startTime)); /* ctime adds \n */
     printf("return: ");
     if (WIFEXITED(sub->status))
 	printf("%d\n", WEXITSTATUS(sub->status));
     else if (WIFSIGNALED(sub->status))
 	printf("signal %d\n", WTERMSIG(sub->status));
     else if (WIFSTOPPED(sub->status))
 	printf("stopped %d\n", WSTOPSIG(sub->status));
     else 
 	printf("unknown wait status %d\n", sub->status);
     for (check = job->checkList; check != NULL; check = check->next)
 	doOneCheck(check, hash, stdout);
     printErrFile(sub);
     }
 printf("\n");
 hashFree(&hash);
 }
 
 void paraProblems(char *batch)
 /*  List jobs that had problems (even if successfully rerun).  Includes host info */
 {
 struct jobDb *db = readBatch(batch);
 struct job *job;
 struct submission *sub;
 int problemCount = 0;
 
 markQueuedJobs(db);
 markRunJobStatus(db);
 for (job = db->jobList; job != NULL; job = job->next)
     {
     for (sub = job->submissionList; sub != NULL; sub = sub->next)
 	{
 	if (sub->hung)
 	    {
 	    problemReport(job, sub, "hung");
 	    ++problemCount;
 	    }
 	else if (sub->slow)
 	    {
 	    problemReport(job, sub, "slow");
 	    ++problemCount;
 	    }
 	else if (sub->trackingError)
 	    {
 	    problemReport(job, sub, "tracking error");
 	    ++problemCount;
 	    }
 	else if (needsRerun(sub))
 	    {
 	    problemReport(job, sub, "crash");
 	    ++problemCount;
 	    }
 	}
     }
 printf("%d problems total\n", problemCount);
 }
 
 void runningReport(struct job *job, struct submission *sub)
 /* Print report on one running job. */
 {
 time_t startTime = sub->startTime;
 int duration = time(NULL) - startTime;
 
 printf("command: %s\n", job->command);
 printf("jobId: %s\n", sub->id);
 printf("host: %s\n", sub->host);
 printf("start time: %s", ctime(&startTime)); /* ctime adds \n */
 printf("run time so far: %d sec,  %4.2f min, %4.2f hours,  %4.2f days\n", 
 	duration, duration/60.0, duration/3600.0,  duration/(3600.0*24.0));
 if (sub->slow)
   printf("This is a slow job, running time > %d minutes.\n", warnTime);
 printf("\n");
 }
 
 void paraRunning(char *batch, char *minTimeS)
 /*  List jobs that are running.  Includes host and time info */
 {
 struct jobDb *db = readBatch(batch);
 struct job *job;
 struct submission *sub;
 int runCount = 0;
 int minTime = 0;
 
 if (minTimeS)
     minTime = atoi(minTimeS);
 markQueuedJobs(db);
 markRunJobStatus(db);
 for (job = db->jobList; job != NULL; job = job->next)
     {
     if ((sub = job->submissionList) != NULL && sub->running)
 	{
 	time_t startTime = sub->startTime;
 	int duration = time(NULL) - startTime;
 	if (duration > (minTime * 60))
 	    {
 	    runningReport(job, sub);
 	    ++runCount;
 	    }
 	}
     }
 printf("total %s running: %d\n", !minTimeS ? "jobs" : "hippos", runCount);
 }
 
 
 void sendChillMessage()
 /* Tell hub to chill out on job */
 {
 struct dyString *dy = newDyString(1024);
 char *result;
 dyStringPrintf(dy, "chill %s %s", getUser(), resultsName);
 result = hubSingleLineQuery(dy->string);
 dyStringFree(&dy);
 if (result == NULL || !sameString(result, "ok"))
     errAbort("Couldn't chill %s\n", batchDir);
 freez(&result);
 verbose(1, "Told hub to chill out\n");
 }
 
 void paraResetCounts()
 /* Send msg to hub to reset done and crashed counts on batch */
 {
 struct dyString *dy = newDyString(1024);
 char *result;
 dyStringPrintf(dy, "resetCounts %s %s", getUser(), resultsName);
 result = hubSingleLineQuery(dy->string);
 dyStringFree(&dy);
 if (result == NULL || sameString(result, "-2"))
     errAbort("Couldn't reset done and crashed counts on batch %s\n", batchDir);
 freez(&result);
 verbose(1, "Told hub to reset done and crashed counts on batch %s\n", batchDir);
 }
 
 
 void freeBatch()
 /* Send msg to hub to reset done and crashed counts on batch */
 {
 struct dyString *dy = newDyString(1024);
 char *result;
 dyStringPrintf(dy, "freeBatch %s %s", getUser(), resultsName);
 result = hubSingleLineQuery(dy->string);
 dyStringFree(&dy);
 verbose(1, "Told hub to free all batch-related resources\n");
 if (result == NULL)
     errAbort("result == NULL");
 if (sameOk(result, "-3"))
     errAbort("User not found.");
 if (sameOk(result, "-2"))
     errAbort("Batch not found.");
 if (sameOk(result, "-1"))
     warn("Unable to free batch.  Jobs are queued or running.");
 if (sameOk(result, "0"))
     verbose(1, "Batch freed.\n");
 freez(&result);
 }
 
 
 void flushResults()
 /* Send msg to hub to flush results file */
 {
 struct dyString *dy = newDyString(1024);
 char *result;
 dyStringPrintf(dy, "flushResults %s %s", getUser(), resultsName);
 result = hubSingleLineQuery(dy->string);
 dyStringFree(&dy);
 verbose(1, "Told hub to flush the results file\n");
 if (result == NULL)
     errAbort("result == NULL");
 if (sameOk(result, "-3"))
     errAbort("User not found.");
 if (sameOk(result, "-2"))
     errAbort("Batch not found.");
 if (sameOk(result, "-1"))
     warn("Flushed results. Some jobs are still queued or running.");
 if (sameOk(result, "0"))
     verbose(1, "Flushed results.\n");
 freez(&result);
 }
 
 
 
 void paraCheck(char *batch)
 /* Check on progress of a batch. */
 {
 struct jobDb *db = readBatch(batch);
 
 markQueuedJobs(db);
 markRunJobStatus(db);
 reportOnJobs(db);
 
 atomicWriteBatch(db, batch);
 showSickNodes(TRUE);
 if (sickBatch)
     errAbort("Sick batch! Correct problem and then run para clearSickNodes.");
 }
 
 
 
 int cleanTrackingErrors(struct jobDb *db)
 /* Remove submissions with tracking errors. 
  * Returns count of these submissions */
 {
 int count = 0;
 struct job *job;
 struct submission *sub;
 for (job = db->jobList; job != NULL; job = job->next)
     {
     sub = job->submissionList;
     if (sub != NULL)
         {
 	if (sub->trackingError)
 	    {
 	    job->submissionCount -= 1;
 	    job->submissionList = sub->next;
 	    ++count;
 	    }
 	}
     }
 verbose(1, "Chilled %d jobs\n", count);
 return count;
 }
 
 void removeChilledSubmissions(char *batch)
 /* Remove submissions from job database if we
  * asked them to chill out. */
 {
 struct jobDb *db = readBatch(batch);
 int chillCount;
 
 markQueuedJobs(db);
 markRunJobStatus(db);
 chillCount = cleanTrackingErrors(db);
 atomicWriteBatch(db, batch);
 }
 
 void paraChill(char *batch)
 /*  Tells system to not launch more jobs in this batch, but
  *  does not stop jobs that are already running.\n */
 {
 sendChillMessage();
 removeChilledSubmissions(batch);
 }
 
 void paraStopAll(char *batch)
 /* Stop batch of jobs. */
 {
 struct jobDb *db = readBatch(batch);
 struct job *job;
 struct submission *sub;
 int killCount = 0, missCount = 0;
 
 markQueuedJobs(db);
 markRunJobStatus(db);
 cleanTrackingErrors(db);
 for (job = db->jobList; job != NULL; job = job->next)
     {
     sub = job->submissionList;
     if (sub != NULL)
         {
 	if (sub->inQueue || sub->running)
 	    {
 	    if (killJob(sub->id))
 		{
 		job->submissionCount -= 1;
 		job->submissionList = sub->next;
 		killCount += 1;
 		}
 	    else
 	        {
 		missCount += 1;
 		}
 	    }
 	}
     }
 verbose(1, "%d running jobs stopped\n", killCount);
 if (missCount > 0)
     printf("%d jobs not stopped - try another para stop in a little while\n",
     	missCount);
 atomicWriteBatch(db, batch);
 }
 
 void paraStop(char *batch)
 /* Stop batch of jobs. */
 {
 sendChillMessage();	/* Remove waiting jobs first, it's faster. */
 paraStopAll(batch);
 }
 
 void printTimes(char *title, double seconds,  boolean showYears)
 /* Print out times in seconds, hours, days, maybe years. */
 {
 printf("%-27s %9llds %10.2fm %8.2fh %7.2fd", 
    title, roundll(seconds), seconds/60, seconds/3600, seconds/(3600*24));
 if (showYears)
      printf(" %6.3f y", seconds/(3600*24*365));
 printf("\n");
 }
 
 long calcFirstToLast(struct jobDb *db)
 /* Calculate time between first submission and last job finish. */
 {
 boolean first = TRUE;
 struct job *job;
 struct submission *sub;
 long now = time(NULL);
 long subTime, firstSub = BIGNUM, endTime, lastEnd = 0;
 boolean gotEnd = FALSE;
 
 for (job = db->jobList; job != NULL; job = job->next)
     {
     if ((sub = job->submissionList) != NULL)
         {
 	subTime = sub->submitTime;
 	if (subTime != 0 && subTime < now)	/* Protect against wacked out clocks. */
 	    {
 	    if (first)
 		{
 		firstSub = subTime;
 		first = FALSE;
 		}
 	    else
 		{
 		if (subTime < firstSub) 
 		    firstSub = subTime;
 		}
 	    }
 	if (sub->endTime != 0)
 	    {
 	    endTime = sub->endTime;
 	    if (endTime < now)	/* Protect against wacked out clocks. */
 		{
 		if (endTime > lastEnd) 
 		    {
 		    lastEnd = endTime;
 		    gotEnd = TRUE;
 		    }
 		}
 	    }
 	}
     }
 if (gotEnd)
     return lastEnd - firstSub;
 else
     return now - firstSub;
 }
 
 
 void paraRam(char *batch, char *val)
 /* set batch ram = val */
 {
 long long newRam = paraParseRam(val);
 if (newRam == -1)
     usage();
 struct jobDb *db = readBatch(batch);
 struct job *job;
 for (job = db->jobList; job != NULL; job = job->next)
     {
     job->ramUsed = newRam;
     }
 atomicWriteBatch(db, batch);
 }
 
 void paraCpu(char *batch, char *val)
 /* set batch cpu = val */
 {
 float newCpus = sqlFloat(val);
 if (newCpus < 0)
     usage();
 struct jobDb *db = readBatch(batch);
 struct job *job;
 for (job = db->jobList; job != NULL; job = job->next)
     {
     job->cpusUsed = newCpus;
     }
 atomicWriteBatch(db, batch);
 }
 
 void paraTimes(char *batch)
 /* Report times of run. */
 {
 struct jobDb *db = readBatch(batch);
 double totalCpu = 0, totalWall = 0;
 double oneWall, longestWall = 0;
 struct job *job;
 char *longestWallId = NULL;
 struct submission *sub;
 int jobCount = 0;
 int runningCount = 0;
 int timedCount = 0;
 int crashCount = 0;
 int queueCount = 0;
 double runTime = 0, longestRun = 0;
 int otherCount = 0;
 long now = time(NULL);
 double ioTime;
 
 markQueuedJobs(db);
 markRunJobStatus(db);
 for (job = db->jobList; job != NULL; job = job->next)
     {
     ++jobCount;
     if ((sub = job->submissionList) != NULL)
         {
 	if (sub->running)
 	   {
 	   int oneTime = now - sub->startTime;
 	   if (oneTime < 0)
 	       warn("Strange start time in %s: %u", batch, sub->startTime);
 	   else
 	       runTime += oneTime;
 	   if (oneTime > longestRun)
 	     longestRun = oneTime;
 	   ++runningCount;
 	   }
 	else if (sub->inQueue)
 	   {
 	   ++queueCount;
 	   }
 	else  if (sub->crashed || sub->hung)
 	   {
 	   ++crashCount;
 	   }
 	else if (sub->ranOk)
 	   {
 	   ++timedCount;
 	   totalCpu += sub->cpuTime;
 	   oneWall = subRealTime(sub);
 	   if (oneWall < 0)	/* Protect against clock reset. */
 	       {
 	       warn("End before start job %s host %s", sub->id, sub->host);
 	       warn("Start %u,  End %u", sub->startTime, sub->endTime);
 	       oneWall = totalCpu;	
 	       }
 	   totalWall += oneWall;
 	   if (oneWall > longestWall) 
 	       {
 	       longestWall = oneWall;
 	       longestWallId = sub->id;
 	       }
 	   }
        else
 	   {
 	   ++otherCount;
 	   }
 	}
     }
 
 printf("Completed: %d of %d jobs\n", timedCount, jobCount);
 if (runningCount > 0)
     printf("Jobs currently running: %d\n", runningCount);
 if (crashCount > 0)
     printf("Crashed: %d jobs\n", crashCount);
 if (otherCount > 0)
     {
     if (!fileExists(resultsName))
 	printf("%s: file not found.  paraHub can't write to this dir?\n", resultsName);
     else
 	printf("Other count: %d jobs\n", otherCount);
     }
 if (queueCount > 0)
     printf("In queue waiting: %d jobs\n", queueCount);
 printTimes("CPU time in finished jobs:", totalCpu, TRUE);
 ioTime = totalWall - totalCpu;
 if (ioTime < 0) ioTime = 0;
 printTimes("IO & Wait Time:", ioTime, TRUE);
 if (runningCount > 0)
     {
     printTimes("Time in running jobs:", runTime, TRUE);
     }
 if (timedCount > 0)
     {
     printTimes("Average job time:", totalWall/timedCount, FALSE);
     if (runningCount > 0)
         printTimes("Longest running job:", longestRun, FALSE);
     printTimes("Longest finished job:", longestWall, FALSE);
     printTimes("Submission to last job:", calcFirstToLast(db), FALSE);
     if (runningCount < 1)
         printTimes("Estimated complete:", 0.0, FALSE);
     else
         {
         int jobsInBatch = db->jobCount;
         int jobsToRun = jobsInBatch - timedCount;
         double timeMultiple = (double)jobsToRun / (double)runningCount;
         verbose(2, "inBatch: %d, toRun: %d, multiple: %.3f\n",
 		jobsInBatch, jobsToRun, timeMultiple);
         printTimes("Estimated complete:",
                    timeMultiple*totalWall/timedCount, FALSE);
 	}
     }
 atomicWriteBatch(db, batch);
 }
 
 static char *determineCwdOptDir(char *optName)
 /* determine batch or job directory from cwd and option */
 {
 char path[PATH_LEN];
 char *opt = optionVal(optName, NULL);
 if (opt == NULL)
     safecpy(path, sizeof(path), getCurrentDir());
 else if (opt[0] != '/')
     {
     safecpy(path, sizeof(path), getCurrentDir());
     safecat(path, sizeof(path), "/");
     safecat(path, sizeof(path), opt);
     }
 else
     safecpy(path, sizeof(path), opt);
 // remove trailing "/" or "/." from name.  While the whole path should be normalized,
 // this at least removes a common difference when using file name completion.
 if (endsWith(path, "/."))
     path[strlen(path)-2] = '\0';
 if (endsWith(path, "/"))
     path[strlen(path)-1] = '\0';
 
 return cloneString(path);
 }
 
 int main(int argc, char *argv[])
 /* Process command line. */
 {
 char *command;
 char batch[PATH_LEN];
 long startTime = clock1000();
 
 optionInit(&argc, argv, optionSpecs);
 if (argc < 2)
     usage();
 retries = optionInt("retries",  retries);
 maxQueue = optionInt("maxQueue",  maxQueue);
 minPush = optionInt("minPush",  minPush);
 maxPush = optionInt("maxPush",  maxPush);
 warnTime = optionInt("warnTime", warnTime);
 killTime = optionInt("killTime", killTime);
 delayTime = optionInt("delayTime", delayTime);
 if (optionExists("eta"))
     fprintf(stderr, "note: the -eta option is no longer required\n");
 cpuUsage = optionFloat("cpu", cpuUsage);
 if (cpuUsage < 0)
     usage();
 ramUsage = paraParseRam(optionVal("ram","0"));
 if (ramUsage == -1)
     usage();
 batchDir = determineCwdOptDir("batch");
 jobCwd = determineCwdOptDir("jobCwd");
 safef(resultsName, sizeof(resultsName), "%s/%s", batchDir, resultsFileName);
 safef(errDir, sizeof(errDir), "%s/err", batchDir);
 safef(bookMarkName, sizeof(bookMarkName), "%s/%s", batchDir, bookMarkFileName);
 
 command = argv[1];
 safef(batch, sizeof(batch), "%s/batch", batchDir);
 
 if (!sameWord(command,"create") && !sameWord(command,"make"))
     {
     checkPrioritySetting();
     checkMaxJobSetting();
     }
 
 pushWarnHandler(paraVaWarn);
 
 readBookMark();  /* read the para.bookmark file to initialize bookmark */
 
 if (sameWord(command, "create") || sameWord(command, "creat"))
     {
     if (argc != 3)
         usage();
     paraCreate(batch, argv[2]);
     }
 else if (sameWord(command, "recover"))
     {
     if (argc != 4)
         usage();
     paraRecover(batch, argv[2], argv[3]);
     }
 else if (sameWord(command, "check"))
     {
     paraCheck(batch);
     }
 else if (sameWord(command, "push"))
     {
     paraPush(batch);
     }
 else if (sameWord(command, "shove"))
     {
     paraShove(batch);
     }
 else if (sameWord(command, "make"))
     {
     if (argc != 3)
         usage();
     paraMake(batch, argv[2]);
     }
 else if (sameWord(command, "try"))
     {
     maxPush = 10;
     paraPush(batch);
     }
 else if (sameWord(command, "stop"))
     {
     paraStop(batch);
     }
 else if (sameWord(command, "chill"))
     {
     paraChill(batch);
     }
 else if (sameWord(command, "hung"))
     {
     paraListState(batch, jaHung);
     }
 else if (sameWord(command, "slow"))
     {
     char temp[256];
     safef(temp, sizeof(temp), "%d", warnTime);
     paraRunning(batch, temp);
     }
 else if (sameWord(command, "crashed"))
     {
     paraListState(batch, jaCrashed);
     }
 else if (sameWord(command, "failed"))
     {
     paraListFailed(batch);
     }
 else if (sameWord(command, "finished"))
     {
     paraListState(batch, jaFinished);
     }
 else if (sameWord(command, "status"))
     {
     paraStatus(batch);
     }
 else if (sameWord(command, "problems") || sameWord(command, "problem"))
     {
     paraProblems(batch);
     }
 else if (sameWord(command, "running") || sameWord(command, "hippos") || sameWord(command, "hippo"))
     {
     paraRunning(batch, argv[2]);
     }
 else if (sameWord(command, "time") || sameWord(command, "times"))
     {
     paraTimes(batch);
     }
 else if (sameWord(command, "priority"))
     {
     if (argc != 3)
         usage();
     paraPriority(argv[2]);
     }
 else if (sameWord(command, "maxJob") || sameWord(command, "maxNode"))
     {
     if (argc != 3)
         usage();
     /* backwards compatibility */
     if (sameWord(command, "maxNode"))
 	warn("maxNode deprecated, use maxJob");
     paraMaxJob(argv[2]);
     }
 else if (sameWord(command, "ram"))
     {
     if (argc != 3)
         usage();
     paraRam(batch, argv[2]);
     }
 else if (sameWord(command, "cpu"))
     {
     if (argc != 3)
         usage();
     paraCpu(batch, argv[2]);
     }
 else if (sameWord(command, "resetCounts"))
     {
     if (argc != 2)
         usage();
     paraResetCounts();
     }
 else if (sameWord(command, "freeBatch"))
     {
     if (argc != 2)
         usage();
     freeBatch();
     }
 else if (sameWord(command, "flushResults"))
     {
     if (argc != 2)
         usage();
     flushResults();
     }
 else if (sameWord(command, "clearSickNodes"))
     {
     clearSickNodes();
     }
 else if (sameWord(command, "showSickNodes"))
     {
     showSickNodes(FALSE);
     }
 else
     {
     errAbort("Unrecognized command '%s'.  Run para with no arguments for usage summary", 
     	command);
     }
 
 verbose(2, "Total para time: %.2f seconds\n", (clock1000() - startTime) / 1000.0);
 
 return 0;
 }