9c6e77467db7d54f98e58e1e858ee7afae0e40e5 galt Thu Oct 11 03:07:37 2012 -0700 v12.15 - bugfixes and better support of rlimit for parasol diff --git src/parasol/paraHub/paraHub.c src/parasol/paraHub/paraHub.c index 3ad22c6..548aed5 100644 --- src/parasol/paraHub/paraHub.c +++ src/parasol/paraHub/paraHub.c @@ -70,30 +70,32 @@ #include "sqlNum.h" /* command line option specifications */ static struct optionSpec optionSpecs[] = { {"spokes", OPTION_INT}, {"jobCheckPeriod", OPTION_INT}, {"machineCheckPeriod", OPTION_INT}, {"subnet", OPTION_STRING}, {"nextJobId", OPTION_INT}, {"logFacility", OPTION_STRING}, {"logMinPriority", OPTION_STRING}, {"log", OPTION_STRING}, {"debug", OPTION_BOOLEAN}, {"noResume", OPTION_BOOLEAN}, + {"ramUnit", OPTION_STRING}, + {"defaultJobRam", OPTION_INT}, {NULL, 0} }; char *version = PARA_VERSION; /* Version number. */ /* Some command-line configurable quantities and their defaults. */ int jobCheckPeriod = 10; /* Minutes between checking running jobs. */ int machineCheckPeriod = 20; /* Minutes between checking dead machines. */ int assumeDeadPeriod = 60; /* If haven't heard from job in this long assume * machine running it is dead. */ int initialSpokes = 30; /* Number of spokes to start with. */ unsigned char hubSubnet[4] = {255,255,255,255}; /* Subnet to check. */ int nextJobId = 0; /* Next free job id. */ time_t startupTime; /* Clock tick of paraHub startup. */ @@ -118,30 +120,36 @@ " localSize - Megabytes of local disk\n" " switchName - Name of switch this is on\n" "\n" "options:\n" " -spokes=N Number of processes that feed jobs to nodes - default %d.\n" " -jobCheckPeriod=N Minutes between checking on job - default %d.\n" " -machineCheckPeriod=N Minutes between checking on machine - default %d.\n" " -subnet=XXX.YYY.ZZZ Only accept connections from subnet (example 192.168).\n" " -nextJobId=N Starting job ID number.\n" " -logFacility=facility Log to the specified syslog facility - default local0.\n" " -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n" " defaults to \"warn\"\n" " -log=file Log to file instead of syslog.\n" " -debug Don't daemonize\n" " -noResume Don't try to reconnect with jobs running on nodes.\n" + " -ramUnit=N Number of bytes of RAM in the base unit used by the jobs.\n" + " Default is RAM on node divided by number of cpus on node.\n" + " Shorthand expressions allow t,g,m,k for tera, giga, mega, kilo.\n" + " e.g. 4g = 4 Gigabytes.\n" + " -defaultJobRam=N Number of ram units in a job has no specified ram usage.\n" + " Defaults to 1.\n" , version, initialSpokes, jobCheckPeriod, machineCheckPeriod ); } struct spoke *spokeList; /* List of all spokes. */ struct dlList *freeSpokes; /* List of free spokes. */ struct dlList *busySpokes; /* List of busy spokes. */ struct dlList *deadSpokes; /* List of dead spokes. */ struct machine *machineList; /* List of all machines. */ struct dlList *freeMachines; /* List of machines idle. */ struct dlList *readyMachines; /* List of machines ready for jobs. */ struct dlList *blockedMachines; /* List of machines ready but blocked by runningCount. */ struct dlList *busyMachines; /* List of machines running jobs. */ @@ -1059,30 +1067,32 @@ job = jNode->val; dlAddTail(hangJobs, job->hangNode); ++batch->runningCount; --batch->queuedCount; ++user->runningCount; unactivateBatchIfEmpty(batch); /* Tell machine, job, and spoke about each other. */ dlAddTail(machine->jobs, job->jobNode); /* just put it back on the ready list, it will get looked at again */ dlAddTail(readyMachines, mNode); job->machine = machine; job->lastChecked = job->startTime = job->lastClockIn = now; + if (!(job->ram)) /* if no ram size specified, use the default */ + job->ram = batch->ram * ramUnit; spokeSendJob(spoke, machine, job); return TRUE; } } void runner(int count) /* Try to run a couple of jobs. */ { while (--count >= 0) if (!runNextJob()) break; } struct machine *machineNew(char *name, char *tempDir, struct machSpec *m) /* Create a new machine structure. */ @@ -2945,33 +2955,35 @@ struct machSpec *ms; bits32 ip; ms = machSpecLoad(row); ip = internetHostIp(ms->name); if (hashLookup(machineHash, ms->name)) errAbort("machine list contains duplicate: %s", ms->name); struct machine *machine = doAddMachine(ms->name, ms->tempDir, ip, ms); hashStoreName(machineHash, ms->name); // TODO Add a command-line param for these that overrides default? /* use first machine in spec list as model node */ if (firstTime) { firstTime = FALSE; cpuUnit = 1; /* 1 CPU */ + if (!optionExists("ramUnit")) ramUnit = ((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus; defaultJobCpu = 1; /* number of cpuUnits in default job usage */ /* number of ramUnits in default job usage, resolves to just 1 currently */ + if (!optionExists("defaultJobRam")) defaultJobRam = (((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus) / ramUnit; } int c = 0, r = 0; readTotalMachineResources(machine, &c, &r); maxCpuInCluster = max(maxCpuInCluster, c); maxRamInCluster = max(maxRamInCluster, r); } lineFileClose(&lf); } struct existingResults /* Keep track of old results we need to integrate into */ @@ -3391,25 +3403,37 @@ void fillInSubnet() /* Parse subnet paramenter if any into subnet variable. */ { char *sns = optionVal("subnet", NULL); if (sns == NULL) sns = optionVal("subNet", NULL); netParseSubnet(sns, hubSubnet); } int main(int argc, char *argv[]) /* Process command line. */ { optionInit(&argc, argv, optionSpecs); if (argc < 2) usage(); +if (optionExists("ramUnit")) + { + ramUnit = paraParseRam(optionVal("ramUnit", "")); + if (ramUnit == -1) + errAbort("Invalid RAM expression '%s' in '-ramUnit=' option", optionVal("ramUnit", "")); + } +if (optionExists("defaultJobRam")) + { + defaultJobRam = optionInt("defaultJobRam", defaultJobRam); + if (defaultJobRam < 1) + errAbort("Invalid defaultJobRam specified in option -defaultJobRam=%d", defaultJobRam); + } jobCheckPeriod = optionInt("jobCheckPeriod", jobCheckPeriod); machineCheckPeriod = optionInt("machineCheckPeriod", machineCheckPeriod); initialSpokes = optionInt("spokes", initialSpokes); fillInSubnet(); paraDaemonize("paraHub"); startHub(argv[1]); return 0; }