4448634af67576d93b95e8b3c863b05943cd27b9
galt
  Mon Sep 9 14:37:08 2013 -0700
ubuntu needs #include <sys/resource.h>
diff --git src/parasol/paraNode/paraNode.c src/parasol/paraNode/paraNode.c
index 278bfb5..7b7c7b0 100644
--- src/parasol/paraNode/paraNode.c
+++ src/parasol/paraNode/paraNode.c
@@ -1,917 +1,917 @@
 /* paraNode - parasol node server. */
 
 #include "paraCommon.h"
 #include <grp.h>
 #include "errabort.h"
 #include "dystring.h"
 #include "dlist.h"
 #include "hash.h"
 #include "localmem.h"
 #include "options.h"
 #include "subText.h"
 #include "paraLib.h"
 #include "net.h"
 #include "portable.h"
 #include "rudp.h"
 #include "paraMessage.h"
 #include "internet.h"
 #include "log.h"
-
+#include <sys/resource.h>
 
 /* command line option specifications */
 static struct optionSpec optionSpecs[] = {
     {"logFacility", OPTION_STRING},
     {"logMinPriority", OPTION_STRING},
     {"log", OPTION_STRING},
     {"debug", OPTION_BOOLEAN},
     {"hub", OPTION_STRING},
     {"umask", OPTION_INT},
     {"userPath", OPTION_STRING},
     {"sysPath", OPTION_STRING},
     {"env", OPTION_STRING|OPTION_MULTI},
     {"randomDelay", OPTION_INT},
     {"cpu", OPTION_INT},
     {"localhost", OPTION_STRING},
     {NULL, 0}
 };
 
 char *version = PARA_VERSION;   /* Version number. */
 
 void usage()
 /* Explain usage and exit. */
 {
 errAbort("paraNode - version %s\n"
 	 "Parasol node server.\n"
          "usage:\n"
 	 "    paraNode start\n"
 	 "options:\n"
 	 "    -logFacility=facility  Log to the specified syslog facility - default local0.\n"
          "    -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n"
          "     defaults to \"warn\"\n"
          "    -log=file  Log to file instead of syslog.\n"
          "    -debug  Don't daemonize\n"
 	 "    -hub=host  Restrict access to connections from hub.\n"
 	 "    -umask=000  Set umask to run under - default 002.\n"
 	 "    -userPath=bin:bin/i386  User dirs to add to path.\n"
 	 "    -sysPath=/sbin:/local/bin  System dirs to add to path.\n"
          "    -env=name=value - add environment variable to jobs.  Maybe repeated.\n"
 	 "    -randomDelay=N  Up to this many milliseconds of random delay before\n"
 	 "        starting a job.  This is mostly to avoid swamping NFS with\n"
 	 "        file opens when loading up an idle cluster.  Also it limits\n"
 	 "        the impact on the hub of very short jobs. Default 5000.\n"
 	 "    -cpu=N  Number of CPUs to use - default 1.\n"
 	, version
 	);
 }
 
 
 /* Command line overwriteable variables. */
 char *hubName;			/* Name of hub machine, may be NULL. */
 int umaskVal = 0002;		/* File creation mask. */
 int maxProcs = 1;		/* Number of processers allowed to use. */
 char *userPath = "";		/* User stuff to add to path. */
 char *sysPath = "";		/* System stuff to add to path. */
 struct slName *envExtra = NULL; /* Add to environment */ 
 int randomDelay = 5000;		/* How much to delay job startup. */
 
 /* Other globals. */
 char *hostName;			/* Name of this machine. */
 in_addr_t hubIp;		/* Hub IP address. */
 in_addr_t localIp;		/* localhost IP address. */
 int busyProcs = 0;		/* Number of processers in use. */
 struct rudp *mainRudp;		/* Rudp wrapper around main socket. */ 
 struct paraMessage pmIn;	/* Input message */
 double ticksToHundreths;	/* Conversion factor from system ticks
                                  * to 100ths of second. */
 
 struct job
 /* Info on one job in this node. */
     {
     struct job *next;	/* Next job. */
     int jobId;		/* Job ID for hub. */
     int pid;		/* Process ID of running job. */
     char *startMessage;	/* Full message that started this job. */
     char *doneMessage;  /* Full message that ended this job if any. */
     struct dlNode *node; /* Node for list this is on. */
     };
 struct dlList *jobsRunning;		/* List of currently running jobs. */
 struct dlList *jobsFinished;	/* List of recent finished jobs. */
 
 struct job *findJobOnList(struct dlList *list, int jobId)
 /* Return job if it's on list, otherwise NULL */
 {
 struct dlNode *node;
 struct job *job;
 for (node = list->head; !dlEnd(node); node = node->next)
     {
     job = node->val;
     if (job->jobId == jobId)
         return job;
     }
 return NULL;
 }
 
 struct job *findRunningJob(int jobId)
 /* Return job if it's on running list, otherwise NULL */
 {
 return findJobOnList(jobsRunning, jobId);
 }
 
 struct job *findFinishedJob(int jobId)
 /* Return recently finished job or NULL if it doesn't exist */
 {
 return findJobOnList(jobsFinished, jobId);
 }
 
 extern char **environ;	/* The environment strings. */
 
 char **hashToEnviron(struct hash *hash)
 /* Create an environ formatted string from hash. */
 {
 struct dyString *dy = newDyString(512);
 struct hashEl *list = hashElListHash(hash), *el;
 char **newEnv;
 int envCount = slCount(list), i;
 AllocArray(newEnv, envCount+1);
 for (i=0, el=list; i<envCount; ++i, el = el->next)
     {
     dyStringClear(dy);
     dyStringAppend(dy, el->name);
     dyStringAppend(dy, "=");
     dyStringAppend(dy, el->val);
     newEnv[i] = cloneString(dy->string);
     }
 freeDyString(&dy);
 hashElFreeList(&list);
 return newEnv;
 }
 
 struct hash *environToHash(char **env)
 /* Put environment into hash. */
 {
 struct hash *hash = newHash(7);
 char *name, *val, *s;
 while ((s = *env++) != NULL)
     {
     name = cloneString(s);
     val = strchr(name, '=');
     if (val != NULL)
         {
 	*val++ = 0;
 	hashAdd(hash, name, cloneString(val));
 	}
     freez(&name);
     }
 return hash;
 }
 
 unsigned randomNumber;	/* Last random number generated. */
 
 void initRandom()
 /* Initialize random number generator */
 {
 /* Set up random number generator with seed depending on host name. */
 unsigned seed = hashCrc(hostName)&0xfffff;
 srand(seed);
 }
 
 void nextRandom()
 /* Generate next random number.  This is in
  * executed main thread in response to any message
  * rather than in child threads. */
 {
 randomNumber = rand();
 }
 
 void randomSleep()
 /* Put in a random sleep time of 0-5 seconds to help
  * prevent overloading hub with jobDone messages all
  * at once, and file server with file open requests
  * all at once. */
 {
 if (randomDelay > 0)
     {
     int sleepTime = randomNumber%randomDelay;
     sleep1000(sleepTime);
     }
 }
 
 
 void hashUpdate(struct hash *hash, char *name, char *val)
 /* Update hash with name/val pair.   If name not in hash
  * already, put it in hash, otherwise free up old value
  * associated with name and put new value in it's place. */
 {
 struct hashEl *hel = hashLookup(hash, name);
 val = cloneString(val);
 if (hel == NULL)
     hashAdd(hash, name, val);
 else
     {
     freez(&hel->val);
     hel->val = val;
     }
 }
 
 void changeUid(char *name, char **retDir)
 /* Try and change process user (and groups) id to that of name. if 
  * we are root.  Get home dir no matter what. */
 {
 struct passwd *pw = getpwnam(name);
 if (pw == NULL)
     errnoAbort("can't obtain user passwd entry for user %s", name);
 if (geteuid() == 0)
     {
     if (initgroups(name, pw->pw_gid) < 0)
         errnoAbort("initgroups failed");
     if (setgid(pw->pw_gid) < 0)
         errnoAbort("setgid failed");
     if (setuid(pw->pw_uid) < 0)
         errnoAbort("setuid failed");
     }
 if (retDir != NULL)
     *retDir = pw->pw_dir;
 }
 
 static int grandChildId = 0;
 
 void termHandler()
 /* Handle termination signal. */
 {
 if (grandChildId != 0)
     {
     kill(-grandChildId, SIGTERM);
     grandChildId = 0;
     // sleep(3);
     // kill(-grandChildId, SIGKILL);
     }
 }
 
 void updatePath(struct hash *hash, char *userPath, 
 	char *homeDir, char *sysPath)
 /* Prepend userPath and system path to existing path. 
  * Add homeDir in front of all elements of user path. */
 {
 struct dyString *dy = newDyString(1024);
 char *s, *e;
 char *oldPath;
 
 /* Go through user path - which is colon separated
  * and prepend homeDir to it. */
 userPath = cloneString(userPath);
 s = userPath;
 for (;;)
     {
     if (s == NULL || s[0] == 0)
         break;
     e = strchr(s, ':');
     if (e != NULL)
        *e++ = 0;
     dyStringAppend(dy, homeDir);
     dyStringAppend(dy, "/");
     dyStringAppend(dy, s);
     dyStringAppend(dy, ":");
     s = e;
     }
 
 /* Add system path next. */
 if (sysPath != NULL && sysPath[0] != 0)
     {
     dyStringAppend(dy, sysPath);
     if (lastChar(sysPath) != ':')
 	dyStringAppend(dy, ":");
     }
 
 /* Add paths we inherited from root. */
 oldPath = hashFindVal(hash, "PATH");
 if (oldPath == NULL || oldPath[0] == 0)
     oldPath = "/bin:/usr/bin";
 dyStringAppend(dy, oldPath);
 
 hashUpdate(hash, "PATH", dy->string);
 freez(&userPath);
 dyStringFree(&dy);
 }
 
 void addEnvExtra(struct hash *hash, char *nameVal)
 /* parse and add one of the environment extra entries */
 {
 char *eq = strchr(nameVal, '=');
 if (eq == NULL)
     errAbort("invalid -env argument, expected -env=name=value, got -env=%s", nameVal);
 *eq = '\0';
 hashUpdate(hash, nameVal, eq+1);
 *eq = '=';
 }
 
 void addEnvExtras(struct hash *hash)
 /* add environment extras */
 {
 struct slName *nameVal;
 for (nameVal = envExtra; nameVal != NULL; nameVal = nameVal->next)
     addEnvExtra(hash, nameVal->name);
 }
 
 void getTicksToHundreths()
 /* Return number of hundreths of seconds per system tick.
  * It used to be CLK_TCK would work for this, but
  * under recent Linux's it doesn't. */
 {
 #ifdef CLK_TCK
     ticksToHundreths = 100.0/CLK_TCK;
 #else
     ticksToHundreths = 100.0/sysconf(_SC_CLK_TCK);
 #endif /* CLK_TCK */
 }
 
 void setupProcStdio(char *in, char *out, char *err)
 /* setup stdio for process (after fork) */
 {
 int newStdin, newStdout, newStderr;
 
 close(mainRudp->socket);
 
 /* do stderr first */
 newStderr = open(err, O_WRONLY | O_CREAT, 0666);
 if (newStderr < 0)
     errnoAbort("can't open job stderr file %s", err);
 if (dup2(newStderr, STDERR_FILENO) < 0)
     errnoAbort("can't dup2 stderr");
 
 newStdin = open(in, O_RDONLY);
 if (newStdin < 0)
     errnoAbort("can't open job stdin file %s", in);
 if (dup2(newStdin, STDIN_FILENO) < 0)
     errnoAbort("can't dup2 stdin");
 
 newStdout = open(out, O_WRONLY | O_CREAT, 0666);
 if (newStdout < 0)
     errnoAbort("can't open job stdout file %s", out);
 if (dup2(newStdout, STDOUT_FILENO) < 0)
     errnoAbort("can't dup2 stdout");
 }
 
 void execProc(char *managingHost, char *jobIdString, char *reserved,
 	char *user, char *dir, char *in, char *out, char *err, long long memLimit,
 	char *exe, char **params)
 /* This routine is the child process of doExec.
  * It spawns a grandchild that actually does the
  * work and waits on it.  It sends message to the
  * main message loop here when done. */
 {
 if ((grandChildId = forkOrDie()) == 0)
     {
     // use rlimit to limit RAM available to process so it will not hog the machine unfairly.
     if (sizeof(long) == 4 && memLimit > 4294967295LL)  // 4GB is max value on 32-bit platform.
 	{
 	logWarn("memLimit %lld exceeds address space on 32-bit machine, treating it as 4GB limit", memLimit);
 	memLimit = 4294967295LL;
 	}
     struct rlimit rlim;
     rlim.rlim_cur = rlim.rlim_max = memLimit;
     if (setrlimit(RLIMIT_DATA, &rlim) < 0)
 	logWarn("setrlimit failed with RLIMIT_DATA rlim_cur=%lld rlim_max=%lld"
 	    , (long long) rlim.rlim_cur , (long long) rlim.rlim_max); 
     // although RLIMIT_AS is not supported/enforced on all platforms,
     // it is useful for linux and some other unix OSes. 
     if (setrlimit(RLIMIT_AS, &rlim) < 0)
 	logWarn("setrlimit failed with RLIMIT_AS rlim_cur=%lld rlim_max=%lld"
 	    , (long long) rlim.rlim_cur , (long long) rlim.rlim_max); 
 
     if (0) { // change to 1 for debugging
     logInfo("memLimit=%lld", memLimit);
     struct rlimit rlim; 
     int rv; 
     rv = getrlimit(RLIMIT_DATA,&rlim); 
     if ( rv == -1 ) 
 	logWarn("error getrlimit RLIMIT_DATA %s", strerror(errno)); 
     else 
     	logInfo("rlimit_data:%lu,%lu\n", (unsigned long) rlim.rlim_max, (unsigned long) rlim.rlim_cur); 
     rv = getrlimit(RLIMIT_AS,&rlim); 
     if ( rv == -1 ) 
 	logWarn("error getrlimit RLIMIT_AS %s", strerror(errno)); 
     else 
     	logInfo("rlimit_as:%lu,%lu\n", (unsigned long) rlim.rlim_max, (unsigned long) rlim.rlim_cur); 
     }
 
 
     char *homeDir = "";
 
     /* Change to given user (if root) */
     changeUid(user, &homeDir);
 
     /* create output files just after becoming user so that errors in the rest
      * of this proc will go to the err file and be available via para
      * problems */
     setupProcStdio(in, out, err);
 
     if (chdir(dir) < 0)
         errnoAbort("can't chdir to %s", dir);
     setsid();
     // setpgid(0,0);
     umask(umaskVal); 
 
 
     /* Update environment. */
         {
 	struct hash *hash = environToHash(environ);
 	hashUpdate(hash, "JOB_ID", jobIdString);
 	hashUpdate(hash, "USER", user);
 	hashUpdate(hash, "HOME", homeDir);
 	hashUpdate(hash, "HOST", hostName);
 	hashUpdate(hash, "PARASOL", "7");
 	updatePath(hash, userPath, homeDir, sysPath);
         addEnvExtras(hash);
 	environ = hashToEnviron(hash);
 	freeHashAndVals(&hash);
 	}
 
     randomSleep();	/* Sleep a random bit before executing this thing
                          * to help spread out i/o when a big batch of jobs
 			 * hit idle cluster */
     execvp(exe, params);
     errnoAbort("execvp'ing %s", exe);
     }
 else
     {
     /* Wait on executed job and send jobID and status back to 
      * main process. */
     int status = -1;
     int cid;
     struct paraMessage pm;
     struct rudp *ru = NULL;
     struct tms tms;
     unsigned long uTime = 0;
     unsigned long sTime = 0;
 
     if (grandChildId >= 0)
 	{
 	signal(SIGTERM, termHandler);
 	cid = waitpid(grandChildId, &status, 0);
         if (cid < 0)
             errnoAbort("wait on grandchild failed");
 	times(&tms);
 	uTime = ticksToHundreths*tms.tms_cutime;
 	sTime = ticksToHundreths*tms.tms_cstime;
 	}
     ru = rudpOpen();
     if (ru != NULL)
 	{
 	ru->maxRetries = 20;
 	pmInit(&pm, localIp, paraNodePort);
 	pmPrintf(&pm, "jobDone %s %s %d %lu %lu", managingHost, 
 	    jobIdString, status, uTime, sTime);
 	pmSend(&pm, ru);
 	rudpClose(&ru);
 	}
     }
 }
 
 void clearZombies()
 /* Clear any zombie processes */
 {
 int stat;
 for (;;)
     {
     if (waitpid(-1, &stat, WNOHANG) <= 0)
         break;
     }
 }
 
 in_addr_t lookupIp(char *host)
 /* Return IP address of host. */
 {
 static char *lastHost = NULL;
 static in_addr_t lastAddress;
 
 if (lastHost != NULL && sameString(lastHost, host))
     return lastAddress;
 freez(&lastHost);
 lastHost = cloneString(host);
 lastAddress = internetHostIp(host);
 return lastAddress;
 }
 
 
 void tellManagerJobIsDone(char *managingHost, char *jobIdString, char *line)
 /* Try and send message to host saying job is done. */
 {
 struct paraMessage pm;
 bits32 ip;
 if (!internetDottedQuadToIp(managingHost, &ip))
     {
     warn("%s doesn't seem to be in dotted quad form\n", managingHost);
     return;
     }
 pmInit(&pm, ip, paraHubPort);
 pmPrintf(&pm, "jobDone %s %s", jobIdString, line);
 if (!pmSend(&pm, mainRudp))
     warn("Couldn't send message to %s to say %s is done\n", managingHost, jobIdString);
 }
 
 void jobFree(struct job **pJob)
 /* Free up memory associated with job */
 {
 struct job *job = *pJob;
 if (job != NULL)
     {
     freeMem(job->startMessage);
     freeMem(job->doneMessage);
     freeMem(job->node);
     freez(pJob);
     }
 }
 
 void jobDone(char *line)
 /* Handle job-done message - forward it to managing host. */
 {
 char *managingHost = nextWord(&line);
 char *jobIdString = nextWord(&line);
 
 // clearZombies();
 if (jobIdString != NULL && line != NULL && line[0] != 0)
     {
     /* Remove job from list running list and put on recently finished list. */
     struct job *job = findRunningJob(atoi(jobIdString));
     if (job != NULL)
         {
 	int status, err;
 	err = waitpid(job->pid, &status, 0);
 	if (err == -1 || !WIFEXITED(status) || WEXITSTATUS(status) != 0)
 	    {
 	    logDebug("paraNode sheparding %s pid %d status %d err %d errno %d", 
                      jobIdString, job->pid, status, err, errno);
 	    }
 	job->doneMessage = cloneString(line);
 	dlRemove(job->node);
 	if (dlCount(jobsFinished) >= 4*maxProcs)
 	    {
 	    struct dlNode *node = dlPopTail(jobsFinished);
 	    struct job *oldJob = node->val;
 	    jobFree(&oldJob);
 	    }
 	dlAddHead(jobsFinished, job->node);
 	--busyProcs;
 	}
     tellManagerJobIsDone(managingHost, jobIdString, line);
     }
 }
 
 void doCheck(char *line, struct sockaddr_in *hubIp)
 /* Send back check result - either a check in message or
  * jobDone. */
 {
 char *jobIdString = nextWord(&line);
 if (jobIdString != NULL)
     {
     int jobId = atoi(jobIdString);
     struct job *job = findRunningJob(jobId);
     struct paraMessage pm;
     pmInit(&pm, ntohl(hubIp->sin_addr.s_addr), paraHubPort);
     if (job != NULL)
 	pmPrintf(&pm, "checkIn %s %s running", hostName, jobIdString);
     else
 	{
 	struct job *job = findFinishedJob(jobId);
 	if (job == NULL)
 	    pmPrintf(&pm, "checkIn %s %s free", hostName, jobIdString);
 	else
 	    pmPrintf(&pm, "jobDone %s %s", jobIdString, job->doneMessage);
 	}
     pmSend(&pm, mainRudp);
     }
 }
 
 void doResurrect(char *line, struct sockaddr_in *hubIp)
 /* Send back I'm alive message */
 {
 struct paraMessage pm;
 struct dlNode *node;
 int jobsReported = 0;
 pmInit(&pm, ntohl(hubIp->sin_addr.s_addr), paraHubPort);
 pmPrintf(&pm, "alive %s", hostName);
 for (node = jobsRunning->head; !dlEnd(node); node = node->next)
     {
     struct job *job = node->val;
     pmPrintf(&pm, " %d", job->jobId);
     ++jobsReported;
     }
 for (node = jobsFinished->head; !dlEnd(node); node = node->next)
     {
     struct job *job = node->val;
     if (jobsReported >= maxProcs)
 	break;
     pmPrintf(&pm, " %d", job->jobId);
     ++jobsReported;
     }
 pmSend(&pm, mainRudp);
 }
 
 void doRun(char *line, struct sockaddr_in *hubIp)
 /* Execute command. */
 {
 char *jobMessage = cloneString(line);
 static char *args[1024];
 int argCount;
 char hubDottedQuad[17];
 
 nextRandom();
 if (line == NULL)
     warn("Executing nothing...");
 else if (!internetIpToDottedQuad(ntohl(hubIp->sin_addr.s_addr), hubDottedQuad))
     warn("Can't convert ipToDottedQuad");
 else
     {
     struct runJobMessage rjm;
     if (parseRunJobMessage(line, &rjm))
 	{
 	int jobId = atoi(rjm.jobIdString);
 	if (findRunningJob(jobId) == NULL && findFinishedJob(jobId) == NULL)
 	    {
 	    if (busyProcs < maxProcs)
 		{
 		int childPid;
 		argCount = chopLine(rjm.command, args);
 		if (argCount >= ArraySize(args))
 		    warn("Too many arguments to run");
 		else
 		    {
 		    args[argCount] = NULL;
 		    if ((childPid = forkOrDie()) == 0)
 			{
 			/* Do JOB_ID substitutions */
 			struct subText *st = subTextNew("$JOB_ID", rjm.jobIdString);
 			int i;
 			rjm.in = subTextString(st, rjm.in);
 			rjm.out = subTextString(st, rjm.out);
 			rjm.err = subTextString(st, rjm.err);
 			for (i=0; i<argCount; ++i)
 			    args[i] = subTextString(st, args[i]);
 
 			execProc(hubDottedQuad, rjm.jobIdString, rjm.reserved,
 			    rjm.user, rjm.dir, rjm.in, rjm.out, rjm.err, rjm.ram,
 			    args[0], args);
 			exit(0);
 			}
 		    else
 			{
 			struct job *job;
 			AllocVar(job);
 			job->jobId = atoi(rjm.jobIdString);
 			job->pid = childPid;
 			job->startMessage = jobMessage;
 			jobMessage = NULL;	/* No longer own memory. */
 			job->node = dlAddValTail(jobsRunning, job);
 			++busyProcs;
 			}
 		    }
 		}
 	    else
 		{
 		warn("Trying to run when busy.");
 		}
 	    }
 	else
 	    {
 	    warn("Duplicate run-job %d\n", jobId);
 	    }
 	}
     }
 freez(&jobMessage);
 }
 
 void doFetch(char *line)
 /* Fetch first part of file.  Protocol is to send the
  * file one UDP packet at a time.  A zero length packet
  * indicates end of file. */
 {
 char *user = cloneString(nextWord(&line));
 char *fileName = cloneString(nextWord(&line));
 if ((user == NULL) || (fileName != NULL))
     {
     FILE *f = fopen(fileName, "r");
     pmClear(&pmIn);
     if (f == NULL)
 	{
         if (user == NULL)
             user = "<null>";
 	pmPrintf(&pmIn, "Couldn't open fetch file: \"%s\" %s for user %s",
                  fileName, strerror(errno), user);
 	warn("Couldn't open fetch file: \"%s\" %s for user %s",
              fileName, strerror(errno), user);
 	pmSend(&pmIn, mainRudp);
 	pmClear(&pmIn);
 	pmSend(&pmIn, mainRudp);
 	}
     else
 	{
 	int size;
 	for (;;)
 	    {
 	    size = fread(pmIn.data,1,  sizeof(pmIn.data)-1, f);
 	    if (size < 0)
 		{
 		size = 0;
 		warn("Couldn't read fetch file: \"%s\" %s",
                      fileName, strerror(errno));
 		}
 	    pmIn.size = size;
 	    pmSend(&pmIn, mainRudp);
 	    if (size == 0)
 		break;
 	    }
 	fclose(f);
 	}
     }
 freez(&user);
 freez(&fileName);
 }
 
 void doKill(char *line)
 /* Kill a specific job. */
 {
 char *jobIdString = nextWord(&line);
 int jobId = atoi(jobIdString);
 if (jobId != 0)
     {
     struct job *job = findRunningJob(jobId);
     if (job != NULL)
         {
 	kill(job->pid, SIGTERM);
 	dlRemove(job->node);
 	jobFree(&job);
 	--busyProcs;
 	}
     }
 else
     {
     warn("Nothing to kill\n");
     }
 }
 
 
 void doStatus()
 /* Report status. */
 {
 pmClear(&pmIn);
 pmPrintf(&pmIn, "%d of %d CPUs busy", busyProcs, maxProcs);
 if (busyProcs > 0)
     {
     struct dlNode *node;
     pmPrintf(&pmIn, ". Jobs:");
     for (node = jobsRunning->head; !dlEnd(node); node = node->next)
         {
 	struct job *job = node->val;
         pmPrintf(&pmIn, " %d", job->jobId);
 	}
     }
 pmSend(&pmIn, mainRudp);
 }
 
 void listJobs()
 /* Report jobs running and recently finished. */
 {
 struct job *job;
 struct dlNode *node;
 
 pmClear(&pmIn);
 pmPrintf(&pmIn, "%d running", dlCount(jobsRunning));
 if (!pmSend(&pmIn, mainRudp))
     return;
 for (node = jobsRunning->head; !dlEnd(node); node=node->next)
     {
     job = node->val;
     pmClear(&pmIn);
     pmPrintf(&pmIn, "%s", job->startMessage);
     if (!pmSend(&pmIn, mainRudp))
         return;
     }
 pmClear(&pmIn);
 pmPrintf(&pmIn, "%d recent", dlCount(jobsFinished));
 if (!pmSend(&pmIn, mainRudp))
     return;
 for (node = jobsFinished->head; !dlEnd(node); node=node->next)
     {
     job = node->val;
     pmClear(&pmIn);
     pmPrintf(&pmIn, "%s", job->startMessage);
     if (!pmSend(&pmIn, mainRudp))
         return;
     pmClear(&pmIn);
     pmPrintf(&pmIn, "%s", job->doneMessage);
     if (!pmSend(&pmIn, mainRudp))
         return;
     }
 }
 
 void paraNode()
 /* paraNode - a net server. */
 {
 char *line;
 char *command;
 struct sockaddr_in sai;
 
 /* We have to know who we are... */
 hostName = getMachine();
 initRandom();
 getTicksToHundreths();
 
 /* log init */
 if (optionExists("log"))
     logOpenFile("paraNode", optionVal("log", NULL));
 else    
     logOpenSyslog("paraNode", optionVal("logFacility", NULL));
 logSetMinPriority(optionVal("logMinPriority", "info"));
 logInfo("starting paraNode on %s", hostName);
 
 /* Make job lists. */
 jobsRunning = newDlList();
 jobsFinished = newDlList();
 
 /* Set up socket and self to listen to it. */
 ZeroVar(&sai);
 sai.sin_family = AF_INET;
 sai.sin_port = htons(paraNodePort);
 sai.sin_addr.s_addr = INADDR_ANY;
 mainRudp = rudpMustOpenBound(&sai);
 mainRudp->maxRetries = 12;
 
 /* Event loop. */
 findNow();
 for (;;)
     {
     /* Get next incoming message and optionally check to make
      * sure that it's from a host we trust, and check signature
      * on first bit of incoming data. */
     if (pmReceive(&pmIn, mainRudp))
 	{
 	findNow();
 	if (hubName == NULL || ntohl(pmIn.ipAddress.sin_addr.s_addr) == hubIp 
 		|| ntohl(pmIn.ipAddress.sin_addr.s_addr) == localIp)
 	    {
 	    /* Host and signature look ok,  read a string and
 	     * parse out first word as command. */
 	    line = pmIn.data;
 	    logDebug("message from %s: \"%s\"",
                      paraFormatIp(ntohl(pmIn.ipAddress.sin_addr.s_addr)),
                      line);
 	    command = nextWord(&line);
 	    if (command != NULL)
 		{
 		if (sameString("quit", command))
 		    break;
 		else if (sameString("run", command))
 		    doRun(line, &pmIn.ipAddress);
 		else if (sameString("jobDone", command))
 		    jobDone(line);
 		else if (sameString("status", command))
 		    doStatus();
 		else if (sameString("kill", command))
 		    doKill(line);
 		else if (sameString("check", command))
 		    doCheck(line, &pmIn.ipAddress);
 		else if (sameString("resurrect", command))
 		    doResurrect(line, &pmIn.ipAddress);
 		else if (sameString("listJobs", command))
 		    listJobs();
 		else if (sameString("fetch", command))
 		    doFetch(line);
                 else
                     logWarn("invalid command: \"%s\"", command);
 		}
 	    logDebug("done command");
 	    }
 	else
 	    {
 	    logWarn("command from unauthorized host %s",
                     paraFormatIp(ntohl(pmIn.ipAddress.sin_addr.s_addr)));
 	    }
 	}
     }
 rudpClose(&mainRudp);
 }
 
 int main(int argc, char *argv[])
 /* Process command line. */
 {
 optionInit(&argc, argv, optionSpecs);
 if (argc != 2)
     usage();
 maxProcs = optionInt("cpu", 1);
 umaskVal = optionInt("umask", 0002);
 userPath = optionVal("userPath", userPath);
 sysPath = optionVal("sysPath", sysPath);
 envExtra = optionMultiVal("env", NULL);
 randomDelay = optionInt("randomDelay", randomDelay);
 
 /* Look up IP addresses. */
 localIp = lookupIp("localhost");
 hubName = optionVal("hub", NULL);
 if (hubName != NULL)
     hubIp = lookupIp(hubName);
 paraDaemonize("paraNode");
 paraNode();
 return 0;
 }