6e5ee11ca95cd971984038cf65bae00d9c898707
galt
  Wed Jun 4 15:40:02 2014 -0700
Since we have git, it is easy to rename errabort.c to errAbort.c without losing any history.
diff --git src/parasol/paraHub/paraHub.c src/parasol/paraHub/paraHub.c
index 58c3002..39071cb 100644
--- src/parasol/paraHub/paraHub.c
+++ src/parasol/paraHub/paraHub.c
@@ -1,3489 +1,3489 @@
 /* paraHub - Parasol hub server.  This is the heart of the parasol system
  * and consists of several threads - sucketSucker, heartbeat, a collection
  * of spokes, as well as the main hub thread.  The system is synchronized
  * around a message queue that the hub reads and the other threads write.
  *
  * The purpose of socketSucker is to move messages from the UDP
  * socket, which has a limited queue size, to the message queue, which
  * can be much larger.  The spoke daemons exist to send messages to compute
  * nodes.  Since sending a message to a node can take a while depending on
  * the network conditions, the multiple spokes allow the system to be
  * delivering messages to multiple nodes simultaniously.  The heartbeat
  * daemon simply sits in a loop adding a heartbeat message to the message
  * queue every 15 seconds or so. The hub thead is responsible for
  * keeping track of everything. 
  * 
  * The hub keeps track of users, batches, jobs, and machines.  It tries
  * to balance machine usage between users and between batches.  If a machine
  * goes down it will restart the jobs the machine was running on other machines.
  * When a job finishes it will add a line about the job to the results file
  * associated with the batch.
  *
  * A fair bit of the hub's code is devoted to scheduling.  It does this by
  * periodically "planning" what batches to associate with what machines.
  * When a machine is free it will run the next job from one of it's batches.
  * A number of events including a new batch of jobs, machines being added or
  * removed, and so forth can make the system decide it needs to replan.  The
  * replanning itself is done in the next heartbeat.
  *
  * When the plan is in place, the most common thing the system does is
  * try to run the next job.  It keeps lists of free machines and free spokes,
  * and for the most part just just takes the next machine, a job from one
  * of the batches the machine is running, and the next free spoke, and sends
  * a message to the machine via the spoke to run the job. This
  * indirection of starting jobs via a separate spoke process avoids the
  * hub daemon itself having to wait for a response from a compute node
  * over the network.
  *
  * When a spoke is done assigning a job, the spoke sends a 'recycleSpoke'
  * message to the hub, which puts the spoke back on the freeSpoke list.
  * Likewise when a job is done the machine running the jobs sends a 
  * 'job done' message to the hub, which puts the machine back on the
  * free list,  writes the job exit code to a file, and removes the job
  * from the system.
  *
  * Sometimes a spoke will find that a machine is down.  In this case it
  * sends a 'node down' message to the hub as well as the 'spoke free'
  * message.   The hub will then move the machine to the deadMachines list,
  * and put the job back on the top of the pending list.
  *
  * The heartbeat messages stimulate the hub to do various background
  * chores.  When the hub gets a heartbeat message it
  * does a few things:
  *     o - It calls runner to try and start some more jobs.  (Runner
  *         is also called at the end of processing a recycleSpoke, 
  *         jobDone, addJob or addMachine message.  Typically runner
  *         won't find anything new to run in the heartbeat, but this
  *         is put here mostly just in case of unforseen issues.)
  *    o -  It calls graveDigger, a routine which sees if machines
  *         on the dead list have come back to life.
  *    o -  It calls hangman, a routine which sees if jobs the system
  *         thinks have been running for a long time are still 
  *         running on the machine they have been assigned to.
  *         If the machine has gone down it is moved to the dead list
  *         and the job is reassigned. 
  */
 
 #include "paraCommon.h"
 #include "options.h"
 #include "linefile.h"
 #include "hash.h"
