9c6e77467db7d54f98e58e1e858ee7afae0e40e5
galt
  Thu Oct 11 03:07:37 2012 -0700
v12.15 - bugfixes and better support of rlimit for parasol
diff --git src/parasol/paraHub/paraHub.c src/parasol/paraHub/paraHub.c
index 3ad22c6..548aed5 100644
--- src/parasol/paraHub/paraHub.c
+++ src/parasol/paraHub/paraHub.c
@@ -70,30 +70,32 @@
 #include "sqlNum.h"
 
 
 /* command line option specifications */
 static struct optionSpec optionSpecs[] = {
     {"spokes", OPTION_INT},
     {"jobCheckPeriod", OPTION_INT},
     {"machineCheckPeriod", OPTION_INT},
     {"subnet", OPTION_STRING},
     {"nextJobId", OPTION_INT},
     {"logFacility", OPTION_STRING},
     {"logMinPriority", OPTION_STRING},
     {"log", OPTION_STRING},
     {"debug", OPTION_BOOLEAN},
     {"noResume", OPTION_BOOLEAN},
+    {"ramUnit", OPTION_STRING},
+    {"defaultJobRam", OPTION_INT},
     {NULL, 0}
 };
 
 char *version = PARA_VERSION;	/* Version number. */
 
 /* Some command-line configurable quantities and their defaults. */
 int jobCheckPeriod = 10;      /* Minutes between checking running jobs. */
 int machineCheckPeriod = 20;  /* Minutes between checking dead machines. */
 int assumeDeadPeriod = 60;    /* If haven't heard from job in this long assume
                                  * machine running it is dead. */
 int initialSpokes = 30;		/* Number of spokes to start with. */
 unsigned char hubSubnet[4] = {255,255,255,255};   /* Subnet to check. */
 int nextJobId = 0;		/* Next free job id. */
 time_t startupTime;		/* Clock tick of paraHub startup. */
 
@@ -118,30 +120,36 @@
          "    localSize - Megabytes of local disk\n"
          "    switchName - Name of switch this is on\n"
 	 "\n"
 	 "options:\n"
 	 "   -spokes=N  Number of processes that feed jobs to nodes - default %d.\n"
 	 "   -jobCheckPeriod=N  Minutes between checking on job - default %d.\n"
 	 "   -machineCheckPeriod=N  Minutes between checking on machine - default %d.\n"
 	 "   -subnet=XXX.YYY.ZZZ  Only accept connections from subnet (example 192.168).\n"
 	 "   -nextJobId=N  Starting job ID number.\n"
 	 "   -logFacility=facility  Log to the specified syslog facility - default local0.\n"
          "   -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n"
          "    defaults to \"warn\"\n"
          "   -log=file  Log to file instead of syslog.\n"
          "   -debug  Don't daemonize\n"
 	 "   -noResume  Don't try to reconnect with jobs running on nodes.\n"
+         "   -ramUnit=N  Number of bytes of RAM in the base unit used by the jobs.\n"
+         "      Default is RAM on node divided by number of cpus on node.\n"
+         "      Shorthand expressions allow t,g,m,k for tera, giga, mega, kilo.\n"
+         "      e.g. 4g = 4 Gigabytes.\n"
+	 "   -defaultJobRam=N Number of ram units in a job that has no specified ram usage.\n"
+	 "      Defaults to 1.\n"
 	               ,
 	 version, initialSpokes, jobCheckPeriod, machineCheckPeriod
 	 );
 }
 
 struct spoke *spokeList;	/* List of all spokes. */
 struct dlList *freeSpokes;      /* List of free spokes. */
 struct dlList *busySpokes;	/* List of busy spokes. */
 struct dlList *deadSpokes;	/* List of dead spokes. */
 
 struct machine *machineList;    /* List of all machines. */
 struct dlList *freeMachines;    /* List of machines idle. */
 struct dlList *readyMachines;   /* List of machines ready for jobs. */
 struct dlList *blockedMachines; /* List of machines ready but blocked by runningCount. */
 struct dlList *busyMachines;    /* List of machines running jobs. */