-#include "errabort.h"
+#include "errAbort.h"
 #include "dystring.h"
 #include "dlist.h"
 #include "net.h"
 #include "internet.h"
 #include "paraHub.h"
 #include "machSpec.h"
 #include "log.h"
 #include "obscure.h"
 #include "sqlNum.h"
 
 
 /* command line option specifications */
 static struct optionSpec optionSpecs[] = {
     {"spokes", OPTION_INT},
     {"jobCheckPeriod", OPTION_INT},
     {"machineCheckPeriod", OPTION_INT},
     {"subnet", OPTION_STRING},
     {"nextJobId", OPTION_INT},
     {"logFacility", OPTION_STRING},
     {"logMinPriority", OPTION_STRING},
     {"log", OPTION_STRING},
     {"debug", OPTION_BOOLEAN},
     {"noResume", OPTION_BOOLEAN},
     {"ramUnit", OPTION_STRING},
     {"defaultJobRam", OPTION_INT},
     {NULL, 0}
 };
 
 char *version = PARA_VERSION;	/* Version number. */
 
 /* Some command-line configurable quantities and their defaults. */
 int jobCheckPeriod = 10;      /* Minutes between checking running jobs. */
 int machineCheckPeriod = 20;  /* Minutes between checking dead machines. */
 int assumeDeadPeriod = 60;    /* If haven't heard from job in this long assume
                                  * machine running it is dead. */
 int initialSpokes = 30;		/* Number of spokes to start with. */
 unsigned char hubSubnet[4] = {255,255,255,255};   /* Subnet to check. */
 int nextJobId = 0;		/* Next free job id. */
 time_t startupTime;		/* Clock tick of paraHub startup. */
 
 /* not yet configurable */
 int sickNodeThreshold = 3;          /* Treat node as sick if this number of failures */
 int sickBatchThreshold = 25;        /* Auto-chill sick batch if this number of continuous failures */
 
 
 
 void usage()
 /* Explain usage and exit. */
 {
 errAbort("paraHub - parasol hub server version %s\n"
          "usage:\n"
 	 "    paraHub machineList\n"
 	 "Where machine list is a file with the following columns:\n"
          "    name - Network name\n"
          "    cpus - Number of CPUs we can use\n"
          "    ramSize - Megabytes of memory\n"
          "    tempDir - Location of (local) temp dir\n"
          "    localDir - Location of local data dir\n"
          "    localSize - Megabytes of local disk\n"
          "    switchName - Name of switch this is on\n"
 	 "\n"
 	 "options:\n"
 	 "   -spokes=N  Number of processes that feed jobs to nodes - default %d.\n"
 	 "   -jobCheckPeriod=N  Minutes between checking on job - default %d.\n"
 	 "   -machineCheckPeriod=N  Minutes between checking on machine - default %d.\n"
 	 "   -subnet=XXX.YYY.ZZZ  Only accept connections from subnet (example 192.168).\n"
 	 "   -nextJobId=N  Starting job ID number.\n"
 	 "   -logFacility=facility  Log to the specified syslog facility - default local0.\n"
          "   -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n"
          "    defaults to \"warn\"\n"
          "   -log=file  Log to file instead of syslog.\n"
          "   -debug  Don't daemonize\n"
 	 "   -noResume  Don't try to reconnect with jobs running on nodes.\n"
          "   -ramUnit=N  Number of bytes of RAM in the base unit used by the jobs.\n"
          "      Default is RAM on node divided by number of cpus on node.\n"
          "      Shorthand expressions allow t,g,m,k for tera, giga, mega, kilo.\n"
          "      e.g. 4g = 4 Gigabytes.\n"
 	 "   -defaultJobRam=N Number of ram units in a job has no specified ram usage.\n"
 	 "      Defaults to 1.\n"
 	               ,
 	 version, initialSpokes, jobCheckPeriod, machineCheckPeriod
 	 );
 }
 
 struct spoke *spokeList;	/* List of all spokes. */
 struct dlList *freeSpokes;      /* List of free spokes. */
 struct dlList *busySpokes;	/* List of busy spokes. */
 struct dlList *deadSpokes;	/* List of dead spokes. */
 
 struct machine *machineList;    /* List of all machines. */
 struct dlList *freeMachines;    /* List of machines idle. */
 struct dlList *readyMachines;   /* List of machines ready for jobs. */
 struct dlList *blockedMachines; /* List of machines ready but blocked by runningCount. */
 struct dlList *busyMachines;    /* List of machines running jobs. */
 struct dlList *deadMachines;    /* List of machines that aren't running. */
 
 struct dlList *runningJobs;     /* Jobs that are running. Preserves oldest first order. */
 struct dlList *hangJobs;        /* Jobs running hang check list. */
 
 struct hash *userHash;		/* Hash of all users. */
 struct user *userList;		/* List of all users. */
 struct batch *batchList;	/* List of all batches. */
 struct dlList *queuedUsers;	/* Users with jobs in queue. */
 struct dlList *unqueuedUsers;   /* Users with no jobs in queue. */
 
 struct hash *machineHash;	/* Find if machine exists already */
 
 struct hash *stringHash;	/* Unique strings throughout system go here
                                  * including directory names and results file
 				 * names/batch names. */
 
 struct resultQueue *resultQueues; /* Result files. */
 int finishedJobCount = 0;		/* Number of finished jobs. */
 int crashedJobCount = 0;		/* Number of crashed jobs. */
 
 char *jobIdFileName = "parasol.jid";	/* File name where jobId file is. */
 FILE *jobIdFile = NULL;			/* Handle to jobId file. */
 
 char *hubHost;	/* Name of machine running this. */
 struct rudp *rudpOut;	/* Our rUDP socket. */
 
 
 /* Variables for new scheduler */
 
 // TODO make commandline param options to override defaults for unit sizes?
 /*  using machines list spec info for defaults */
 int cpuUnit = 1;                   /* 1 CPU */  /* someday this could be float 0.5 */
 long long ramUnit = 512 * 1024 * 1024;  /* 500 MB */
 int defaultJobCpu = 1;        /* number of cpuUnits in default job usage */  
 int defaultJobRam = 1;        /* number of ramUnits in default job usage */
 /* for the resource array dimensions */
 int maxCpuInCluster = 0;      /* node with largest number of cpu units */
 int maxRamInCluster = 0;      /* node with largest number of ram units */
 struct slRef ***perCpu = NULL;  /* an array of resources sharing the same cpu units free units count */
 boolean needsPlanning = FALSE;  /* remember if situation changed, need new plan */  
 
 
 void setupLists()
 /* Make up machine, spoke, user and job lists - all doubly linked
  * so it is fast to remove items from one list and put them
  * on another. */
 {
 freeMachines = newDlList();
 readyMachines = newDlList();
 blockedMachines = newDlList();
 busyMachines = newDlList();
 deadMachines = newDlList();
 runningJobs = newDlList();
 hangJobs = newDlList();
 freeSpokes = newDlList();
 busySpokes = newDlList();
 deadSpokes = newDlList();
 queuedUsers = newDlList();
 unqueuedUsers = newDlList();
 userHash = newHash(6);
 }
 
 int avgBatchTime(struct batch *batch)
 /**/
 {
 if (batch->doneCount == 0) return 0;
 return batch->doneTime / batch->doneCount;
 }
 
 boolean nodeSickOnAllBatches(struct user *user, char *machineName)
 /* Return true if all of a user's current batches believe the machine is sick. */
 {
 struct dlNode *node = user->curBatches->head; 
 if (dlEnd(node))
     return FALSE;
 for (; !dlEnd(node); node = node->next)
     {
     struct batch *batch = node->val;
     /* does any batch think the node is not sick? */
     if (hashIntValDefault(batch->sickNodes, machineName, 0) < sickNodeThreshold)
 	{
 	return FALSE;
 	}
     }
 return TRUE;
 }
 
 void updateUserSickNode(struct user *user, char *machineName)
 /* If all of a users batches reject a sick machine, then the user rejects it. */
 {
 boolean allSick = nodeSickOnAllBatches(user, machineName);
 if (allSick)
     hashStore(user->sickNodes, machineName);
 else
     hashRemove(user->sickNodes, machineName);
 }
 
 void updateUserSickNodes(struct user *user)
 /* Update user sickNodes. A node is only sick if all batches call it sick. */
 {
 struct dlNode *node;
 struct batch *batch;
 hashFree(&user->sickNodes);
 user->sickNodes = newHashExt(6, FALSE);
 node = user->curBatches->head; 
 if (!dlEnd(node))
     {
     batch = node->val;
     struct hashEl *el, *list = hashElListHash(batch->sickNodes);
     for (el = list; el != NULL; el = el->next)
 	{
 	updateUserSickNode(user, el->name);
 	}
     hashElFreeList(&list);
     }
 }
 
 boolean userIsActive(struct user *user)
 /* Return TRUE if user has jobs running or in queue */
 {
 return user->runningCount > 0 || !dlEmpty(user->curBatches);
 }
 
 
 int listSickNodes(struct paraMessage *pm)
 /* find nodes that are sick for all active users */
 {
 int sickNodeCount = 0, userCount = 0;
 struct user *user;
 if (userList)
     {
     struct hashEl *el, *list = NULL;
     /* get list from an active user if any, and get active-users count */
     for (user = userList; user != NULL; user = user->next)
 	{
 	if (userIsActive(user))
 	    {
 	    ++userCount;
 	    if (!list)
     		list = hashElListHash(user->sickNodes);
 	    }
 	}
     if (list)
 	{    
 	for (el = list; el != NULL; el = el->next)
 	    {
 	    boolean allSick = TRUE;
 	    for (user = userList; user != NULL; user = user->next)
 		{
 		if (userIsActive(user))
 		    if (!hashLookup(user->sickNodes, el->name))
 			allSick = FALSE;
 		}
 	    if (allSick)
 		{
 		++sickNodeCount;
 		if (pm)
 		    {
 		    pmClear(pm);
 		    pmPrintf(pm, "%s", el->name);
 		    pmSend(pm, rudpOut);
 		    }
 		}
 	    }
 	hashElFreeList(&list);
 	}
     }
 if (sickNodeCount > 0)
     {
     if (pm)
 	{
 	pmClear(pm);
 	pmPrintf(pm, "Strength of evidence: %d users", userCount);
 	pmSend(pm, rudpOut);
 	}
     }
 if (pm)
     pmSendString(pm, rudpOut, "");
 return sickNodeCount;
 }
 
 
 
 void updateUserMaxJob(struct user *user)
 /* Update user maxJob. >=0 only if all batches have >=0 maxJob values */
 {
 /* Note - at this point the user->maxJob is mostly ornamental,
  * it has been left in for people who want to see it in list users */
 
 struct dlNode *node;
 struct batch *batch;
 boolean unlimited = FALSE;
 user->maxJob = 0;
 for (node = user->curBatches->head; !dlEnd(node); node = node->next)
     {
     batch = node->val;
     if (batch->maxJob >= 0)
 	user->maxJob += batch->maxJob;
     else
 	unlimited = TRUE;
     }
 if (unlimited) user->maxJob = -1;
 }
 
 void updateUserPriority(struct user *user)
 /* Update user priority. Equals minimum of current batch priorities */
 {
 struct dlNode *node;
 struct batch *batch;
 user->priority = MAX_PRIORITY;
 for (node = user->curBatches->head; !dlEnd(node); node = node->next)
     {
     batch = node->val;
     if (batch->priority < user->priority)
 	user->priority = batch->priority;
     }
 }
 
 struct batch *findBatchInList(struct dlList *list,  char *nameString)
 /* Find a batch of jobs in list or return NULL. 
  * nameString must be from stringHash. */
 {
 struct dlNode *node;
 for (node = list->head; !dlEnd(node); node = node->next)
     {
     struct batch *batch = node->val;
     if (nameString == batch->name)
         return batch;
     }
 return NULL;
 }
 
 struct batch *newBatch(char *nameString, struct user *user)
 /* Make new batch.  NameString must be in stringHash already */
 {
 struct batch *batch;
 AllocVar(batch);
 slAddHead(&batchList, batch);
 AllocVar(batch->node);
 batch->node->val = batch;
 batch->name = nameString;
 batch->user = user;
 batch->jobQueue = newDlList();
 batch->priority = NORMAL_PRIORITY;
 batch->maxJob = -1;
 batch->sickNodes = newHashExt(6, FALSE);
 
 batch->cpu = defaultJobCpu;    /* number of cpuUnits in default job usage */  
 batch->ram = defaultJobRam;    /* number of ramUnits in default job usage */
 
 needsPlanning = TRUE;
 
 return batch;
 }
 
 struct batch *findBatch(struct user *user, char *name, boolean holding)
 /* Find batch of jobs.  If no such batch yet make it. */
 {
 struct batch *batch;
 name = hashStoreName(stringHash, name);
 batch = findBatchInList(user->curBatches, name);
 if (batch == NULL)
     {
     batch = findBatchInList(user->oldBatches, name);
     if (batch != NULL)
 	dlRemove(batch->node);
     else
 	batch = newBatch(name, user);
     if (holding && dlEmpty(batch->jobQueue)) 
         /* setPriority must not release batch if jobs not yet pushed */
     	dlAddTail(user->oldBatches, batch->node);
     else
 	dlAddTail(user->curBatches, batch->node);
 
     needsPlanning = TRUE;
 
     updateUserPriority(user);
     updateUserMaxJob(user);
     updateUserSickNodes(user);
     }
 return batch;
 }
 
 struct user *findUser(char *name)
 /* Find user.  If it's the first time we've seen this
  * user then make up a user object and put it on the
  * idle user list. */
 {
 struct user *user = hashFindVal(userHash, name);
 if (user == NULL)
     {
     AllocVar(user);
     slAddHead(&userList, user);
     hashAddSaveName(userHash, name, user, &user->name);
     AllocVar(user->node);
     user->node->val = user;
     dlAddTail(unqueuedUsers, user->node);
     user->curBatches = newDlList();
     user->oldBatches = newDlList();
     user->sickNodes = newHashExt(6, FALSE);
     }
 return user;
 }
 
 
 int userQueuedCount(struct user *user)
 /* Count up jobs user has waiting */
 {
 struct dlNode *node;
 struct batch *batch;
 int count = 0;
 for (node = user->curBatches->head; !dlEnd(node); node = node->next)
     {
     batch = node->val;
     count += batch->queuedCount;
     }
 return count;
 }
 
 
 struct batch *findLuckyBatch(struct user *user)
 /* Find the batch that gets to run a job. */
 {
 struct batch *minBatch = NULL;
 int minScore = BIGNUM;
 struct dlNode *node;
 for (node = user->curBatches->head; !dlEnd(node); node = node->next)
     {
     struct batch *batch = node->val;
     if (batch->planning)
 	{
 	if (batch->planScore < minScore)
 	    {
 	    minScore = batch->planScore;
 	    minBatch = batch;
 	    }
 	}
     }
 return minBatch;
 }
 
 
 struct user *findLuckyUser()
 /* Find lucky user who gets to run a job. */
 {
 struct user *minUser = NULL;
 int minScore = BIGNUM;
 struct dlNode *node;
 for (node = queuedUsers->head; !dlEnd(node); node = node->next)
     {
     struct user *user = node->val;
     if (user->planningBatchCount > 0)
 	{
 	if (user->planScore < minScore) 
 	    {
 	    minScore = user->planScore;
 	    minUser = user;
 	    }
 	}
     }
 return minUser;
 }
 
 
 void resetBatchesForPlanning(struct user *user)
 /* Initialize batches for given user for planning.*/
 {
 struct dlNode *node;
 for (node = user->curBatches->head; !dlEnd(node); node = node->next)
     {
     struct batch *batch = node->val;
     batch->planning = TRUE;
     batch->planCount = 0;
     /* adding 1 to planCount helps suppress running any jobs when priority is set very high */
     batch->planScore = 1 * batch->priority; 
     if (batch->maxJob == 0)
        batch->planning = FALSE;	
     if (batch->planning)
 	{
 	++user->planningBatchCount;
 	}
     }
 }
 
 
 void resetUsersForPlanning()
 /* Initialize users for planning. */
 {
 struct dlNode *node;
 for (node = queuedUsers->head; !dlEnd(node); node = node->next)
     {
     struct user *user = node->val;
     user->planCount = 0;
     user->planningBatchCount = 0;
     updateUserPriority(user);
     updateUserMaxJob(user);
     updateUserSickNodes(user);
     /* adding 1 to planCount helps suppress running any jobs when priority is set very high */
     user->planScore = 1 * user->priority;  
     resetBatchesForPlanning(user);
     }
 }
 
 
 
 void unactivateBatchIfEmpty(struct batch *batch)
 /* If job queue on batch is empty then remove batch from
  * user's active batch list, and possibly user from active
  * user list. */
 {
 if (dlEmpty(batch->jobQueue))
     {
     struct user *user = batch->user;
     batch->queuedCount = 0;
     dlRemove(batch->node);
     dlAddTail(user->oldBatches, batch->node);
 
     batch->planCount = 0;   /* use as a signal that it's not active any more */
 
     needsPlanning = TRUE;  /* remember if situation changed, need new plan */  
 
     updateUserPriority(user);
     updateUserMaxJob (user);
     updateUserSickNodes(user);
 
     /* Check if it's last user batch and if so take them off queue */
     if (dlEmpty(user->curBatches))
 	{
 	dlRemove(user->node);
 	dlAddTail(unqueuedUsers, user->node);
 	}
     }
 }
 
 
 void readTotalMachineResources(struct machine *machine, int *cpuReturn, int *ramReturn)
 /* Return in units the cpu and ram resources of given machine */
 {
 int c = 0, r = 0;
 c = machine->machSpec->cpus / cpuUnit; 
 r = ((long long)machine->machSpec->ramSize * 1024 * 1024) / ramUnit; 
 *cpuReturn = c;
 *ramReturn = r;
 }
 
 
 void readRemainingMachineResources(struct machine *machine, int *cpuReturn, int *ramReturn)
 /* Calculate available cpu and ram resources in given machine */
 {
 int c = 0, r = 0;
 readTotalMachineResources(machine, &c, &r);
 /* subtract all the resources now in-use */
 struct dlNode *jobNode = NULL;
 for (jobNode = machine->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next)
     {
     struct job *job = jobNode->val;
     struct batch * batch =job->batch;
     c -= batch->cpu;
     r -= batch->ram;
     }
 *cpuReturn = c;
 *ramReturn = r;
 }
 
 struct batch *findRunnableBatch(struct machine *machine, struct slRef **pEl, boolean *pCouldRun)
 /* Search machine for runnable batch, preferable something not at maxJob */
 {
 int c = 0, r = 0;
 readRemainingMachineResources(machine, &c, &r);
 struct slRef* el;
 for(el = machine->plannedBatches; el; el=el->next)
     {
     struct batch *batch = el->val;
     /* Prevent too many from this batch from running.
      * This is helpful for keeping the balance with longrunning batches
      * and maxJob. */
     if (batch->cpu <= c && batch->ram <= r) 
 	{
 	if (pCouldRun)
 	    *pCouldRun = TRUE;
 	if (batch->runningCount < batch->planCount)
 	    {
 	    if (pEl)
 		*pEl = el;
 	    return batch;
 	    }
 	}
     }
 if (pEl)
     *pEl = NULL;
 return NULL;
 }
 
 int scoreCost(struct batch *batch)
 /* calculate score cost of using resources */
 {
 return max(batch->cpu * defaultJobRam, batch->ram * defaultJobCpu);
 }
 
 void allocateResourcesToMachine(struct machine *mach, 
     struct batch *batch, struct user *user, int *pC, int *pR)
 /* Allocate Resources to machine*/
 {
 
 *pC -= batch->cpu;
 *pR -= batch->ram;
 
 ++batch->planCount;
 ++user->planCount;
 /* incrementally update score for batches and users */
 /* scoring that accounts the resources carefully, e.g. actual ram and cpu. */
 int cost = scoreCost(batch);
 batch->planScore += cost * batch->priority;
 user->planScore += cost * user->priority;
 
 /*  add batch to plannedBatches queue */
 refAdd(&mach->plannedBatches, batch);
 
 /* maxJob handling */
 if ((batch->maxJob!=-1) && (batch->planCount >= batch->maxJob))
     {
     /* remove batch from the allocating */
     batch->planning = FALSE;
     --user->planningBatchCount;
     }
 }
 
 
 void plan(struct paraMessage *pm) 
 /* Make a new plan allocating resources to batches */
 {
 
 logDebug("executing new plan");
 
 if (pm)
     {
     pmClear(pm);
     pmPrintf(pm, "cpuUnit=%d, ramUnit=%lld", cpuUnit, ramUnit); 
     pmSend(pm, rudpOut);
     pmClear(pm);
     pmPrintf(pm, "job default units: Cpu=%d, ram=%d", defaultJobCpu, defaultJobRam); 
     pmSend(pm, rudpOut);
     pmClear(pm);
     pmPrintf(pm, "max cluster units: Cpu=%d, ram=%d", maxCpuInCluster, maxRamInCluster); 
     pmSend(pm, rudpOut);
     pmSendString(pm, rudpOut, "-----"); 
     }
 
 //if (pm) pmSendString(pm, rudpOut, "about to initialize cpu/ram 2d arrays"); 
 
 /* Initialize Resource Arrays for CPU and RAM */
 /* allocate memory like a 2D array */
 int c = 0, r = 0;
 /*  +1 to allow for zero slot simplifies the code */
 AllocArray(perCpu, maxCpuInCluster+1);  
 for (c = 1; c <= maxCpuInCluster; ++c)
   AllocArray(perCpu[c], maxRamInCluster+1);  
 
 //if (pm) pmSendString(pm, rudpOut, "about to add machines resources to cpu/ram arrays");
 
 
 resetUsersForPlanning();
 
 /* allocate machines to resource lists */
 struct machine *mach;
 for (mach = machineList; mach != NULL; mach = mach->next)
      {
      slFreeList(&mach->plannedBatches); // free any from last plan
      if (!mach->isDead)
 	{
 
 	readTotalMachineResources(mach, &c, &r);
 
         /* Sweep mark all running jobs as oldPlan,
 	 *  this helps us deal with jobsDone from old plan.
 	 * For better handling of long-running maxJob batches
          *  with frequent replanning, 
 	 *  preserve the same resources on the same machines.
          */
 	struct dlNode *jobNode = NULL;
 	for (jobNode = mach->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next)
 	    {
 	    struct job *job = jobNode->val;
 	    struct batch *batch = job->batch;
 	    struct user *user = batch->user;
 	    job->oldPlan = TRUE;
 	    if (batch->planning && (batch->maxJob != -1))
 		{
 		if (pm) 
 		    {
 		    //pmClear(pm);
 		    //pmPrintf(pm, "preserving batch %s on machine %s", batch->name, mach->name);
 		    //pmSend(pm, rudpOut);
 		    }
 		allocateResourcesToMachine(mach, batch, user, &c, &r);
 		}
 	    }
 
 	if (pm) 
 	    {
 	    //pmClear(pm);
 	    //pmPrintf(pm, "machSpec (%s) cpus:%d ramSize=%d"
 		//, mach->name, mach->machSpec->cpus, mach->machSpec->ramSize);
 	    //pmSend(pm, rudpOut);
 	    }
      
 
 	if (c < 1 || r < 1)
 	    {
 	    if (pm) 
 		{
 		//pmClear(pm);
 		//pmPrintf(pm, "IGNORING mach: %s c=%d cpu units; r=%d ram units", mach->name, c, r);
 		//pmSend(pm, rudpOut);
 		}
 	    }
 	else
 	    {
 
 	    if (pm) 
 		{
 		pmClear(pm);
 		pmPrintf(pm, "mach: %s c=%d cpu units; r=%d ram units", mach->name, c, r);
 		pmSend(pm, rudpOut);
 		} 
 
 	    refAdd(&perCpu[c][r], mach); 
 	    }
 	}
      }
 
 
 
 /* allocate machines to resource lists */
 
 while(TRUE)
     {
 
     /* find lucky user/batch */
     struct user *user = findLuckyUser();
     if (!user)
 	break;
     struct batch *batch = findLuckyBatch(user);
     if (!batch)
 	{
 	errAbort("unexpected error: batch not found while planning for lucky user");
 	break;
 	}
 
     if (pm) 
 	{
 	//pmClear(pm);
 	//pmPrintf(pm, "lucky user: %s; lucky batch=%s", user->name, batch->name);
 	//pmSend(pm, rudpOut);
 	}
      
     /* find machine with adequate resources in resource array (if any) */
     boolean found = FALSE;
     struct slRef **perRam = NULL; 
     struct slRef *el = NULL;
     for (c = batch->cpu; c <= maxCpuInCluster; ++c)
 	{
 	/* an array of resources sharing the same cpu and ram free units count */
 	perRam = perCpu[c];      
 	for (r = batch->ram; r <= maxRamInCluster; ++r)
 	    {
 	    if (perRam[r])
 		{
 		/* avoid any machine in the sickNodes */
 		/* extract from list if found */
 		el = perRam[r];
 		struct slRef **listPt = &perRam[r];
 		while (el)
 		    {
 		    mach = (struct machine *) el->val;
 		    if (hashIntValDefault(batch->sickNodes, mach->name, 0) < sickNodeThreshold)
 			{
 			found = TRUE;
 			*listPt = el->next;
 			el->next = NULL;
 			break;
 			}
 		    listPt = &el->next;
 		    el = el->next;
 		    }
 		}
 	    if (found)
 		break;  // preserve value of r
 	    }
 	if (found)
 	    break;  // preserve value of c
 	}
     if (found)
 	{
 
 	/* allocate plan, reduce resources, calc new resources and pos.
 	 *   move machine from old array pos to new pos. (slPopHead, slAddHead)
 	 *   update its stats, and if heaps, update heaps.
 	 */
 
 
 	if (pm) 
 	    {
 	    //pmClear(pm);
 	    //pmPrintf(pm, "found hardware cpu %d ram %d in machine %s c=%d r=%d batch=%s", 
 		//batch->cpu, batch->ram, mach->name, c, r, batch->name);
 	    //pmSend(pm, rudpOut);
 	    }
 
 	allocateResourcesToMachine(mach, batch, user, &c, &r);
 
 	if (pm) 
 	    {
 	    //pmClear(pm);
 	    //pmPrintf(pm, "remaining hardware c=%d r=%d", c, r);
 	    //pmSend(pm, rudpOut);
 	    }
      
 	if (c < 1 || r < 1)
 	    freeMem(el);  /* this node has insufficient resources remaining */
 	else
 	    slAddHead(&perCpu[c][r], el);
 
 	}
     else
 	{
 
 	if (pm) 
 	    {
 	    //pmClear(pm);
 	    //pmPrintf(pm, "no suitable machines left, removing from planning:  user %s; lucky batch %s", 
 		//user->name, batch->name);
 	    //pmSend(pm, rudpOut);
 	    }
 
 	/* no suitable machine found */
 	/* remove batch from the allocating */
 	batch->planning = FALSE;
 	--user->planningBatchCount;
 	}
 
     }
 
 
 /* free arrays when finished */
 for (c = 1; c <= maxCpuInCluster; ++c)
     {
     for (r = 1; r <= maxRamInCluster; ++r)
 	{
 	slFreeList(&perCpu[c][r]);
 	}
     freeMem(perCpu[c]);
     }
 freeMem(perCpu);
 
 
 /* allocate machines to busy, ready, free lists */
 for (mach = machineList; mach != NULL; mach = mach->next)
      {
      if (!mach->isDead)
 	{
 	/* See if any machines have enough resources free
 	 *  to start their plan, and start those jobs.
          *  If so, add them to the readyMachines list. */
 	
 	struct dlNode *mNode = mach->node;
 	dlRemove(mNode);  /* remove it from whichever list it was on */
 
 	if (mach->plannedBatches) /* was anything planned for this machine? */
     	    {
 	    boolean couldRun = FALSE;
 	    struct batch *batch = findRunnableBatch(mach, NULL, &couldRun);
 	    if (batch)
 		dlAddTail(readyMachines, mNode);
 	    else
 		if (couldRun)
     		    dlAddTail(blockedMachines, mNode);
 		else
     		    dlAddTail(busyMachines, mNode);
 	    }
 	else
 	    {
 	    struct dlNode *jobNode = mach->jobs->head;
 	    if (dlEnd(jobNode))
 		dlAddTail(freeMachines, mNode);
 	    else
 		dlAddTail(busyMachines, mNode);
 	    }
 
 	}
      }
 
 
 if (pm) 
     {
     pmClear(pm);
     pmPrintf(pm, 
 	"# machines:"
 	" busy %d" 
 	" ready %d" 
 	" blocked %d" 
 	" free %d" 
 	" dead %d" 
 	, dlCount(busyMachines)
 	, dlCount(readyMachines)
 	, dlCount(blockedMachines)
 	, dlCount(freeMachines)
 	, dlCount(deadMachines)
     );
     pmSend(pm, rudpOut);
 
     pmSendString(pm, rudpOut, "end of planning"); 
 
     pmSendString(pm, rudpOut, "");
     }
 
 needsPlanning = FALSE;
 logDebug("plan finished");
 
 }
 
 
 boolean runNextJob()
 /* Assign next job in pending queue if any to a machine. */
 {
 
 /* give blocked machines another chance */
 while (!dlEmpty(blockedMachines))
     {
     struct dlNode *mNode;
     mNode = dlPopHead(blockedMachines);
     dlAddTail(readyMachines, mNode);
     }
 
 while(TRUE)
     {
 
     if (dlEmpty(readyMachines))
      return FALSE;
 
     if (dlEmpty(freeSpokes))
      return FALSE;
 
     struct dlNode *mNode;
     struct machine *machine;
      /* Get free machine */
     mNode = dlPopHead(readyMachines);
     machine = mNode->val;
 
     if (!machine->plannedBatches) /* anything to do for this machine? */
 	{
 	struct dlNode *jobNode = machine->jobs->head;
 	if (dlEnd(jobNode))
 	    dlAddTail(freeMachines, mNode);
 	else
 	    dlAddTail(busyMachines, mNode);
 	continue;
 	}
 
     boolean couldRun = FALSE;    /* was it limited only by runningCount? */
     struct slRef *batchEl = NULL;
     struct batch *batch = findRunnableBatch(machine, &batchEl, &couldRun);
 
     if (!batch)
 	{ 
 	if (couldRun)
 	    dlAddTail(blockedMachines, mNode);
 	else
 	    dlAddTail(busyMachines, mNode);
 	continue;
 	}
 
     /* remove the batch from the planning list */
     if (!slRemoveEl(&machine->plannedBatches, batchEl))
 	{ /* this should not happen */
 	logWarn("unable to remove batch from machine->plannedBatches, length: %d\n", 
 	    slCount(machine->plannedBatches));
 	dlAddTail(freeMachines, mNode);
 	continue;
 	}
 
     freeMem(batchEl);
 
     if (batch->queuedCount == 0)
 	{
 	/* probably the batch has been chilled */
 	/* needsPlanning=TRUE and a new plan will come along soon. */
 	/* just put it back on the ready list, it will get looked at again */
 	/* this has the effect of removing the batch from this machine's plannedBatches */
 	dlAddTail(readyMachines, mNode);  
 	continue;
 	}
 
     struct user *user = batch->user; 
 
     struct dlNode *jNode, *sNode;
     struct spoke *spoke;
     struct job *job;
 
     /* Get free spoke and move them to busy lists. */
     machine->lastChecked = now; 
     sNode = dlPopHead(freeSpokes);
     dlAddTail(busySpokes, sNode);
     spoke = sNode->val;
 
     /* Get active batch from user and take job off of it.
      * If it's the last job in the batch move batch to
      * finished list. */
     jNode = dlPopHead(batch->jobQueue);
     dlAddTail(runningJobs, jNode);
     job = jNode->val;
     dlAddTail(hangJobs, job->hangNode);
     ++batch->runningCount;
     --batch->queuedCount;
     ++user->runningCount;
     unactivateBatchIfEmpty(batch); 
 
     /* Tell machine, job, and spoke about each other. */
     dlAddTail(machine->jobs, job->jobNode);
 
     /* just put it back on the ready list, it will get looked at again */
     dlAddTail(readyMachines, mNode);
 
     job->machine = machine;
     job->lastChecked = job->startTime = job->lastClockIn = now;
     spokeSendJob(spoke, machine, job);
     return TRUE;
     }
 }
 
 void runner(int count)
 /* Try to run a couple of jobs. */
 {
 while (--count >= 0)
     if (!runNextJob())
         break;
 }
 
 struct machine *machineNew(char *name, char *tempDir, struct machSpec *m)
 /* Create a new machine structure. */
 {
 struct machine *mach;
 AllocVar(mach);
 mach->name = cloneString(name);
 mach->tempDir = cloneString(tempDir);
 AllocVar(mach->node);
 mach->node->val = mach;
 mach->machSpec = m;
 mach->jobs = newDlList();
 return mach;
 }
 
 void machineFree(struct machine **pMach)
 /* Delete machine structure. */
 {
 struct machine *mach = *pMach;
 if (mach != NULL)
     {
     freeMem(mach->node);
     freeMem(mach->name);
     freeMem(mach->tempDir);
     machSpecFree(&mach->machSpec);
     freeDlList(&mach->jobs);
     freez(pMach);
     }
 }
 
 struct machine *doAddMachine(char *name, char *tempDir, bits32 ip, struct machSpec *m)
 /* Add machine to pool.  If you don't know ip yet just pass
  * in 0 for that argument. */
 {
 struct machine *mach;
 mach = machineNew(name, tempDir, m);
 mach->ip = ip;
 dlAddTail(freeMachines, mach->node);
 slAddHead(&machineList, mach);
 needsPlanning = TRUE;  
 return mach;
 }
 
 void addMachine(char *line)
 /* Process message to add machine to pool. */
 {
 char *name = nextWord(&line);
 if (hashLookup(machineHash, name))  /* ignore duplicate machines */
     {
     warn("machine already added: %s",  name);
     return;
     }
 char *param2 = nextWord(&line);
 struct machSpec *m = NULL;
 AllocVar(m);
 if (!line)
     {  /* for backwards compatibility, allow running without full spec,
 	* just copy the machSpec of the first machine on the list */
     *m = *machineList->machSpec;
     m->name = cloneString(name);
     m->tempDir = cloneString(param2);
     if (!m->tempDir)
 	{
 	freeMem(m);
 	warn("incomplete addMachine request");
 	return;
 	}
     }
 else
     {
     m->name = cloneString(name);
     m->cpus = atoi(param2);	
     m->ramSize = atoi(nextWord(&line));	
     m->tempDir = cloneString(nextWord(&line));
     m->localDir = cloneString(nextWord(&line));
     m->localSize = atoi(nextWord(&line));	
     m->switchName = cloneString(nextWord(&line));
     if (!m->switchName)
 	{
 	freeMem(m);
 	warn("incomplete addMachine request");
 	return;
 	}
     }
 
 doAddMachine(name, m->tempDir, 0, m);
 runner(1);
 }
 
 struct machine *findMachine(char *name)
 /* Find named machine. */
 {
 struct machine *mach;
 for (mach = machineList; mach != NULL; mach = mach->next)
      {
      if (sameString(mach->name, name))
          return mach;
      }
 return NULL;
 }
 
 struct job *jobFind(struct dlList *list, int id)
 /* Find node of job with given id on list.  Return NULL if
  * not found. */
 {
 struct dlNode *el;
 struct job *job;
 for (el = list->head; !dlEnd(el); el = el->next)
     {
     job = el->val;
     if (job->id == id)
         return job;
     }
 return NULL;
 }
 
 struct job *findWaitingJob(int id)
 /* Find job that's waiting (as opposed to running).  Return
  * NULL if it can't be found. */
 {
 /* If it's not running look in user job queues. */
 struct user *user;
 struct job *job = NULL;
 for (user = userList; user != NULL; user = user->next)
     {
     struct dlNode *node;
     for (node = user->curBatches->head; !dlEnd(node); node = node->next)
 	{
 	struct batch *batch = node->val;
 	if ((job = jobFind(batch->jobQueue, id)) != NULL)
 	    break;
 	}
     if (job != NULL)
 	break;
     }
 return job;
 }
 
 
 void requeueJob(struct job *job)
 /* Move job from running queue back to a user pending
  * queue.  This happens when a node is down or when
  * it missed the message about a job. */
 {
 struct batch *batch = job->batch;
 struct user *user = batch->user;
 job->machine = NULL;
 dlRemove(job->node);
 dlAddTail(batch->jobQueue, job->node);
 dlRemove(job->jobNode);
 dlRemove(job->hangNode);
 batch->runningCount -= 1;
 batch->queuedCount += 1;
 user->runningCount -= 1;
 dlRemove(batch->node);
 dlAddHead(user->curBatches, batch->node);
 dlRemove(user->node);
 dlAddHead(queuedUsers, user->node);
 
 if (batch->planCount == 0)
     needsPlanning = TRUE;
 
 updateUserPriority(user);
 updateUserMaxJob(user);
 updateUserSickNodes(user);
 }
 
 void requeueAllJobs(struct machine *mach, boolean doDead)
 /* Requeue all jobs on machine. */
 {
 struct dlNode *next = NULL;
 struct dlNode *jobNode = NULL;
 for (jobNode = mach->jobs->head; !dlEnd(jobNode); jobNode = next)
     {
     struct job *job = jobNode->val;
     next = jobNode->next;
     if (doDead)
 	{
 	struct slInt *i = slIntNew(job->id);
 	slAddHead( &mach->deadJobIds, i ); 
 	}
     /* this affects the mach->jobs list itself by removing this node */
     requeueJob(job);  
     }
 }
 
 boolean removeMachine(char *machName, char *user, char *reason)
 /* Remove machine from pool. */
 {
 struct machine *mach;
 if ((mach = findMachine(machName)))
     {
     // logged as an error because it's important for admins to know that there is an
     // error with this machine
     logError("hub: user %s removed machine %s because: %s",user,machName,reason);
     requeueAllJobs(mach, FALSE);
     dlRemove(mach->node);
     slRemoveEl(&machineList, mach);
     hashRemove(machineHash, mach->name);
     machineFree(&mach);
     return TRUE;
     }
 else
     {
     logDebug("hub: user %s wanted to removed machine %s because: %s but machine was not found",user,machName,reason);
     return FALSE;
     }
 }
 
 
 void removeMachineAcknowledge(char *line, struct paraMessage *pm)
 /* Remove machine and send response back. */
 {
 char *machName = nextWord(&line);
 char *user = nextWord(&line);
 char *reason = line;
 machName = trimSpaces(machName);
 char *retVal = "ok";
 if (!removeMachine(machName, user, reason))
     retVal = "Machine not found.";
 pmSendString(pm, rudpOut, retVal);
 pmSendString(pm, rudpOut, "");
 }
 
 
 
 void machineDown(struct machine *mach)
 /* Mark machine as down and move it to dead list. */
 {
 dlRemove(mach->node);
 mach->lastChecked = time(NULL);
 mach->isDead = TRUE;
 dlAddTail(deadMachines, mach->node);
 }
 
 
 void buryMachine(struct machine *machine)
 /* Reassign jobs that machine is processing and bury machine
  * in dead list. */
 {
 requeueAllJobs(machine, TRUE);  
 machineDown(machine);
 }
 
 void nodeDown(char *line)
 /* Deal with a node going down - move it to dead list and
  * put job back on job list. */
 {
 struct machine *mach;
 char *machName = nextWord(&line);
 
 if ((mach = findMachine(machName)) != NULL)
     buryMachine(mach);
 runner(1);
 }
 
 char *exeFromCommand(char *cmd)
 /* Return executable name (without path) given command line. */
 {
 static char exe[128];
 char *s,*e;
 int i, size;
 int lastSlash = -1;
 
 /* Isolate first space-delimited word between s and e. */
 s = skipLeadingSpaces(cmd);
 e = skipToSpaces(cmd);
 if (e == NULL) 
     e = s + strlen(s);
 size = e - s;
 
 /* Find last '/' in this word if any, and reposition s after it. */
 for (i=0; i<size; ++i)
     {
     if (s[i] == '/')
         lastSlash = i;
     }
 if (lastSlash > 0)
     s += lastSlash + 1;
 
 /* Copy whats left to string to return . */
 size = e - s;
 if (size >= sizeof(exe))
     size = sizeof(exe)-1;
 memcpy(exe, s, size);
 exe[size] = 0;
 return exe;
 }
 
 struct job *jobNew(char *cmd, char *userName, char *dir, char *in, char *out, 
 	float cpus, long long ram, char *results, boolean forQueue)
 /* Create a new job structure */
 {
 struct job *job;
 struct user *user = findUser(userName);
 struct batch *batch = findBatch(user, results, FALSE);
 
 if (forQueue && (batch->continuousCrashCount >= sickBatchThreshold))
     {
     warn("not adding job [%s] for %s, sick batch %s", cmd, userName, batch->name);
     unactivateBatchIfEmpty(batch);  /* handle side-effect of findBatch call above */
     return NULL;
     }
 
 AllocVar(job);
 AllocVar(job->jobNode);
 job->jobNode->val = job;
 AllocVar(job->node);
 job->node->val = job;
 job->id = ++nextJobId;
 job->exe = cloneString(exeFromCommand(cmd));
 job->cmd = cloneString(cmd);
 job->batch = batch;
 job->dir = hashStoreName(stringHash, dir);
 job->in = cloneString(in);
 job->out = cloneString(out);
 job->cpus = cpus;
 job->ram = ram;
 AllocVar(job->hangNode);
 job->hangNode->val = job;
 return job;
 }
 
 void jobFree(struct job **pJob)
 /* Free up a job. */
 {
 struct job *job = *pJob;
 if (job != NULL)
     {
     freeMem(job->jobNode);
     freeMem(job->node);
     freeMem(job->exe);
     freeMem(job->cmd);
     freeMem(job->in);
     freeMem(job->out);
     freeMem(job->err);
     freeMem(job->hangNode);
     freez(pJob);
     }
 }
 
 boolean sendViaSpoke(struct machine *machine, char *message)
 /* Send a message to machine via spoke. */
 {
 struct dlNode *node = dlPopHead(freeSpokes);
 struct spoke *spoke;
 if (node == NULL)
     {
     logDebug("hub: out of spokes!");
     return FALSE;
     }
 dlAddTail(busySpokes, node);
 spoke = node->val;
 spokeSendMessage(spoke, machine, message);
 return TRUE;
 }
 
 void checkDeadNodesASAP()
 /* Check dead nodes ASAP, some may have been fixed. 
  * It tweaks the time since last check on all dead machines
  * so that grave digger will send them resurrect messages
  * to see if they are alive. */
 {
 struct dlNode *mNode;
 struct machine *machine;
 for (mNode = deadMachines->head; !dlEnd(mNode); mNode = mNode->next)
     {
     machine = mNode->val;
     machine->lastChecked = now - MINUTE * machineCheckPeriod;
     }
 }
 
 void checkPeriodically(struct dlList *machList, int period, char *checkMessage,
 	int spokesToUse)
 /* Periodically send checkup messages to machines on list. */
 {
 struct dlNode *mNode;
 struct machine *machine;
 char message[512];
 int i;
 
 safef(message, sizeof(message), "%s", checkMessage);
 for (i=0; i<spokesToUse; ++i)
     {
     /* If we have some free spokes and some busy machines, and
      * the busy machines haven't been checked for a while, go
      * check them. */
     if (dlEmpty(freeSpokes) || dlEmpty(machList))
         break;
     machine = machList->head->val;
     if (now - machine->lastChecked < period)
         break;
     machine->lastChecked = now;
     mNode = dlPopHead(machList);
     dlAddTail(machList, mNode);
     sendViaSpoke(machine, message);
     logDebug("hub: sending resurrect message to %s",machine->name);
     }
 }
 
 void hangman(int spokesToUse)
 /* Check that jobs are alive, sense if nodes are dead.  Also send message for 
  * busy nodes to check in for specific jobs, in case we missed one of their earlier
  * jobDone messages. */
 {
 int i, period = jobCheckPeriod*MINUTE;
 struct dlNode *hangNode;
 struct job *job;
 struct machine *machine;
 
 for (i=0; i<spokesToUse; ++i)
     {
     if (dlEmpty(freeSpokes) || dlEmpty(hangJobs))
         break;
     job = hangJobs->head->val;
     if (now - job->lastChecked < period)
         break;
     job->lastChecked = now;
     hangNode = dlPopHead(hangJobs);
     dlAddTail(hangJobs, hangNode);
     machine = job->machine;
     if (now - job->lastClockIn >= MINUTE * assumeDeadPeriod)
 	{
 	warn("hub: node %s running %d looks dead, burying", machine->name, job->id);
 	buryMachine(machine);
 	break;  /* jobs list has been freed by bury, break immediately */
 	}
     else
 	{
 	char message[512];
 	safef(message, sizeof(message), "check %d", job->id);
 	sendViaSpoke(machine, message);
 	}
     }
 }
 
 void graveDigger(int spokesToUse)
 /* Check out dead nodes.  Try and resurrect them periodically. */
 {
 checkPeriodically(deadMachines, MINUTE * machineCheckPeriod, "resurrect", 
 	spokesToUse);
 }
 
 
 void flushResults(char *batchName)
 /* Flush all results files. batchName can be NULL for all. */
 {
 struct resultQueue *rq;
 for (rq = resultQueues; rq != NULL; rq = rq->next)
     {
     if (!batchName || (rq->name == batchName))
 	if (rq->f != NULL)
 	   fflush(rq->f);
     }
 }
 
 void changeFileOwner(char *fileName, char *newOwner)
 /* Attempt to change ownership of file. */
 {
 struct passwd *pwd = getpwnam(newOwner);
 if (pwd == NULL)
     {
     perror("getpwnam");
     return;
     }
 if (chown(fileName, pwd->pw_uid, -1) == -1)
     perror("chown");
 }
 
 void writeResults(char *fileName, char *userName, char *machineName,
 	int jobId, char *exe, time_t submitTime, time_t startTime,
 	char *errFile, char *cmd,
 	char *status, char *uTime, char *sTime)
 /* Write out job results to output queue.  This
  * will create the output queue if it doesn't yet
  * exist. */
 {
 struct resultQueue *rq;
 for (rq = resultQueues; rq != NULL; rq = rq->next)
     if (sameString(fileName, rq->name))
         break;
 if (rq == NULL)
     {
     AllocVar(rq);
     slAddHead(&resultQueues, rq);
     rq->name = fileName;
     rq->f = fopen(rq->name, "a");
     if (rq->f == NULL)
         warn("hub: couldn't open results file %s", rq->name);
     rq->lastUsed = now;
     changeFileOwner(fileName, userName);
     }
 if (rq->f != NULL)
     {
     fprintf(rq->f, "%s %s %d %s %s %s %lu %lu %lu %s %s '%s'\n",
         status, machineName, jobId, exe, 
 	uTime, sTime, 
 	submitTime, startTime, now,
 	userName, errFile, cmd);
     fflush(rq->f);
     rq->lastUsed = now;
     }
 }
 
 void writeJobResults(struct job *job, char *status,
 	char *uTime, char *sTime)
 /* Write out job results to output queue.  This
  * will create the output queue if it doesn't yet
  * exist. */
 {
 struct batch *batch = job->batch;
 if (sameString(status, "0"))
     {
     ++finishedJobCount;
     ++batch->doneCount;
     batch->doneTime += (now - job->startTime);
     ++batch->user->doneCount;
     batch->continuousCrashCount = 0;
     /* remember the continuous number of times this batch has crashed on this node */
     hashRemove(batch->sickNodes, job->machine->name);
     hashRemove(batch->user->sickNodes, job->machine->name);
     }
 else
     {
     ++crashedJobCount;
     ++batch->crashCount;
     ++batch->continuousCrashCount;
     /* remember the continuous number of times this batch has crashed on this node */
     hashIncInt(batch->sickNodes, job->machine->name);
     updateUserSickNode(batch->user, job->machine->name);  
     }
 
 
 writeResults(batch->name, batch->user->name, job->machine->name,
 	job->id, job->exe, job->submitTime, 
 	job->startTime, job->err, job->cmd,
 	status, uTime, sTime);
 }
 
 void resultQueueFree(struct resultQueue **pRq)
 /* Free up a results queue, closing file if open. */
 {
 struct resultQueue *rq = *pRq;
 if (rq != NULL)
     {
     carefulCloseWarn(&rq->f);
     freez(pRq);
     }
 }
 
 
 void sweepResultsWithRemove(char *name)
 /* Get rid of result queues that haven't been accessed for
  * a while. Also remove any matching name if not NULL.
  * Flushes all results. */
 {
 struct resultQueue *newList = NULL, *rq, *next;
 for (rq = resultQueues; rq != NULL; rq = next)
     {
     next = rq->next;
     if ((now - rq->lastUsed > 1*MINUTE) || (name && name == rq->name))
 	{
 	logDebug("hub: closing results file %s", rq->name);
         resultQueueFree(&rq);
 	}
     else
         {
 	slAddHead(&newList, rq);
 	}
     }
 slReverse(&newList);
 resultQueues = newList;
 flushResults(NULL);
 }
 
 void saveJobId()
 /* Save job ID. */
 {
 rewind(jobIdFile);
 writeOne(jobIdFile, nextJobId);
 fflush(jobIdFile);
 if (ferror(jobIdFile))
     errnoAbort("can't write job id file %s", jobIdFileName);
 }
 
 void openJobId()
 /* Open file with jobID in it and read jobId.  Bump it
  * by 100000 in case we crashed to avoid reusing job
  * id's, but do reuse every 2 billion. Let command line
  * overwrite this though . */
 {
 jobIdFile = fopen(jobIdFileName, "r+");
 if (jobIdFile != NULL)
     {
     (void)readOne(jobIdFile, nextJobId);
     nextJobId += 100000;
     }
 else
     jobIdFile = mustOpen(jobIdFileName, "w");
 if (nextJobId < 0)
     nextJobId = 0;
 nextJobId = optionInt("nextJobId", nextJobId);
 }
 
 void processHeartbeat()
 /* Check that system is ok.  See if we can do anything useful. */
 {
 int spokesToUse;
 
 if (needsPlanning)
     plan(NULL);
 
 runner(30);
 spokesToUse = dlCount(freeSpokes);
 if (spokesToUse > 0)
     {
     spokesToUse >>= 1;
     spokesToUse -= 1;
     if (spokesToUse < 1) spokesToUse = 1;
     graveDigger(spokesToUse);
     hangman(spokesToUse);
     sweepResultsWithRemove(NULL);
     saveJobId();
     }
 }
 
 boolean sendKillJobMessage(struct machine *machine, int jobId)
 /* Send message to compute node to kill job there. */
 {
 char message[64];
 safef(message, sizeof(message), "kill %d", jobId);
 logDebug("hub: %s %s", machine->name, message);
 if (!sendViaSpoke(machine, message))
     {
     return FALSE;
     }
 return TRUE;
 }
 
 
 void nodeAlive(char *line)
 /* Deal with message from node that says it's alive.
  * Move it from dead to free list.  The major complication
  * of this occurs if the node was running a job and it
  * didn't really go down, we just lost communication with it.
  * In this case we will have restarted the job elsewhere, and
  * that other copy could be conflicting with the copy of
  * the job the node is still running. */
 {
 char *name = nextWord(&line), *jobIdString;
 int jobId;
 struct machine *mach;
 struct dlNode *node;
 boolean hostFound = FALSE;
 for (node = deadMachines->head; !dlEnd(node); node = node->next)
     {
     mach = node->val;
     if (sameString(mach->name, name) && mach->isDead)
         {
 	hostFound = TRUE;
 	dlRemove(node);
 	dlAddTail(freeMachines, node);
 	needsPlanning = TRUE;
 	mach->isDead = FALSE;
 
 	if (mach->deadJobIds != NULL)
 	    {
 	    struct dyString *dy = newDyString(0);
 	    struct slInt *i = mach->deadJobIds;
 	    dyStringPrintf(dy, "hub: node %s assigned ", name); 
 	    for(i = mach->deadJobIds; i; i = i->next)
 		dyStringPrintf(dy, "%d ", i->val);
 	    dyStringPrintf(dy, "came back.");
 	    logWarn("%s", dy->string);
 	    dyStringFree(&dy);
 	    while ((jobIdString = nextWord(&line)) != NULL)
 	        {
 		jobId = atoi(jobIdString);
                 if ((i = slIntFind(mach->deadJobIds, jobId)))
 		    {
 		    struct job *job;
 		    warn("hub: Looks like %s is still keeping track of %d", name, jobId);
 		    if ((job = findWaitingJob(jobId)) != NULL)
 			{
 			warn("hub: Luckily rerun of job %d has not yet happened.", 
                              jobId);
 			job->machine = mach;
 			dlAddTail(mach->jobs, job->jobNode);
 			job->lastChecked = mach->lastChecked = job->lastClockIn = now;
 			dlRemove(job->node);
 			dlAddTail(runningJobs, job->node);
 			dlRemove(mach->node);
 			dlAddTail(busyMachines, mach->node);
 			dlAddTail(hangJobs, job->hangNode);
 			struct batch *batch = job->batch;
 			struct user *user = batch->user;
 			batch->runningCount += 1;
 			batch->queuedCount -= 1;
 			user->runningCount += 1;
 			}
 		    else if ((job = jobFind(runningJobs, jobId)) != NULL)
 		        {
 			/* Job is running on resurrected machine and another.
 			 * Kill it on both since the output it created could
 			 * be corrupt at this point.  Then add it back to job
 			 * queue. */
 			warn("hub: Job %d is running on %s as well.", jobId,
                              job->machine->name);
 			sendKillJobMessage(mach, job->id);
 			sendKillJobMessage(job->machine, job->id);
 			requeueJob(job);
 			}
 		    else
 		        {
 			/* This case should be very rare.  It should happen when
 			 * a node is out of touch for 2 hours, but when it comes
 			 * back is running a job that we reran to completion
 			 * on another node. */
 			warn("hub: Job %d has finished running, there is a conflict. "
 			     "Data may be corrupted, and it will take a lot of logic to fix.", 
                              jobId);
 			}
 		    }
 		}
 	    }
 	slFreeList(&mach->deadJobIds);
 	runner(1);
 	break;
 	}
     }
 if (!hostFound)
     {
     warn("hub 'alive $HOST' msg handler: unable to resurrect host %s, "
 	 "not find in deadMachines list.",  name);
     }
 }
 
 void recycleMachine(struct machine *mach)
 /* Recycle machine into free list. */
 {
 dlRemove(mach->node);
 dlAddTail(readyMachines, mach->node);
 }
 
 void recycleJob(struct job *job)
 /* Remove job from lists and free up memory associated with it. */
 {
 dlRemove(job->node);
 jobFree(&job);
 }
 
 void nodeCheckIn(char *line)
 /* Deal with check in message from node. */
 {
 char *machine = nextWord(&line);
 char *jobIdString = nextWord(&line);
 char *status = nextWord(&line);
 int jobId = atoi(jobIdString);
 if (status != NULL)
     {
     struct job *job = jobFind(runningJobs, jobId);
     if (job != NULL)
 	{
         job->lastClockIn = now;
         if (!sameWord(job->machine->name, machine))
             {
             logError("hub: checkIn %s %s %s should be from %s",
                      machine, jobIdString, status, job->machine->name);
             }
 	}
     else
         {
         logError("hub: checkIn of unknown job: %s %s %s",
                  machine, jobIdString, status);
         }
     if (sameString(status, "free"))
 	{
 	/* Node thinks it's free, we think it has a job.  Node
 	 * must have missed our job assignment... */
 	if (job != NULL)
 	    {
 	    struct machine *mach = job->machine;
 	    if (mach != NULL)
 	        {
 	        dlRemove(mach->node);
 	        dlAddTail(readyMachines, mach->node); 
 		}
 	    requeueJob(job);
 	    logDebug("hub:  requeueing job in nodeCheckIn");
 	    runner(1);
 	    }
 	}
     }
 }
 
 void recycleSpoke(char *spokeName)
 /* Try to find spoke and put it back on free list. */
 {
 struct dlNode *node;
 struct spoke *spoke;
 boolean foundSpoke = FALSE;
 for (node = busySpokes->head; !dlEnd(node); node = node->next)
     {
     spoke = node->val;
     if (sameString(spoke->name, spokeName))
         {
 	dlRemove(spoke->node);
 	dlAddTail(freeSpokes, spoke->node);
 	foundSpoke = TRUE;
 	break;
 	}
     }
 if (!foundSpoke)
     warn("Couldn't free spoke %s", spokeName);
 else
     runner(1);
 }
 
 int addJob(char *userName, char *dir, char *in, char *out, char *results,
 	float cpus, long long ram, char *command)
 /* Add job to queues. */
 {
 struct job *job;
 struct user *user;
 struct batch *batch;
 
 job = jobNew(command, userName, dir, in, out, cpus, ram, results, TRUE);
 if (!job)
     {
     return 0;
     }
 batch = job->batch;
 dlAddTail(batch->jobQueue, job->node);
 ++batch->queuedCount;
 
 int oldCpu = batch->cpu;  
 int oldRam = batch->ram; 
 if (job->cpus) 
     batch->cpu = (job->cpus + 0.5) / cpuUnit;  /* rounding */
 else
     {
     /* if no cpus specified, use the default */
     batch->cpu = defaultJobCpu;
     job->cpus = defaultJobCpu * cpuUnit;
     }
 if (job->ram) 
     batch->ram = 1 + (job->ram - 1) / ramUnit;   /* any remainder will be rounded upwards
         e.g.  1 to 1024m --> 1G but 1025m --> 2G if unit is 1G.   0m would just cause default ram usage. */
 else
     {
     /* if no ram size specified, use the default */
     batch->ram = defaultJobRam;
     job->ram = defaultJobRam * ramUnit;
     }
 
 if (oldCpu != batch->cpu || oldRam != batch->ram)
     {
     needsPlanning = TRUE; 
     }
 
 if (batch->planCount == 0)
     {
     needsPlanning = TRUE; 
     }
 user = batch->user;
 dlRemove(user->node);
 dlAddTail(queuedUsers, user->node);
 job->submitTime = time(NULL);
 return job->id;
 }
 
 int addJobFromMessage(char *line, int addJobVersion)
 /* Parse out addJob message and add job to queues. */
 {
 char *userName, *dir, *in, *out, *results, *command;
 float cpus = 0;
 long long ram = 0;
 if ((userName = nextWord(&line)) == NULL)
     return 0;
 if ((dir = nextWord(&line)) == NULL)
     return 0;
 if ((in = nextWord(&line)) == NULL)
     return 0;
 if ((out = nextWord(&line)) == NULL)
     return 0;
 if ((results = nextWord(&line)) == NULL)
     return 0;
 if (addJobVersion == 2)
     {
     char *tempCpus = NULL;
     char *tempRam = NULL;
     if ((tempCpus = nextWord(&line)) == NULL)
 	return 0;
     if ((tempRam = nextWord(&line)) == NULL)
 	return 0;
     cpus = sqlFloat(tempCpus);
     ram = sqlLongLong(tempRam);
     }
 if (line == NULL || line[0] == 0)
     return 0;
 command = line;
 return addJob(userName, dir, in, out, results, cpus, ram, command);
 }
 
 void addJobAcknowledge(char *line, struct paraMessage *pm, int addJobVersion)
 /* Add job.  Line format is <user> <dir> <stdin> <stdout> <results> <command> 
  * Returns job ID or 0 if a problem.  Send jobId back to client. */
 {
 int id = addJobFromMessage(line, addJobVersion);
 pmClear(pm);
 pmPrintf(pm, "%d", id);
 pmSend(pm, rudpOut);
 runner(1);
 }
 
 int setMaxJob(char *userName, char *dir, int maxJob)
 /* Set new maxJob for batch */
 {
 struct user *user = findUser(userName);
 struct batch *batch = findBatch(user, dir, TRUE);
 if (user == NULL) return -2;
 if (batch == NULL) return -2;
 needsPlanning = TRUE;
 batch->maxJob = maxJob;
 updateUserMaxJob(user);
 if (maxJob>=-1)
     logDebug("paraHub: User %s set maxJob=%d for batch %s", userName, maxJob, dir);
 return maxJob;
 }
 
 
 int setMaxJobFromMessage(char *line)
 /* Parse out setMaxJob message and set new maxJob for batch, update user-maxJob. */
 {
 char *userName, *dir;
 int maxJob;
 
 if ((userName = nextWord(&line)) == NULL)
     return -2;
 if ((dir = nextWord(&line)) == NULL)
     return -2;
 if ((maxJob = atoi(nextWord(&line))) < -1)
     return -2;
 return setMaxJob(userName, dir, maxJob);
 }
 
 
 void setMaxJobAcknowledge(char *line, struct paraMessage *pm)
 /* Set batch maxJob.  Line format is <user> <dir> <maxJob>
 * Returns new maxJob or -2 if a problem.  Send new maxJob back to client. */
 {
 int maxJob = setMaxJobFromMessage(line);
 pmClear(pm);
 pmPrintf(pm, "%d", maxJob);
 pmSend(pm, rudpOut);
 }
 
 int resetCounts(char *userName, char *dir)
 /* Reset done and crashed batch counts */
 {
 struct user *user = findUser(userName);
 struct batch *batch = findBatch(user, dir, TRUE);
 if (user == NULL) return -2;
 if (batch == NULL) return -2;
 batch->doneCount = 0;
 batch->doneTime = 0;
 batch->crashCount = 0;
 logDebug("paraHub: User %s reset done and crashed counts for batch %s", userName, dir);
 return 0;
 }
 
 int resetCountsFromMessage(char *line)
 /* Parse out resetCounts message and reset counts for batch. */
 {
 char *userName, *dir;
 if ((userName = nextWord(&line)) == NULL)
     return -2;
 if ((dir = nextWord(&line)) == NULL)
     return -2;
 return resetCounts(userName, dir);
 }
 
 void resetCountsAcknowledge(char *line, struct paraMessage *pm)
 /* Resets batch counts for done and crashed.  Line format is <user> <dir>
 * Returns new maxJob or -2 if a problem.  Send new maxJob back to client. */
 {
 int resetCounts = resetCountsFromMessage(line);
 pmClear(pm);
 pmPrintf(pm, "%d",resetCounts);
 pmSend(pm, rudpOut);
 }
 
 
 int freeBatch(char *userName, char *batchName)
 /* Free batch resources, if possible */
 {
 struct user *user = findUser(userName);
 if (user == NULL) return -3;
 struct hashEl *hel = hashLookup(stringHash, batchName);
 if (hel == NULL) return -2;
 char *name = hel->name;
 struct batch *batch = findBatchInList(user->curBatches, name);
 if (batch == NULL)
     batch = findBatchInList(user->oldBatches, name);
 if (batch == NULL) return -2;
 /* make sure nothing running and queue empty */
 if (batch->runningCount > 0) return -1;
 if (!dlEnd(batch->jobQueue->head)) return -1;
 sweepResultsWithRemove(name);
 logDebug("paraHub: User %s freed batch %s", userName, batchName);
 /* remove batch from batchList */
 slRemoveEl(&batchList, batch);
 /* remove from user cur/old batches */
 dlRemove(batch->node);
 /* free batch and its members */
 freeMem(batch->node);
 hashRemove(stringHash, name);
 freeDlList(&batch->jobQueue);
 freeHash(&batch->sickNodes);
 freeMem(batch);
 return 0;
 }
 
 int freeBatchFromMessage(char *line)
 /* Parse out freeBatch message and free batch. */
 {
 char *userName, *batchName;
 if ((userName = nextWord(&line)) == NULL)
     return -2;
 if ((batchName = nextWord(&line)) == NULL)
     return -2;
 return freeBatch(userName, batchName);
 }
 
 void freeBatchAcknowledge(char *line, struct paraMessage *pm)
 /* Free batch resources.  Line format is <user> <dir>
  * Returns 0 if success or some err # if a problem.  Sends result back to client. */
 {
 int result = freeBatchFromMessage(line);
 pmClear(pm);
 pmPrintf(pm, "%d",result);
 pmSend(pm, rudpOut);
 }
 
 
 int flushResultsByRequest(char *userName, char *batchName)
 /* Flush results file. Return 0 if nothing running and queue empty. */
 {
 struct user *user = findUser(userName);
 if (user == NULL) return -3;
 struct hashEl *hel = hashLookup(stringHash, batchName);
 if (hel == NULL) return -2;
 char *name = hel->name;
 struct batch *batch = findBatchInList(user->curBatches, name);
 if (batch == NULL)
     batch = findBatchInList(user->oldBatches, name);
 if (batch == NULL) return -2;
 flushResults(batch->name);
 logDebug("paraHub: User %s flushed results batch %s", userName, batchName);
 /* return 0 if nothing running and queue empty */
 if (batch->runningCount > 0) return -1;
 if (!dlEnd(batch->jobQueue->head)) return -1;
 return 0;
 }
 
 int flushResultsFromMessage(char *line)
 /* Parse out flushResults message and flush the results file. */
 {
 char *userName, *batchName;
 if ((userName = nextWord(&line)) == NULL)
     return -2;
 if ((batchName = nextWord(&line)) == NULL)
     return -2;
 return flushResultsByRequest(userName, batchName);
 }
 
 void flushResultsAcknowledge(char *line, struct paraMessage *pm)
 /* Flush results file.  Line format is <user> <dir>
  * Returns 0 if success or some err # if a problem.  Sends result back to client. */
 {
 int result = flushResultsFromMessage(line);
 pmClear(pm);
 pmPrintf(pm, "%d",result);
 pmSend(pm, rudpOut);
 }
 
 
 int clearSickNodes(char *userName, char *dir)
 /* Clear sick nodes for batch */
 {
 struct user *user = findUser(userName);
 struct batch *batch = findBatch(user, dir, TRUE);
 if (user == NULL) return -2;
 if (batch == NULL) return -2;
 hashFree(&batch->sickNodes);
 batch->sickNodes = newHashExt(6, FALSE);
 batch->continuousCrashCount = 0;  /* reset so user can retry */
 needsPlanning = TRUE;
 updateUserSickNodes(user);
 logDebug("paraHub: User %s cleared sick nodes for batch %s", userName, dir);
 return 0;
 }
 
 int clearSickNodesFromMessage(char *line)
 /* Parse out clearSickNodes message and call clear nodes. */
 {
 char *userName, *dir;
 if ((userName = nextWord(&line)) == NULL)
     return -2;
 if ((dir = nextWord(&line)) == NULL)
     return -2;
 return clearSickNodes(userName, dir);
 }
 
 void clearSickNodesAcknowledge(char *line, struct paraMessage *pm)
 /* Clear sick nodes from batch.  Line format is <user> <dir>
 * Returns 0 or -2 if a problem back to client. */
 {
 int result = clearSickNodesFromMessage(line);
 pmClear(pm);
 pmPrintf(pm, "%d", result);
 pmSend(pm, rudpOut);
 }
 
 
 int showSickNodes(char *userName, char *dir, struct paraMessage *pm)
 /* Show sick nodes for batch */
 {
 int machineCount = 0, sickCount = 0;
 struct user *user = findUser(userName);
 struct batch *batch = findBatch(user, dir, TRUE);
 if (user == NULL) return -2;
 if (batch == NULL) return -2;
 logDebug("paraHub: User %s ran showSickNodes for batch %s", userName, dir);
 struct hashEl *el, *list = hashElListHash(batch->sickNodes);
 slSort(&list, hashElCmp);
 for (el = list; el != NULL; el = el->next)
     {
     int failures = ptToInt(el->val);
     if (failures >= sickNodeThreshold)
 	{
 	++machineCount;
 	sickCount += failures;
 	pmClear(pm);
 	pmPrintf(pm, "%s %d", el->name, ptToInt(el->val));
 	pmSend(pm, rudpOut);
 	}
     }
 hashElFreeList(&list);
 pmClear(pm);
 pmPrintf(pm, "total sick machines: %d failures: %d", machineCount, sickCount);
 pmSend(pm, rudpOut);
 return 0;
 }
 
 int showSickNodesFromMessage(char *line, struct paraMessage *pm)
 /* Parse out showSickNodes message and print sick nodes for batch. */
 {
 char *userName, *dir;
 if ((userName = nextWord(&line)) == NULL)
     return -2;
 if ((dir = nextWord(&line)) == NULL)
     return -2;
 return showSickNodes(userName, dir, pm);
 }
 
 void showSickNodesAcknowledge(char *line, struct paraMessage *pm)
 /* Show sick nodes from batch.  Line format is <user> <dir>
 * Returns just empty line if a problem back to client. */
 {
 showSickNodesFromMessage(line,pm);
 pmClear(pm);
 pmSend(pm, rudpOut);
 }
 
 
 
 int setPriority(char *userName, char *dir, int priority)
 /* Set new priority for batch */
 {
 struct user *user = findUser(userName);
 struct batch *batch = findBatch(user, dir, TRUE);
 if (user == NULL) return 0;
 if (batch == NULL) return 0;
 needsPlanning = TRUE;
 batch->priority = priority;
 updateUserPriority(user);
 if ((priority>=1)&&(priority<NORMAL_PRIORITY))
     logDebug("paraHub: User %s set priority=%d for batch %s", userName, priority, dir);
 return priority;
 }
 
 
 int setPriorityFromMessage(char *line)
 /* Parse out setPriority message and set new priority for batch, update user-priority. */
 {
 char *userName, *dir;
 int priority;
 
 if ((userName = nextWord(&line)) == NULL)
     return 0;
 if ((dir = nextWord(&line)) == NULL)
     return 0;
 if ((priority = atoi(nextWord(&line))) < 1)
     return 0;
 return setPriority(userName, dir, priority);
 }
 
 
 void setPriorityAcknowledge(char *line, struct paraMessage *pm)
 /* Set batch priority.  Line format is <user> <dir> <priority>
 * Returns new priority or 0 if a problem.  Send new priority back to client. */
 {
 int priority = setPriorityFromMessage(line);
 pmClear(pm);
 pmPrintf(pm, "%d", priority);
 pmSend(pm, rudpOut);
 }
 
 void respondToPing(struct paraMessage *pm)
 /* Someone want's to know we're alive. */
 {
 pmSendString(pm, rudpOut, "ok");
 processHeartbeat();
 }
 
 void finishJob(struct job *job)
 /* Recycle job memory and the machine it's running on. */
 {
 struct machine *mach = job->machine;
 struct batch *batch = job->batch;
 struct user *user = batch->user;
 if (mach != NULL)
     {
     /* see if the node appears to be sick for this batch */
     if (hashIntValDefault(batch->sickNodes, mach->name, 0) >= sickNodeThreshold)
 	{ /* skip adding back to the mach->plannedBatches list */
 	needsPlanning = TRUE;
 	}
     else if (!job->oldPlan)
 	{  /* add its batch to end of list so it gets run again on same machine */
 	struct slRef *el = slRefNew(batch);
 	slAddTail(&mach->plannedBatches, el);
 	}
     recycleMachine(mach);
     /* NOTE I moved the following two lines inside the if (mach != NULL) block
      *  because this may fix the problem where we were seeing users get duplicate
      *  jobDone messages or something like that causing users to get
      *  e.g. -200 user->runningCount which then made them hog the whole cluster.
      */
     batch->runningCount -= 1;
     user->runningCount -= 1;
     }
 dlRemove(job->jobNode);
 dlRemove(job->hangNode);
 recycleJob(job);
 }
 
 boolean removeRunningJob(struct job *job)
 /* Remove job - if it's running kill it,  remove from job list. */
 {
 if (!sendKillJobMessage(job->machine, job->id))
     return FALSE;
 finishJob(job);
 return TRUE;
 }
 
 void removePendingJob(struct job *job)
 /* Remove job from pending queue. */
 {
 struct batch *batch = job->batch;
 recycleJob(job);
 unactivateBatchIfEmpty(batch);
 }
 
 boolean removeJobId(int id)
 /* Remove job of a given id. */
 {
 struct job *job = jobFind(runningJobs, id);
 if (job != NULL)
     {
     logDebug("Removing %s's %s", job->batch->user->name, job->cmd);
     if (!removeRunningJob(job))
         return FALSE;
     }
 else
     {
     job = findWaitingJob(id);
     if (job != NULL)
 	{
 	logDebug("Pending job %s", job->cmd);
 	removePendingJob(job);
 	}
     }
 return TRUE;
 }
 
 void removeJobAcknowledge(char *names, struct paraMessage *pm)
 /* Remove job of a given name(s). */
 {
 char *name;
 char *retVal = "ok";
 
 while ((name = nextWord(&names)) != NULL)
     {
     /* It is possible for this remove to fail if we
      * run out of spokes at the wrong time.  Currently
      * the para client will just report the problem. */
     if (!removeJobId(atoi(name)))
         {
 	retVal = "err";
 	break;
 	}
     }
 pmSendString(pm, rudpOut, retVal);
 }
 
 
 void chillABatch(struct batch *batch)
 /* Stop launching jobs from a batch, but don't disturb
  * running jobs. */
 {
 struct user *user = batch->user;
 struct dlNode *el, *next;
 for (el = batch->jobQueue->head; !dlEnd(el); el = next)
     {
     struct job *job = el->val;
     next = el->next;
     recycleJob(job);	/* This free's el too! */
     }
 batch->queuedCount = 0;
 batch->planCount = 0;
 dlRemove(batch->node);
 dlAddTail(user->oldBatches, batch->node);
 needsPlanning = TRUE;
 updateUserPriority(user);
 updateUserMaxJob(user);
 updateUserSickNodes(user);
 }
 
 
 void chillBatch(char *line, struct paraMessage *pm)
 /* Parse user and batch names from message, 
  * call chillABatch to clear the queue,
  * and send response ok or err to client. */
 {
 char *userName = nextWord(&line);
 char *batchName = nextWord(&line);
 char *res = "err";
 if (batchName != NULL)
     {
     struct user *user = hashFindVal(userHash, userName);
     if (user != NULL)
 	{
 	struct batch *batch;
 	batchName = hashStoreName(stringHash, batchName);
 	batch = findBatchInList(user->curBatches, batchName);
 	if (batch != NULL)
 	    {
             chillABatch(batch);
 	    }
 	res = "ok";
 	}
     }
 pmSendString(pm, rudpOut, res);
 }
 
 
 
 
 
 void jobDone(char *line)
 /* Handle job is done message. */
 {
 struct job *job;
 char *id = nextWord(&line);
 char *status = nextWord(&line);
 char *uTime = nextWord(&line);
 char *sTime = nextWord(&line);
 
 if (sTime != NULL)
     {
     job = jobFind(runningJobs, atoi(id));
     if (job != NULL)
 	{
 	struct machine *machine = job->machine;
 	if (machine != NULL)
 	    {
 	    machine->lastChecked = now;
 	    if (sameString(status, "0"))
 	        machine->goodCount += 1;
 	    else
 		machine->errCount += 1;
 	    }
 	writeJobResults(job, status, uTime, sTime);
 	struct batch *batch = job->batch;
 	finishJob(job);
 	/* is the batch sick? */
 	if (batch->continuousCrashCount >= sickBatchThreshold)
 	    {
             chillABatch(batch);
 	    }
 	runner(1);
 	}
     }
 }
 
 void listMachines(struct paraMessage *pm)
 /* Write list of machines to fd.  Format is one machine per message
  * followed by a blank message. */
 {
 struct machine *mach;
 for (mach = machineList; mach != NULL; mach = mach->next)
     {
     struct dlNode *jobNode = mach->jobs->head;
     do
 	{
 	/* this list may output multiple rows per machine, one for each running job */
 	pmClear(pm);
 	pmPrintf(pm, "%-10s good %d, err %d, ", mach->name, mach->goodCount, mach->errCount);
 	if (dlEmpty(mach->jobs))
 	    {
 	    if (mach->isDead)
 		pmPrintf(pm, "dead");
 	    else
 		pmPrintf(pm, "idle");
 	    }
 	else
 	    {
 	    struct job *job = jobNode->val;
 	    pmPrintf(pm, "running %-10s %s ", job->batch->user->name, job->cmd);
 	    jobNode = jobNode->next;
 	    }
 	pmSend(pm, rudpOut);
 	}
     while (!(dlEmpty(mach->jobs) || dlEnd(jobNode)));
     }
 pmSendString(pm, rudpOut, "");
 }
 
 int countUserActiveBatches(struct user *user)
 /* Count active batches for user. */
 {
 int count = dlCount(user->curBatches);
 /* Start with batches with pending jobs. */
 struct dlNode *node;
 
 /* Add in batches with running but no pending jobs. */
 for (node = user->oldBatches->head; !dlEnd(node); node = node->next)
     {
     struct batch *batch = node->val;
     if (batch->runningCount > 0)
 	++count;
     }
 return count;
 }
 
 void listUsers(struct paraMessage *pm)
 /* Write list of users to fd.  Format is one user per line
  * followed by a blank line. */
 {
 struct user *user;
 for (user = userList; user != NULL; user = user->next)
     {
     int totalBatch = dlCount(user->curBatches) + dlCount(user->oldBatches);
     pmClear(pm);
     pmPrintf(pm, "%s ", user->name);
     pmPrintf(pm, 
     	"%d jobs running, %d waiting, %d finished, %d of %d batches active"
     	", priority=%d"
     	", maxJob=%d" 
 	, user->runningCount,  userQueuedCount(user), user->doneCount,
 	countUserActiveBatches(user), totalBatch, user->priority 
 	, user->maxJob 
 	);
     pmSend(pm, rudpOut);
     }
 pmSendString(pm, rudpOut, "");
 }
 
 void writeOneBatchInfo(struct paraMessage *pm, struct user *user, struct batch *batch)
 /* Write out info on one batch. */
 {
 char shortBatchName[512];
 splitPath(batch->name, shortBatchName, NULL, NULL);
 pmClear(pm);
 pmPrintf(pm, "%-8s %4d %6d %6d %5d %3d %3d %3d %4.1fg %4d %3d %s",
 	user->name, batch->runningCount, 
 	batch->queuedCount, batch->doneCount,
 	batch->crashCount, batch->priority, batch->maxJob, 
 	batch->cpu, ((float)batch->ram*ramUnit)/(1024*1024*1024),
 	batch->planCount,
 	(avgBatchTime(batch)+30)/60,
 	shortBatchName);
 pmSend(pm, rudpOut);
 }
 
 void listSomeBatches(struct paraMessage *pm, int runThreshold)
 /* Write list of batches.  Format is one batch per
  * line followed by a blank line. */
 {
 struct user *user;
 pmSendString(pm, rudpOut, "#user     run   wait   done crash pri max cpu  ram  plan min batch");
 for (user = userList; user != NULL; user = user->next)
     {
     struct dlNode *bNode;
     for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next)
         {
 	writeOneBatchInfo(pm, user, bNode->val);
 	}
     for (bNode = user->oldBatches->head; !dlEnd(bNode); bNode = bNode->next)
         {
 	struct batch *batch = bNode->val;
 	if (batch->runningCount >= runThreshold)
 	    writeOneBatchInfo(pm, user, batch);
 	}
     }
 pmSendString(pm, rudpOut, "");
 }
 
 void listBatches(struct paraMessage *pm)
 /* Write list of all active batches.  Format is one batch per
  * line followed by a blank line. */
 {
 listSomeBatches(pm, 1);
 }
 
 void listAllBatches(struct paraMessage *pm)
 /* Write list of batches including inactive ones.  Format is one batch per
  * line followed by a blank line. */
 {
 listSomeBatches(pm, 0);
 }
 
 void appendLocalTime(struct paraMessage *pm, time_t t)
 /* Append time t converted to day/time format to dy. */
 {
 struct tm *tm;
 tm = localtime(&t);
 pmPrintf(pm, "%04d/%02d/%02d %02d:%02d:%02d",
    1900+tm->tm_year, 1+tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec);
 }
 
 char *upToFirstDot(char *s, bool dotQ)
 /* Return string up to first dot. */
 {
 static char ret[128];
 int size;
 char *e = strchr(s, '.');
 if (e == NULL)
     size = strlen(s);
 else
     size = e - s;
 if (size >= sizeof(ret)-2)	/* Leave room for .q */
     size = sizeof(ret)-3;
 memcpy(ret, s, size);
 ret[size] = 0;
 if (dotQ)
     strcat(ret, ".q");
 return ret;
 }
 
 boolean oneJobList(struct paraMessage *pm, struct dlList *list, 
 	boolean sinceStart, boolean extended)
 /* Write out one job list. Return FALSE if there is a problem. */
 {
 struct dlNode *el;
 struct job *job;
 char *machName;
 for (el = list->head; !dlEnd(el); el = el->next)
     {
     job = el->val;
     if (job->machine != NULL)
         machName = upToFirstDot(job->machine->name, FALSE);
     else
 	machName = "none";
     pmClear(pm);
     pmPrintf(pm, "%-4d %-10s %-10s ", job->id, machName, job->batch->user->name);
     if (sinceStart)
         appendLocalTime(pm, job->startTime);
     else
         appendLocalTime(pm, job->submitTime);
     pmPrintf(pm, " %s", job->cmd);
     if (extended)
       pmPrintf(pm, " %s", job->batch->name);
     if (!pmSend(pm, rudpOut))
         return FALSE;
     }
 return TRUE;
 }
 
 void listJobs(struct paraMessage *pm, boolean extended)
 /* Write list of jobs. Format is one job per message
  * followed by a blank message. */
 {
 struct user *user;
 struct dlNode *bNode;
 struct batch *batch;
 
 if (!oneJobList(pm, runningJobs, TRUE, extended))
     return;
 for (user = userList; user != NULL; user = user->next)
     {
     for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next)
         {
 	batch = bNode->val;
 	if (!oneJobList(pm, batch->jobQueue, FALSE, extended))
 	    return;
 	}
     }
 pmSendString(pm, rudpOut, "");
 }
 
 boolean onePstatList(struct paraMessage *pm, struct dlList *list, boolean running, boolean extended, int *resultCount)
 /* Write out one job list in pstat format.  Return FALSE if there is
  * a problem. */
 {
 struct dlNode *node;
 struct job *job;
 time_t t;
 char *machName;
 char *state = (running ? "r" : "q");
 int count = 0;
 char buf[rudpMaxSize];
 char *terminator = "";
 if (extended)
     terminator = "\n";
 pmClear(pm);
 for (node = list->head; !dlEnd(node); node = node->next)
     {
     ++count;
     job = node->val;
     if (job->machine != NULL)
 	machName = job->machine->name;
     else
         machName = "none";
     if (running)
         t = job->startTime;
     else
         t = job->submitTime;
     if (!running && extended)
 	safef(buf, sizeof(buf), "%s %d\n", 
 	    state, job->id);
     else
 	safef(buf, sizeof(buf), "%s %d %s %s %lu %s%s", 
 	    state, job->id, job->batch->user->name, job->exe, t, machName, terminator);
     if ((!extended && pm->size > 0) || (pm->size + strlen(buf) > rudpMaxSize))
 	{
 	if (!pmSend(pm, rudpOut))
 	    {
 	    *resultCount += count;
 	    return FALSE;
 	    }
 	pmClear(pm);
 	}
     pmPrintf(pm, "%s", buf); 
     }
 if (pm->size > 0)
     {
     if (!pmSend(pm, rudpOut))
 	{
 	*resultCount += count;
 	return FALSE;
 	}
     }
 *resultCount += count;
 return TRUE;
 }
 
 void pstat(char *line, struct paraMessage *pm, boolean extended)
 /* Write list of jobs in pstat format. 
  * Extended pstat2 format means we only show queued jobs for the 
  * specific batch, but we still need to return total queue size
  * and also we can include a status about batch failure.
  * Older versions of para will not call extended but should still work.*/
 {
 struct user *user;
 struct dlNode *bNode;
 struct batch *batch = NULL;
 char *userName, *dir;
 struct user *thisUser = NULL;
 struct batch *thisBatch = NULL;
 int count = 0;
 userName = nextWord(&line);
 dir = nextWord(&line);
 if (userName)
   thisUser = findUser(userName);
 if (dir)
   thisBatch = findBatch(thisUser, dir, TRUE);
 if (thisBatch)
     flushResults(thisBatch->name);
 if (!onePstatList(pm, runningJobs, TRUE, extended, &count))
     return;
 for (user = userList; user != NULL; user = user->next)
     {
     for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next)
 	{
 	batch = bNode->val;
 	if ((thisUser == NULL || thisUser == user) && (thisBatch == NULL || thisBatch == batch))
 	    {
 	    if (!onePstatList(pm, batch->jobQueue, FALSE, extended, &count))
 		return;
 	    }
 	else
 	    count += batch->queuedCount;
 	}
     }
 if (extended)
     {
     pmClear(pm);
     pmPrintf(pm, "Total Jobs: %d", count); 
     if (!pmSend(pm, rudpOut))
 	return;
     if (thisBatch && (thisBatch->continuousCrashCount >= sickBatchThreshold))
 	{
 	pmClear(pm);
 	pmPrintf(pm, "Sick Batch: consecutive crashes (%d) >= sick batch threshold (%d)", 
 	    thisBatch->continuousCrashCount, sickBatchThreshold); 
 	if (!pmSend(pm, rudpOut))
 	    return;
 	}
     if (thisBatch)
 	{
 	off_t resultsSize = fileSize(thisBatch->name);
         if (resultsSize != -1) // file exists
 	    {
 	    pmClear(pm);
 	    pmPrintf(pm, "Results Size: %lld", (long long) resultsSize); 
 	    if (!pmSend(pm, rudpOut))
 		return;
 	    }
 	}
     }
 pmSendString(pm, rudpOut, "");
 }
 
 int sumPendingJobs()
 /* Return sum of all pending jobs for all users. */
 {
 struct user *user;
 int count = 0;
 
 for (user = userList; user != NULL; user = user->next)
     count += userQueuedCount(user);
 return count;
 }
 
 int countActiveUsers()
 /* Return count of users with jobs running or in queue */
 {
 struct user *user;
 int count = 0;
 
 for (user = userList; user != NULL; user = user->next)
     {
     if (userIsActive(user))
         ++count;
     }
 return count;
 }
 
 int countActiveBatches()
 /* Return count of active batches */
 {
 int count = 0;
 struct user *user;
 
 for (user = userList; user != NULL; user = user->next)
     count += countUserActiveBatches(user);
 return count;
 }
 
 int getCpus(struct dlList *list)
 /* Return total CPU resources in list. */
 {
 int count = 0;
 struct dlNode *node = NULL;
 for (node = list->head; !dlEnd(node); node = node->next)
     {
     struct machine *mach = node->val;
     count += mach->machSpec->cpus;
     }
 return count;
 }
 
 int getBusyCpus(struct dlList *list)
 /* Return total CPU resources in list. */
 {
 int count = 0;
 struct dlNode *node = NULL;
 for (node = list->head; !dlEnd(node); node = node->next)
     {
     struct machine *machine = node->val;
     /* all the cpus now in-use */
     struct dlNode *jobNode = NULL;
     for (jobNode = machine->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next)
 	{
 	struct job *job = jobNode->val;
 	struct batch * batch =job->batch;
 	count += batch->cpu;
 	}
     }
 return count;
 }
 
 void status(struct paraMessage *pm)
 /* Write summary status.  Format is one line per message
  * followed by a blank message. */
 {
 char buf[256];
 int totalCpus = getCpus(freeMachines)+getCpus(readyMachines)+getCpus(blockedMachines)+getCpus(busyMachines);
 int busyCpus = getBusyCpus(readyMachines)+getBusyCpus(blockedMachines)+getBusyCpus(busyMachines);
 safef(buf, sizeof(buf), "CPUs total: %d", totalCpus);
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "CPUs free: %d", totalCpus - busyCpus);
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "CPUs busy: %d", busyCpus);
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Nodes total: %d", 
     dlCount(freeMachines)+dlCount(busyMachines)+dlCount(readyMachines)+
     dlCount(blockedMachines)+dlCount(deadMachines));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Nodes dead: %d", dlCount(deadMachines));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Nodes sick?: %d", listSickNodes(NULL));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Jobs running:  %d", dlCount(runningJobs));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Jobs waiting:  %d", sumPendingJobs());
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Jobs finished: %d", finishedJobCount);
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Jobs crashed:  %d", crashedJobCount);
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Spokes free: %d", dlCount(freeSpokes));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Spokes busy: %d", dlCount(busySpokes));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Spokes dead: %d", dlCount(deadSpokes));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Active batches: %d", countActiveBatches());
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Total batches: %d", slCount(batchList));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Active users: %d", countActiveUsers());
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Total users: %d", slCount(userList));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Days up: %f", (now - startupTime)/(3600.0 * 24.0));
 pmSendString(pm, rudpOut, buf);
 safef(buf, sizeof(buf), "Version: %s", version);
 pmSendString(pm, rudpOut, buf);
 pmSendString(pm, rudpOut, "");
 }
 
 void addSpoke()
 /* Start up a new spoke and add it to free list. */
 {
 struct spoke *spoke;
 spoke = spokeNew();
 if (spoke != NULL)
     {
     slAddHead(&spokeList, spoke);
     dlAddTail(freeSpokes, spoke->node);
     }
 }
 
 void killSpokes()
 /* Kill all spokes. */
 {
 struct spoke *spoke, *next;
 for (spoke = spokeList; spoke != NULL; spoke = next)
     {
     next = spoke->next;
     dlRemove(spoke->node);
     spokeFree(&spoke);
     }
 }
 
 void startSpokes()
 /* Start default number of spokes. */
 {
 int i;
 for (i=0; i<initialSpokes; ++i)
     addSpoke();
 }
 
 void startMachines(char *fileName)
 /* If they give us a beginning machine list use it here. */
 {
 struct lineFile *lf = lineFileOpen(fileName, TRUE);
 char *row[7];
 boolean firstTime = TRUE;
 while (lineFileRow(lf, row))
     {
     struct machSpec *ms;
     bits32 ip;
     ms = machSpecLoad(row);
     ip = internetHostIp(ms->name);
     if (hashLookup(machineHash, ms->name))
 	errAbort("machine list contains duplicate: %s",  ms->name);
     struct machine *machine = doAddMachine(ms->name, ms->tempDir, ip, ms);
     hashStoreName(machineHash, ms->name);
 
     // TODO Add a command-line param for these that overrides default?
     /* use first machine in spec list as model node */
     if (firstTime) 
 	{
 	firstTime = FALSE;
 	cpuUnit = 1;       /* 1 CPU */
 	if (!optionExists("ramUnit"))
     	    ramUnit = ((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus;
 	defaultJobCpu = 1;        /* number of cpuUnits in default job usage */  
 	/* number of ramUnits in default job usage, resolves to just 1 currently */
 	if (!optionExists("defaultJobRam"))
     	    defaultJobRam = (((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus) / ramUnit;
 	}
 
     int c = 0, r = 0;
     readTotalMachineResources(machine, &c, &r);
     maxCpuInCluster = max(maxCpuInCluster, c);
     maxRamInCluster = max(maxRamInCluster, r);
 
     }
 lineFileClose(&lf);
 }
 
 
 struct existingResults
 /* Keep track of old results we need to integrate into */
     {
     struct existingResults *next;
     char *fileName;	  /* Name of file this is in, not allocated here */
     struct hash *hash;    /* Hash keyed by ascii jobId indicated job results
                            * already recorded. */
     };
 
 void existingResultsFree(struct existingResults **pEr)
 /* Free up existing results structure */
 {
 struct existingResults *er = *pEr;
 if (er != NULL)
     {
     freeHash(&er->hash);
     freez(pEr);
     }
 }
 
 void existingResultsFreeList(struct existingResults **pList)
 /* Free list of existingResults */
 {
 struct existingResults *el, *next;
 
 for (el = *pList; el != NULL; el = next)
     {
     next = el->next;
     existingResultsFree(&el);
     }
 *pList = NULL;
 }
 
 
 void readResults(char *fileName, struct hash *hash)
 /* Read jobId's of results into hash */
 {
 struct lineFile *lf = lineFileMayOpen(fileName, TRUE);
 char *row[3];
 char *line;
 int wordCount;
 if (lf == NULL)
      {
      warn("Couldn't open results file %s", fileName);
      return;
      }
 while (lineFileNext(lf, &line, NULL))
      {
      wordCount = chopLine(line, row);
      if (wordCount == 0 || row[0][0] == '#')
          continue;
      if (wordCount < 3)
 	 {
          warn("Short line %d of %s", lf->lineIx, lf->fileName);
 	 continue;
 	 }
      if (!isdigit(row[2][0]))
          {
 	 warn("Expecting number field 3 line %d of %s", lf->lineIx, lf->fileName);
 	 break;
 	 }
      hashAdd(hash, row[2], NULL);
      }
 lineFileClose(&lf);
 }
 
 struct existingResults *getExistingResults(char *fileName, struct hash *erHash,
 	struct existingResults **pErList)
 /* Get results from hash if we've seen them before, otherwise
  * read them in, save in hash, and return them. */
 {
 struct existingResults *er = hashFindVal(erHash, fileName);
 if (er == NULL)
     {
     AllocVar(er);
     slAddHead(pErList, er);
     hashAddSaveName(erHash, fileName, er, &er->fileName);
     er->hash = newHashExt(18, FALSE);
     readResults(fileName, er->hash);
     }
 return er;
 }
 
 
 void addRunningJob(struct runJobMessage *rjm, char *resultFile, 
 	struct machine *mach)
 /* Add job that is already running to queues. */
 {
 if (dlCount(mach->jobs) > mach->machSpec->cpus)
     warn("%s seems to have more jobs running than it has cpus", mach->name);
 else
     {
     struct job *job = jobNew(rjm->command, rjm->user, rjm->dir, rjm->in,
 	    rjm->out, rjm->cpus, rjm->ram, resultFile, FALSE);
     if (!job) return;
     struct batch *batch = job->batch;
     struct user *user = batch->user;
     job->id = atoi(rjm->jobIdString);
     ++batch->runningCount;
     ++user->runningCount;
     dlRemove(batch->node);
     dlAddTail(user->oldBatches, batch->node);
     dlAddTail(mach->jobs, job->jobNode);
     job->machine = mach;
     dlAddTail(runningJobs, job->node);
     dlRemove(mach->node);
     dlAddTail(readyMachines, mach->node);
     dlAddTail(hangJobs, job->hangNode);
     mach->lastChecked = job->lastChecked = job->submitTime = job->startTime = job->lastClockIn = now;
     }
 }
 
 void pljErr(struct machine *mach, int no)
 /* Print out error message in the middle of routine below. */
 {
 warn("%s: truncated listJobs response %d", mach->name, no);
 }
 
 void getExeOnly(char *command, char exe[256])
 /* Extract executable file (not including path) from command line. */
 {
 /* Extract name of executable file with no path. */
 char *dupeCommand = cloneString(command);
 char *exePath = firstWordInLine(dupeCommand);
 char exeFile[128], exeExt[64];
 splitPath(exePath, NULL, exeFile, exeExt);
 /* We cannot use sizeof(exe) because an array on a stack
  * is just a pointer, and so pointer-size is all that sizeof returns
  * for exe. */
 safef(exe, 256, "%s%s", exeFile, exeExt);
 freez(&dupeCommand);
 }
 
 void writeExistingResults(char *fileName, char *line, struct machine *mach, 
 	struct runJobMessage *rjm)
 {
 char err[512], exe[256];
 int jobId = atoi(rjm->jobIdString);
 char *status = nextWord(&line);
 char *uTime = nextWord(&line);
 char *sTime = nextWord(&line);
 
 if (sTime == NULL)
     {
     warn("Bad line format in writeExistingResults for %s", mach->name);
     return;
     }
 
 
 getExeOnly(rjm->command, exe);
 fillInErrFile(err, jobId, mach->tempDir);
 fileName = hashStoreName(stringHash, fileName);
 
 writeResults(fileName, rjm->user, mach->name, 
 	jobId, exe, now, now,
 	err, rjm->command, 
 	status, uTime, sTime);
 }
 
 boolean processListJobs(struct machine *mach, 
 	struct paraMessage *pm, struct rudp *ru, 
 	struct hash *erHash, struct existingResults **pErList,
 	int *pRunning, int *pFinished)
 /* Process response to list jobs message. Read jobs node is running and
  * has recently finished.  Add running ones to job list. Add finished
  * ones to results file if necessary.
  *
  * Format of message is
  *     running count
  *     one line for each running job.
  *     recent count
  *     two lines for each recent job.
  */
 {
 int running, recent, i, finCount = 0;
 struct runJobMessage rjm;
 char resultsFile[512];
 struct paraMultiMessage pmm;
 
 /* ensure the multi-message response comes from the correct ip and has no duplicate msgs*/
 pmmInit(&pmm, pm, pm->ipAddress.sin_addr);
 
 if (!pmmReceiveTimeOut(&pmm, ru, 2000000))
     {
     warn("%s: no listJobs response", mach->name);
     return FALSE;
     }
 running = atoi(pm->data);
 for (i=0; i<running; ++i)
     {
     if (!pmmReceiveTimeOut(&pmm, ru, 2000000))
         {
 	pljErr(mach, 1);
 	return FALSE;
 	}
     if (!parseRunJobMessage(pm->data, &rjm))
         {
 	pljErr(mach, 2);
 	return FALSE;
 	}
     snprintf(resultsFile, sizeof(resultsFile), "%s/%s", rjm.dir, "para.results");
     addRunningJob(&rjm, resultsFile, mach);
     }
 *pRunning += running;
 if (!pmmReceiveTimeOut(&pmm, ru, 2000000))
     {
     pljErr(mach, 3);
     return FALSE;
     }
 recent = atoi(pm->data);
 for (i=0; i<recent; ++i)
     {
     struct existingResults *er;
     char *startLine = NULL;
     if (!pmmReceiveTimeOut(&pmm, ru, 2000000))
         {
 	pljErr(mach, 4);
 	return FALSE;
 	}
     startLine = cloneString(pm->data);;
     if (!parseRunJobMessage(startLine, &rjm))
         {
 	pljErr(mach, 5);
 	freez(&startLine);
 	return FALSE;
 	}
     if (!pmmReceiveTimeOut(&pmm, ru, 2000000))
         {
 	pljErr(mach, 6);
 	freez(&startLine);
 	return FALSE;
 	}
     /* Do not duplicate a result. Check if it already is in para.results */
     safef(resultsFile, sizeof(resultsFile), "%s/%s", rjm.dir, "para.results");
     er = getExistingResults(resultsFile, erHash, pErList);
     if (!hashLookup(er->hash, rjm.jobIdString))
         {
 	writeExistingResults(resultsFile, pm->data, mach, &rjm);
 	++finCount;
 	}
     freez(&startLine);
     }
 *pFinished += finCount;
 return TRUE;
 }
 
 void checkForJobsOnNodes()
 /* Poll nodes and see if they have any jobs for us. */
 {
 struct machine *mach;
 int running = 0, finished = 0;
 struct hash *erHash = newHashExt(8, FALSE); /* A hash of existingResults */
 struct existingResults *erList = NULL;
 
 logDebug("Checking for jobs already running on nodes");
 for (mach = machineList; mach != NULL; mach = mach->next)
     {
     struct paraMessage pm;
     struct rudp *ru = rudpNew(rudpOut->socket);	/* Get own resend timing */
     logDebug("check for jobs on %s", mach->name);
     pmInitFromName(&pm, mach->name, paraNodePort);
     if (!pmSendString(&pm, ru, "listJobs"))
         {
 	machineDown(mach);
 	continue;
 	}
     if (!processListJobs(mach, &pm, rudpOut, erHash, &erList, &running, &finished))
 	machineDown(mach);
     rudpFree(&ru);
     }
 
 /* Clean up time. */
 existingResultsFreeList(&erList);
 hashFree(&erHash);
 needsPlanning = TRUE;
 
 /* Report results. */
 logDebug("%d running jobs, %d jobs that finished while hub was down",
 	running, finished);
 }
 
 void startHub(char *machineList)
 /* Do hub daemon - set up socket, and loop around on it until we get a quit. */
 {
 struct sockaddr_in sai;
 char *line, *command;
 struct rudp *rudpIn = NULL;
 
 /* Note startup time. */
 findNow();
 startupTime = now;
 
 /* Find name and IP address of our machine. */
 hubHost = getMachine();
 if (optionExists("log"))
     logOpenFile("paraHub", optionVal("log", NULL));
 else    
     logOpenSyslog("paraHub", optionVal("logFacility", NULL));
 logSetMinPriority(optionVal("logMinPriority", "info"));
 logInfo("starting paraHub on %s", hubHost);
 
 /* Set up various lists. */
 hubMessageQueueInit();
 stringHash = newHash(0);
 setupLists();
 machineHash = newHash(0);
 startMachines(machineList);
 
 openJobId();
 logInfo("next job ID is %d.", nextJobId);
 
 rudpOut = rudpMustOpen();
 if (!optionExists("noResume"))
     checkForJobsOnNodes();
 
 /* Initialize socket etc. */
 ZeroVar(&sai);
 sai.sin_family = AF_INET;
 sai.sin_port = htons(paraHubPort);
 sai.sin_addr.s_addr = INADDR_ANY;
 rudpIn = rudpMustOpenBound(&sai);
 
 /* Start up daemons. */
 sockSuckStart(rudpIn);
 startHeartbeat();
 startSpokes();
 
 logDebug("sockSuck,Heartbeat,Spokes have been started");
 
 /* Bump up our priority to just shy of real-time. */
 nice(-40);
 
 /* Main event loop. */
 for (;;)
     {
     struct paraMessage *pm = hubMessageGet();
     findNow();
     line = pm->data;
     logDebug("hub: %s", line);
     command = nextWord(&line);
     if (command == NULL)
          warn("Empty command");
     else if (sameWord(command, "jobDone"))
 	 jobDone(line);
     else if (sameWord(command, "recycleSpoke"))
 	 recycleSpoke(line);
     else if (sameWord(command, "heartbeat"))
 	 processHeartbeat();
     else if (sameWord(command, "setPriority"))
 	 setPriorityAcknowledge(line, pm);
     else if (sameWord(command, "setMaxJob"))
 	 setMaxJobAcknowledge(line, pm);
     else if (sameWord(command, "resetCounts"))
          resetCountsAcknowledge(line, pm);
     else if (sameWord(command, "freeBatch"))
          freeBatchAcknowledge(line, pm);
     else if (sameWord(command, "flushResults"))
          flushResultsAcknowledge(line, pm);
     else if (sameWord(command, "showSickNodes"))
 	 showSickNodesAcknowledge(line, pm);
     else if (sameWord(command, "clearSickNodes"))
 	 clearSickNodesAcknowledge(line, pm);
     else if (sameWord(command, "addJob"))
 	 addJobAcknowledge(line, pm, 1);
     else if (sameWord(command, "addJob2"))
 	 addJobAcknowledge(line, pm, 2);
     else if (sameWord(command, "nodeDown"))
 	 nodeDown(line);
     else if (sameWord(command, "alive"))
 	 nodeAlive(line);
     else if (sameWord(command, "checkIn"))
 	 nodeCheckIn(line);
     else if (sameWord(command, "checkDeadNodesASAP"))
 	 checkDeadNodesASAP();
     else if (sameWord(command, "removeJob"))
 	 removeJobAcknowledge(line, pm);
     else if (sameWord(command, "chill"))
 	 chillBatch(line, pm);
     else if (sameWord(command, "ping"))
 	 respondToPing(pm);
     else if (sameWord(command, "addMachine"))
 	 addMachine(line);
     else if (sameWord(command, "removeMachine"))
 	 removeMachineAcknowledge(line, pm);
     else if (sameWord(command, "listJobs"))
 	 listJobs(pm, FALSE);
     else if (sameWord(command, "listJobsExtended"))
 	 listJobs(pm, TRUE);
     else if (sameWord(command, "listMachines"))
 	 listMachines(pm);
     else if (sameWord(command, "listUsers"))
 	 listUsers(pm);
     else if (sameWord(command, "listBatches"))
 	 listBatches(pm);
     else if (sameWord(command, "listAllBatches"))
 	 listAllBatches(pm);
     else if (sameWord(command, "listSick"))
 	 listSickNodes(pm);
     else if (sameWord(command, "status"))
 	 status(pm);
     else if (sameWord(command, "pstat"))
 	 pstat(line, pm, FALSE);
     else if (sameWord(command, "pstat2"))
 	 pstat(line, pm, TRUE);
     else if (sameWord(command, "addSpoke"))
 	 addSpoke();
     else if (sameWord(command, "plan"))
 	 plan(pm);
     else if (sameWord(command, "quit"))
          break;
     else 
          warn("Unrecognized command %s", command);
     pmFree(&pm);
     }
 endHeartbeat();
 killSpokes();
 saveJobId();
 #ifdef SOON
 #endif /* SOON */
 }
 
 void fillInSubnet()
 /* Parse subnet paramenter if any into subnet variable. */
 {
 char *sns = optionVal("subnet", NULL);
 if (sns == NULL)
     sns = optionVal("subNet", NULL);
 netParseSubnet(sns, hubSubnet);
 }
 
 int main(int argc, char *argv[])
 /* Process command line. */
 {
 optionInit(&argc, argv, optionSpecs);
 if (argc < 2)
     usage();
 if (optionExists("ramUnit"))
     {
     ramUnit = paraParseRam(optionVal("ramUnit", ""));
     if (ramUnit == -1)
 	errAbort("Invalid RAM expression '%s' in '-ramUnit=' option", optionVal("ramUnit", ""));
     }
 if (optionExists("defaultJobRam"))
     {
     defaultJobRam = optionInt("defaultJobRam", defaultJobRam);
     if (defaultJobRam < 1)
 	errAbort("Invalid defaultJobRam specified in option -defaultJobRam=%d", defaultJobRam);
     }
 jobCheckPeriod = optionInt("jobCheckPeriod", jobCheckPeriod);
 machineCheckPeriod = optionInt("machineCheckPeriod", machineCheckPeriod);
 initialSpokes = optionInt("spokes",  initialSpokes);
 fillInSubnet();
 paraDaemonize("paraHub");
 startHub(argv[1]);
 return 0;
 }