@@ -1059,30 +1067,32 @@
     job = jNode->val;
     dlAddTail(hangJobs, job->hangNode);
     ++batch->runningCount;
     --batch->queuedCount;
     ++user->runningCount;
     unactivateBatchIfEmpty(batch); 
 
     /* Tell machine, job, and spoke about each other. */
     dlAddTail(machine->jobs, job->jobNode);
 
     /* just put it back on the ready list, it will get looked at again */
     dlAddTail(readyMachines, mNode);
 
     job->machine = machine;
     job->lastChecked = job->startTime = job->lastClockIn = now;
+    if (!(job->ram))  /* if no ram size specified, use the default */
+	job->ram = batch->ram * ramUnit;
     spokeSendJob(spoke, machine, job);
     return TRUE;
     }
 }
 
 void runner(int count)
 /* Try to run a couple of jobs. */
 {
 while (--count >= 0)
     if (!runNextJob())
         break;
 }
 
 struct machine *machineNew(char *name, char *tempDir, struct machSpec *m)
 /* Create a new machine structure. */
@@ -2945,33 +2955,35 @@
     struct machSpec *ms;
     bits32 ip;
     ms = machSpecLoad(row);
     ip = internetHostIp(ms->name);
     if (hashLookup(machineHash, ms->name))
 	errAbort("machine list contains duplicate: %s",  ms->name);
     struct machine *machine = doAddMachine(ms->name, ms->tempDir, ip, ms);
     hashStoreName(machineHash, ms->name);
 
     // TODO Add a command-line param for these that overrides default?
     /* use first machine in spec list as model node */
     if (firstTime) 
 	{
 	firstTime = FALSE;
 	cpuUnit = 1;       /* 1 CPU */
+	if (!optionExists("ramUnit"))
 	ramUnit = ((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus;
 	defaultJobCpu = 1;        /* number of cpuUnits in default job usage */  
 	/* number of ramUnits in default job usage, resolves to just 1 currently */
+	if (!optionExists("defaultJobRam"))
 	defaultJobRam = (((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus) / ramUnit;
 	}
 
     int c = 0, r = 0;
     readTotalMachineResources(machine, &c, &r);
     maxCpuInCluster = max(maxCpuInCluster, c);
     maxRamInCluster = max(maxRamInCluster, r);
 
     }
 lineFileClose(&lf);
 }
 
 
 struct existingResults
 /* Keep track of old results we need to integrate into */
@@ -3391,25 +3403,37 @@
 void fillInSubnet()
 /* Parse subnet paramenter if any into subnet variable. */
 {
 char *sns = optionVal("subnet", NULL);
 if (sns == NULL)
     sns = optionVal("subNet", NULL);
 netParseSubnet(sns, hubSubnet);
 }
 
 int main(int argc, char *argv[])
 /* Process command line. */
 {
 optionInit(&argc, argv, optionSpecs);
 if (argc < 2)
     usage();
+if (optionExists("ramUnit"))
+    {
+    ramUnit = paraParseRam(optionVal("ramUnit", ""));
+    if (ramUnit == -1 || ramUnit == 0)  /* -1 flags a bad expression; 0 would later be used as a divisor */
+	errAbort("Invalid RAM expression '%s' in '-ramUnit=' option", optionVal("ramUnit", ""));
+    }
+if (optionExists("defaultJobRam"))
+    {
+    defaultJobRam = optionInt("defaultJobRam", defaultJobRam);
+    if (defaultJobRam < 1)  /* must be at least one ram unit */
+	errAbort("Invalid defaultJobRam specified in option -defaultJobRam=%d", defaultJobRam);
+    }
 jobCheckPeriod = optionInt("jobCheckPeriod", jobCheckPeriod);
 machineCheckPeriod = optionInt("machineCheckPeriod", machineCheckPeriod);
 initialSpokes = optionInt("spokes",  initialSpokes);
 fillInSubnet();
 paraDaemonize("paraHub");
 startHub(argv[1]);
 return 0;
 }