44ccfacbe3a3d4b300f80d48651c77837a4b571e galt Tue Apr 26 11:12:02 2022 -0700 SQL INJECTION Prevention Version 2 - this improves our methods by making subclauses of SQL that get passed around be both easy and correct to use. The way that was achieved was by getting rid of the obscure and not well used functions sqlSafefFrag and sqlDyStringPrintfFrag and replacing them with the plain versions of those functions, since these are not needed anymore. The new version checks for NOSQLINJ in unquoted %-s which is used to include SQL clauses, and will give an error the NOSQLINJ clause is not present, and this will automatically require the correct behavior by developers. sqlDyStringPrint is a very useful function, however because it was not enforced, users could use various other dyString functions and they operated without any awareness or checking for SQL correct use. Now those dyString functions are prohibited and it will produce an error if you try to use a dyString function on a SQL string, which is simply detected by the presence of the NOSQLINJ prefix. diff --git src/parasol/paraHub/paraHub.c src/parasol/paraHub/paraHub.c index 3375595..e07e43f 100644 --- src/parasol/paraHub/paraHub.c +++ src/parasol/paraHub/paraHub.c @@ -1,3508 +1,3508 @@ /* paraHub - Parasol hub server. This is the heart of the parasol system * and consists of several threads - sucketSucker, heartbeat, a collection * of spokes, as well as the main hub thread. The system is synchronized * around a message queue that the hub reads and the other threads write. * * The purpose of socketSucker is to move messages from the UDP * socket, which has a limited queue size, to the message queue, which * can be much larger. The spoke daemons exist to send messages to compute * nodes. Since sending a message to a node can take a while depending on * the network conditions, the multiple spokes allow the system to be * delivering messages to multiple nodes simultaniously. 
The heartbeat * daemon simply sits in a loop adding a heartbeat message to the message * queue every 15 seconds or so. The hub thead is responsible for * keeping track of everything. * * The hub keeps track of users, batches, jobs, and machines. It tries * to balance machine usage between users and between batches. If a machine * goes down it will restart the jobs the machine was running on other machines. * When a job finishes it will add a line about the job to the results file * associated with the batch. * * A fair bit of the hub's code is devoted to scheduling. It does this by * periodically "planning" what batches to associate with what machines. * When a machine is free it will run the next job from one of it's batches. * A number of events including a new batch of jobs, machines being added or * removed, and so forth can make the system decide it needs to replan. The * replanning itself is done in the next heartbeat. * * When the plan is in place, the most common thing the system does is * try to run the next job. It keeps lists of free machines and free spokes, * and for the most part just just takes the next machine, a job from one * of the batches the machine is running, and the next free spoke, and sends * a message to the machine via the spoke to run the job. This * indirection of starting jobs via a separate spoke process avoids the * hub daemon itself having to wait for a response from a compute node * over the network. * * When a spoke is done assigning a job, the spoke sends a 'recycleSpoke' * message to the hub, which puts the spoke back on the freeSpoke list. * Likewise when a job is done the machine running the jobs sends a * 'job done' message to the hub, which puts the machine back on the * free list, writes the job exit code to a file, and removes the job * from the system. * * Sometimes a spoke will find that a machine is down. In this case it * sends a 'node down' message to the hub as well as the 'spoke free' * message. 
The hub will then move the machine to the deadMachines list, * and put the job back on the top of the pending list. * * The heartbeat messages stimulate the hub to do various background * chores. When the hub gets a heartbeat message it * does a few things: * o - It calls runner to try and start some more jobs. (Runner * is also called at the end of processing a recycleSpoke, * jobDone, addJob or addMachine message. Typically runner * won't find anything new to run in the heartbeat, but this * is put here mostly just in case of unforseen issues.) * o - It calls graveDigger, a routine which sees if machines * on the dead list have come back to life. * o - It calls hangman, a routine which sees if jobs the system * thinks have been running for a long time are still * running on the machine they have been assigned to. * If the machine has gone down it is moved to the dead list * and the job is reassigned. */ #include "paraCommon.h" #include "options.h" #include "linefile.h" #include "hash.h" #include "errAbort.h" #include "dystring.h" #include "dlist.h" #include "net.h" #include "internet.h" #include "paraHub.h" #include "machSpec.h" #include "log.h" #include "obscure.h" #include "sqlNum.h" #include "internet.h" /* command line option specifications */ static struct optionSpec optionSpecs[] = { {"spokes", OPTION_INT}, {"jobCheckPeriod", OPTION_INT}, {"machineCheckPeriod", OPTION_INT}, {"subnet", OPTION_STRING}, {"nextJobId", OPTION_INT}, {"logFacility", OPTION_STRING}, {"logMinPriority", OPTION_STRING}, {"log", OPTION_STRING}, {"debug", OPTION_BOOLEAN}, {"noResume", OPTION_BOOLEAN}, {"ramUnit", OPTION_STRING}, {"defaultJobRam", OPTION_INT}, {NULL, 0} }; char *version = PARA_VERSION; /* Version number. */ /* Some command-line configurable quantities and their defaults. */ int jobCheckPeriod = 10; /* Minutes between checking running jobs. */ int machineCheckPeriod = 20; /* Minutes between checking dead machines. 
*/ int assumeDeadPeriod = 60; /* If haven't heard from job in this long assume * machine running it is dead. */ int initialSpokes = 30; /* Number of spokes to start with. */ struct cidr *hubSubnet = NULL; /* Subnet to check. */ struct cidr *localHostSubnet = NULL; int nextJobId = 0; /* Next free job id. */ time_t startupTime; /* Clock tick of paraHub startup. */ /* not yet configurable */ int sickNodeThreshold = 3; /* Treat node as sick if this number of failures */ int sickBatchThreshold = 25; /* Auto-chill sick batch if this number of continuous failures */ void usage() /* Explain usage and exit. */ { errAbort("paraHub - parasol hub server version %s\n" "usage:\n" " paraHub machineList\n" "Where machine list is a file with the following columns:\n" " name - Network name\n" " cpus - Number of CPUs we can use\n" " ramSize - Megabytes of memory\n" " tempDir - Location of (local) temp dir\n" " localDir - Location of local data dir\n" " localSize - Megabytes of local disk\n" " switchName - Name of switch this is on\n" "\n" "options:\n" " -spokes=N Number of processes that feed jobs to nodes - default %d.\n" " -jobCheckPeriod=N Minutes between checking on job - default %d.\n" " -machineCheckPeriod=N Minutes between checking on machine - default %d.\n" " -subnet=XXX.YYY.ZZZ Only accept connections from subnet (example 192.168).\n" " Or CIDR notation (example 192.168.1.2/24).\n" " Supports comma-separated list of IPv4 or IPv6 subnets in CIDR notation.\n" " -nextJobId=N Starting job ID number.\n" " -logFacility=facility Log to the specified syslog facility - default local0.\n" " -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n" " defaults to \"warn\"\n" " -log=file Log to file instead of syslog.\n" " -debug Don't daemonize\n" " -noResume Don't try to reconnect with jobs running on nodes.\n" " -ramUnit=N Number of bytes of RAM in the base unit used by the jobs.\n" " Default is RAM on node divided by number of cpus on node.\n" " Shorthand 
expressions allow t,g,m,k for tera, giga, mega, kilo.\n" " e.g. 4g = 4 Gigabytes.\n" " -defaultJobRam=N Number of ram units in a job has no specified ram usage.\n" " Defaults to 1.\n" , version, initialSpokes, jobCheckPeriod, machineCheckPeriod ); } struct spoke *spokeList; /* List of all spokes. */ struct dlList *freeSpokes; /* List of free spokes. */ struct dlList *busySpokes; /* List of busy spokes. */ struct dlList *deadSpokes; /* List of dead spokes. */ struct machine *machineList; /* List of all machines. */ struct dlList *freeMachines; /* List of machines idle. */ struct dlList *readyMachines; /* List of machines ready for jobs. */ struct dlList *blockedMachines; /* List of machines ready but blocked by runningCount. */ struct dlList *busyMachines; /* List of machines running jobs. */ struct dlList *deadMachines; /* List of machines that aren't running. */ struct dlList *runningJobs; /* Jobs that are running. Preserves oldest first order. */ struct dlList *hangJobs; /* Jobs running hang check list. */ struct hash *userHash; /* Hash of all users. */ struct user *userList; /* List of all users. */ struct batch *batchList; /* List of all batches. */ struct dlList *queuedUsers; /* Users with jobs in queue. */ struct dlList *unqueuedUsers; /* Users with no jobs in queue. */ struct hash *machineHash; /* Find if machine exists already */ struct hash *stringHash; /* Unique strings throughout system go here * including directory names and results file * names/batch names. */ struct resultQueue *resultQueues; /* Result files. */ int finishedJobCount = 0; /* Number of finished jobs. */ int crashedJobCount = 0; /* Number of crashed jobs. */ char *jobIdFileName = "parasol.jid"; /* File name where jobId file is. */ FILE *jobIdFile = NULL; /* Handle to jobId file. */ char *hubHost; /* Name of machine running this. */ struct rudp *rudpOut; /* Our rUDP socket. */ /* Variables for new scheduler */ // TODO make commandline param options to override defaults for unit sizes? 
/* using machines list spec info for defaults */ int cpuUnit = 1; /* 1 CPU */ /* someday this could be float 0.5 */ long long ramUnit = 512 * 1024 * 1024; /* 500 MB */ int defaultJobCpu = 1; /* number of cpuUnits in default job usage */ int defaultJobRam = 1; /* number of ramUnits in default job usage */ /* for the resource array dimensions */ int maxCpuInCluster = 0; /* node with largest number of cpu units */ int maxRamInCluster = 0; /* node with largest number of ram units */ struct slRef ***perCpu = NULL; /* an array of resources sharing the same cpu units free units count */ boolean needsPlanning = FALSE; /* remember if situation changed, need new plan */ void setupLists() /* Make up machine, spoke, user and job lists - all doubly linked * so it is fast to remove items from one list and put them * on another. */ { freeMachines = newDlList(); readyMachines = newDlList(); blockedMachines = newDlList(); busyMachines = newDlList(); deadMachines = newDlList(); runningJobs = newDlList(); hangJobs = newDlList(); freeSpokes = newDlList(); busySpokes = newDlList(); deadSpokes = newDlList(); queuedUsers = newDlList(); unqueuedUsers = newDlList(); userHash = newHash(6); } void lookupIp(char *host, char *ipStr, int ipStrSize) /* convert host into IP address string. */ { struct sockaddr_storage sai; if (!internetFillInAddress6n4(host, NULL, AF_UNSPEC, SOCK_DGRAM, &sai, FALSE)) errAbort("host %s lookup failed.", host); getAddrAsString6n4(&sai, ipStr, ipStrSize); } int avgBatchTime(struct batch *batch) /**/ { if (batch->doneCount == 0) return 0; return batch->doneTime / batch->doneCount; } boolean nodeSickOnAllBatches(struct user *user, char *machineName) /* Return true if all of a user's current batches believe the machine is sick. */ { struct dlNode *node = user->curBatches->head; if (dlEnd(node)) return FALSE; for (; !dlEnd(node); node = node->next) { struct batch *batch = node->val; /* does any batch think the node is not sick? 
*/ if (hashIntValDefault(batch->sickNodes, machineName, 0) < sickNodeThreshold) { return FALSE; } } return TRUE; } void updateUserSickNode(struct user *user, char *machineName) /* If all of a users batches reject a sick machine, then the user rejects it. */ { boolean allSick = nodeSickOnAllBatches(user, machineName); if (allSick) hashStore(user->sickNodes, machineName); else hashRemove(user->sickNodes, machineName); } void updateUserSickNodes(struct user *user) /* Update user sickNodes. A node is only sick if all batches call it sick. */ { struct dlNode *node; struct batch *batch; hashFree(&user->sickNodes); user->sickNodes = newHashExt(6, FALSE); node = user->curBatches->head; if (!dlEnd(node)) { batch = node->val; struct hashEl *el, *list = hashElListHash(batch->sickNodes); for (el = list; el != NULL; el = el->next) { updateUserSickNode(user, el->name); } hashElFreeList(&list); } } boolean userIsActive(struct user *user) /* Return TRUE if user has jobs running or in queue */ { return user->runningCount > 0 || !dlEmpty(user->curBatches); } int listSickNodes(struct paraMessage *pm) /* find nodes that are sick for all active users */ { int sickNodeCount = 0, userCount = 0; struct user *user; if (userList) { struct hashEl *el, *list = NULL; /* get list from an active user if any, and get active-users count */ for (user = userList; user != NULL; user = user->next) { if (userIsActive(user)) { ++userCount; if (!list) list = hashElListHash(user->sickNodes); } } if (list) { for (el = list; el != NULL; el = el->next) { boolean allSick = TRUE; for (user = userList; user != NULL; user = user->next) { if (userIsActive(user)) if (!hashLookup(user->sickNodes, el->name)) allSick = FALSE; } if (allSick) { ++sickNodeCount; if (pm) { pmClear(pm); pmPrintf(pm, "%s", el->name); pmSend(pm, rudpOut); } } } hashElFreeList(&list); } } if (sickNodeCount > 0) { if (pm) { pmClear(pm); pmPrintf(pm, "Strength of evidence: %d users", userCount); pmSend(pm, rudpOut); } } if (pm) 
pmSendString(pm, rudpOut, ""); return sickNodeCount; } void updateUserMaxJob(struct user *user) /* Update user maxJob. >=0 only if all batches have >=0 maxJob values */ { /* Note - at this point the user->maxJob is mostly ornamental, * it has been left in for people who want to see it in list users */ struct dlNode *node; struct batch *batch; boolean unlimited = FALSE; user->maxJob = 0; for (node = user->curBatches->head; !dlEnd(node); node = node->next) { batch = node->val; if (batch->maxJob >= 0) user->maxJob += batch->maxJob; else unlimited = TRUE; } if (unlimited) user->maxJob = -1; } void updateUserPriority(struct user *user) /* Update user priority. Equals minimum of current batch priorities */ { struct dlNode *node; struct batch *batch; user->priority = MAX_PRIORITY; for (node = user->curBatches->head; !dlEnd(node); node = node->next) { batch = node->val; if (batch->priority < user->priority) user->priority = batch->priority; } } struct batch *findBatchInList(struct dlList *list, char *nameString) /* Find a batch of jobs in list or return NULL. * nameString must be from stringHash. */ { struct dlNode *node; for (node = list->head; !dlEnd(node); node = node->next) { struct batch *batch = node->val; if (nameString == batch->name) return batch; } return NULL; } struct batch *newBatch(char *nameString, struct user *user) /* Make new batch. NameString must be in stringHash already */ { struct batch *batch; AllocVar(batch); slAddHead(&batchList, batch); AllocVar(batch->node); batch->node->val = batch; batch->name = nameString; batch->user = user; batch->jobQueue = newDlList(); batch->priority = NORMAL_PRIORITY; batch->maxJob = -1; batch->sickNodes = newHashExt(6, FALSE); batch->cpu = defaultJobCpu; /* number of cpuUnits in default job usage */ batch->ram = defaultJobRam; /* number of ramUnits in default job usage */ needsPlanning = TRUE; return batch; } struct batch *findBatch(struct user *user, char *name, boolean holding) /* Find batch of jobs. 
If no such batch yet make it. */ { struct batch *batch; name = hashStoreName(stringHash, name); batch = findBatchInList(user->curBatches, name); if (batch == NULL) { batch = findBatchInList(user->oldBatches, name); if (batch != NULL) dlRemove(batch->node); else batch = newBatch(name, user); if (holding && dlEmpty(batch->jobQueue)) /* setPriority must not release batch if jobs not yet pushed */ dlAddTail(user->oldBatches, batch->node); else dlAddTail(user->curBatches, batch->node); needsPlanning = TRUE; updateUserPriority(user); updateUserMaxJob(user); updateUserSickNodes(user); } return batch; } struct user *findUser(char *name) /* Find user. If it's the first time we've seen this * user then make up a user object and put it on the * idle user list. */ { struct user *user = hashFindVal(userHash, name); if (user == NULL) { AllocVar(user); slAddHead(&userList, user); hashAddSaveName(userHash, name, user, &user->name); AllocVar(user->node); user->node->val = user; dlAddTail(unqueuedUsers, user->node); user->curBatches = newDlList(); user->oldBatches = newDlList(); user->sickNodes = newHashExt(6, FALSE); } return user; } int userQueuedCount(struct user *user) /* Count up jobs user has waiting */ { struct dlNode *node; struct batch *batch; int count = 0; for (node = user->curBatches->head; !dlEnd(node); node = node->next) { batch = node->val; count += batch->queuedCount; } return count; } struct batch *findLuckyBatch(struct user *user) /* Find the batch that gets to run a job. */ { struct batch *minBatch = NULL; int minScore = BIGNUM; struct dlNode *node; for (node = user->curBatches->head; !dlEnd(node); node = node->next) { struct batch *batch = node->val; if (batch->planning) { if (batch->planScore < minScore) { minScore = batch->planScore; minBatch = batch; } } } return minBatch; } struct user *findLuckyUser() /* Find lucky user who gets to run a job. 
*/ { struct user *minUser = NULL; int minScore = BIGNUM; struct dlNode *node; for (node = queuedUsers->head; !dlEnd(node); node = node->next) { struct user *user = node->val; if (user->planningBatchCount > 0) { if (user->planScore < minScore) { minScore = user->planScore; minUser = user; } } } return minUser; } void resetBatchesForPlanning(struct user *user) /* Initialize batches for given user for planning.*/ { struct dlNode *node; for (node = user->curBatches->head; !dlEnd(node); node = node->next) { struct batch *batch = node->val; batch->planning = TRUE; batch->planCount = 0; /* adding 1 to planCount helps suppress running any jobs when priority is set very high */ batch->planScore = 1 * batch->priority; if (batch->maxJob == 0) batch->planning = FALSE; if (batch->planning) { ++user->planningBatchCount; } } } void resetUsersForPlanning() /* Initialize users for planning. */ { struct dlNode *node; for (node = queuedUsers->head; !dlEnd(node); node = node->next) { struct user *user = node->val; user->planCount = 0; user->planningBatchCount = 0; updateUserPriority(user); updateUserMaxJob(user); updateUserSickNodes(user); /* adding 1 to planCount helps suppress running any jobs when priority is set very high */ user->planScore = 1 * user->priority; resetBatchesForPlanning(user); } } void unactivateBatchIfEmpty(struct batch *batch) /* If job queue on batch is empty then remove batch from * user's active batch list, and possibly user from active * user list. 
*/ { if (dlEmpty(batch->jobQueue)) { struct user *user = batch->user; batch->queuedCount = 0; dlRemove(batch->node); dlAddTail(user->oldBatches, batch->node); batch->planCount = 0; /* use as a signal that it's not active any more */ needsPlanning = TRUE; /* remember if situation changed, need new plan */ updateUserPriority(user); updateUserMaxJob (user); updateUserSickNodes(user); /* Check if it's last user batch and if so take them off queue */ if (dlEmpty(user->curBatches)) { dlRemove(user->node); dlAddTail(unqueuedUsers, user->node); } } } void readTotalMachineResources(struct machine *machine, int *cpuReturn, int *ramReturn) /* Return in units the cpu and ram resources of given machine */ { int c = 0, r = 0; c = machine->machSpec->cpus / cpuUnit; r = ((long long)machine->machSpec->ramSize * 1024 * 1024) / ramUnit; *cpuReturn = c; *ramReturn = r; } void readRemainingMachineResources(struct machine *machine, int *cpuReturn, int *ramReturn) /* Calculate available cpu and ram resources in given machine */ { int c = 0, r = 0; readTotalMachineResources(machine, &c, &r); /* subtract all the resources now in-use */ struct dlNode *jobNode = NULL; for (jobNode = machine->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next) { struct job *job = jobNode->val; struct batch * batch =job->batch; c -= batch->cpu; r -= batch->ram; } *cpuReturn = c; *ramReturn = r; } struct batch *findRunnableBatch(struct machine *machine, struct slRef **pEl, boolean *pCouldRun) /* Search machine for runnable batch, preferable something not at maxJob */ { int c = 0, r = 0; readRemainingMachineResources(machine, &c, &r); struct slRef* el; for(el = machine->plannedBatches; el; el=el->next) { struct batch *batch = el->val; /* Prevent too many from this batch from running. * This is helpful for keeping the balance with longrunning batches * and maxJob. 
*/ if (batch->cpu <= c && batch->ram <= r) { if (pCouldRun) *pCouldRun = TRUE; if (batch->runningCount < batch->planCount) { if (pEl) *pEl = el; return batch; } } } if (pEl) *pEl = NULL; return NULL; } int scoreCost(struct batch *batch) /* calculate score cost of using resources */ { return max(batch->cpu * defaultJobRam, batch->ram * defaultJobCpu); } void allocateResourcesToMachine(struct machine *mach, struct batch *batch, struct user *user, int *pC, int *pR) /* Allocate Resources to machine*/ { *pC -= batch->cpu; *pR -= batch->ram; ++batch->planCount; ++user->planCount; /* incrementally update score for batches and users */ /* scoring that accounts the resources carefully, e.g. actual ram and cpu. */ int cost = scoreCost(batch); batch->planScore += cost * batch->priority; user->planScore += cost * user->priority; /* add batch to plannedBatches queue */ refAdd(&mach->plannedBatches, batch); /* maxJob handling */ if ((batch->maxJob!=-1) && (batch->planCount >= batch->maxJob)) { /* remove batch from the allocating */ batch->planning = FALSE; --user->planningBatchCount; } } void plan(struct paraMessage *pm) /* Make a new plan allocating resources to batches */ { logDebug("executing new plan"); if (pm) { pmClear(pm); pmPrintf(pm, "cpuUnit=%d, ramUnit=%lld", cpuUnit, ramUnit); pmSend(pm, rudpOut); pmClear(pm); pmPrintf(pm, "job default units: Cpu=%d, ram=%d", defaultJobCpu, defaultJobRam); pmSend(pm, rudpOut); pmClear(pm); pmPrintf(pm, "max cluster units: Cpu=%d, ram=%d", maxCpuInCluster, maxRamInCluster); pmSend(pm, rudpOut); pmSendString(pm, rudpOut, "-----"); } //if (pm) pmSendString(pm, rudpOut, "about to initialize cpu/ram 2d arrays"); /* Initialize Resource Arrays for CPU and RAM */ /* allocate memory like a 2D array */ int c = 0, r = 0; /* +1 to allow for zero slot simplifies the code */ AllocArray(perCpu, maxCpuInCluster+1); for (c = 1; c <= maxCpuInCluster; ++c) AllocArray(perCpu[c], maxRamInCluster+1); //if (pm) pmSendString(pm, rudpOut, "about to add 
machines resources to cpu/ram arrays"); resetUsersForPlanning(); /* allocate machines to resource lists */ struct machine *mach; for (mach = machineList; mach != NULL; mach = mach->next) { slFreeList(&mach->plannedBatches); // free any from last plan if (!mach->isDead) { readTotalMachineResources(mach, &c, &r); /* Sweep mark all running jobs as oldPlan, * this helps us deal with jobsDone from old plan. * For better handling of long-running maxJob batches * with frequent replanning, * preserve the same resources on the same machines. */ struct dlNode *jobNode = NULL; for (jobNode = mach->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next) { struct job *job = jobNode->val; struct batch *batch = job->batch; struct user *user = batch->user; job->oldPlan = TRUE; if (batch->planning && (batch->maxJob != -1)) { if (pm) { //pmClear(pm); //pmPrintf(pm, "preserving batch %s on machine %s", batch->name, mach->name); //pmSend(pm, rudpOut); } allocateResourcesToMachine(mach, batch, user, &c, &r); } } if (pm) { //pmClear(pm); //pmPrintf(pm, "machSpec (%s) cpus:%d ramSize=%d" //, mach->name, mach->machSpec->cpus, mach->machSpec->ramSize); //pmSend(pm, rudpOut); } if (c < 1 || r < 1) { if (pm) { //pmClear(pm); //pmPrintf(pm, "IGNORING mach: %s c=%d cpu units; r=%d ram units", mach->name, c, r); //pmSend(pm, rudpOut); } } else { if (pm) { pmClear(pm); pmPrintf(pm, "mach: %s c=%d cpu units; r=%d ram units", mach->name, c, r); pmSend(pm, rudpOut); } refAdd(&perCpu[c][r], mach); } } } /* allocate machines to resource lists */ while(TRUE) { /* find lucky user/batch */ struct user *user = findLuckyUser(); if (!user) break; struct batch *batch = findLuckyBatch(user); if (!batch) { errAbort("unexpected error: batch not found while planning for lucky user"); break; } if (pm) { //pmClear(pm); //pmPrintf(pm, "lucky user: %s; lucky batch=%s", user->name, batch->name); //pmSend(pm, rudpOut); } /* find machine with adequate resources in resource array (if any) */ boolean found = FALSE; struct 
slRef **perRam = NULL; struct slRef *el = NULL; for (c = batch->cpu; c <= maxCpuInCluster; ++c) { /* an array of resources sharing the same cpu and ram free units count */ perRam = perCpu[c]; for (r = batch->ram; r <= maxRamInCluster; ++r) { if (perRam[r]) { /* avoid any machine in the sickNodes */ /* extract from list if found */ el = perRam[r]; struct slRef **listPt = &perRam[r]; while (el) { mach = (struct machine *) el->val; if (hashIntValDefault(batch->sickNodes, mach->name, 0) < sickNodeThreshold) { found = TRUE; *listPt = el->next; el->next = NULL; break; } listPt = &el->next; el = el->next; } } if (found) break; // preserve value of r } if (found) break; // preserve value of c } if (found) { /* allocate plan, reduce resources, calc new resources and pos. * move machine from old array pos to new pos. (slPopHead, slAddHead) * update its stats, and if heaps, update heaps. */ if (pm) { //pmClear(pm); //pmPrintf(pm, "found hardware cpu %d ram %d in machine %s c=%d r=%d batch=%s", //batch->cpu, batch->ram, mach->name, c, r, batch->name); //pmSend(pm, rudpOut); } allocateResourcesToMachine(mach, batch, user, &c, &r); if (pm) { //pmClear(pm); //pmPrintf(pm, "remaining hardware c=%d r=%d", c, r); //pmSend(pm, rudpOut); } if (c < 1 || r < 1) freeMem(el); /* this node has insufficient resources remaining */ else slAddHead(&perCpu[c][r], el); } else { if (pm) { //pmClear(pm); //pmPrintf(pm, "no suitable machines left, removing from planning: user %s; lucky batch %s", //user->name, batch->name); //pmSend(pm, rudpOut); } /* no suitable machine found */ /* remove batch from the allocating */ batch->planning = FALSE; --user->planningBatchCount; } } /* free arrays when finished */ for (c = 1; c <= maxCpuInCluster; ++c) { for (r = 1; r <= maxRamInCluster; ++r) { slFreeList(&perCpu[c][r]); } freeMem(perCpu[c]); } freeMem(perCpu); /* allocate machines to busy, ready, free lists */ for (mach = machineList; mach != NULL; mach = mach->next) { if (!mach->isDead) { /* See if any 
machines have enough resources free * to start their plan, and start those jobs. * If so, add them to the readyMachines list. */ struct dlNode *mNode = mach->node; dlRemove(mNode); /* remove it from whichever list it was on */ if (mach->plannedBatches) /* was anything planned for this machine? */ { boolean couldRun = FALSE; struct batch *batch = findRunnableBatch(mach, NULL, &couldRun); if (batch) dlAddTail(readyMachines, mNode); else if (couldRun) dlAddTail(blockedMachines, mNode); else dlAddTail(busyMachines, mNode); } else { struct dlNode *jobNode = mach->jobs->head; if (dlEnd(jobNode)) dlAddTail(freeMachines, mNode); else dlAddTail(busyMachines, mNode); } } } if (pm) { pmClear(pm); pmPrintf(pm, "# machines:" " busy %d" " ready %d" " blocked %d" " free %d" " dead %d" , dlCount(busyMachines) , dlCount(readyMachines) , dlCount(blockedMachines) , dlCount(freeMachines) , dlCount(deadMachines) ); pmSend(pm, rudpOut); pmSendString(pm, rudpOut, "end of planning"); pmSendString(pm, rudpOut, ""); } needsPlanning = FALSE; logDebug("plan finished"); } boolean runNextJob() /* Assign next job in pending queue if any to a machine. */ { /* give blocked machines another chance */ while (!dlEmpty(blockedMachines)) { struct dlNode *mNode; mNode = dlPopHead(blockedMachines); dlAddTail(readyMachines, mNode); } while(TRUE) { if (dlEmpty(readyMachines)) return FALSE; if (dlEmpty(freeSpokes)) return FALSE; struct dlNode *mNode; struct machine *machine; /* Get free machine */ mNode = dlPopHead(readyMachines); machine = mNode->val; if (!machine->plannedBatches) /* anything to do for this machine? */ { struct dlNode *jobNode = machine->jobs->head; if (dlEnd(jobNode)) dlAddTail(freeMachines, mNode); else dlAddTail(busyMachines, mNode); continue; } boolean couldRun = FALSE; /* was it limited only by runningCount? 
*/ struct slRef *batchEl = NULL; struct batch *batch = findRunnableBatch(machine, &batchEl, &couldRun); if (!batch) { if (couldRun) dlAddTail(blockedMachines, mNode); else dlAddTail(busyMachines, mNode); continue; } /* remove the batch from the planning list */ if (!slRemoveEl(&machine->plannedBatches, batchEl)) { /* this should not happen */ logWarn("unable to remove batch from machine->plannedBatches, length: %d\n", slCount(machine->plannedBatches)); dlAddTail(freeMachines, mNode); continue; } freeMem(batchEl); if (batch->queuedCount == 0) { /* probably the batch has been chilled */ /* needsPlanning=TRUE and a new plan will come along soon. */ /* just put it back on the ready list, it will get looked at again */ /* this has the effect of removing the batch from this machine's plannedBatches */ dlAddTail(readyMachines, mNode); continue; } struct user *user = batch->user; struct dlNode *jNode, *sNode; struct spoke *spoke; struct job *job; /* Get free spoke and move them to busy lists. */ machine->lastChecked = now; sNode = dlPopHead(freeSpokes); dlAddTail(busySpokes, sNode); spoke = sNode->val; /* Get active batch from user and take job off of it. * If it's the last job in the batch move batch to * finished list. */ jNode = dlPopHead(batch->jobQueue); dlAddTail(runningJobs, jNode); job = jNode->val; dlAddTail(hangJobs, job->hangNode); ++batch->runningCount; --batch->queuedCount; ++user->runningCount; unactivateBatchIfEmpty(batch); /* Tell machine, job, and spoke about each other. */ dlAddTail(machine->jobs, job->jobNode); /* just put it back on the ready list, it will get looked at again */ dlAddTail(readyMachines, mNode); job->machine = machine; job->lastChecked = job->startTime = job->lastClockIn = now; spokeSendJob(spoke, machine, job); return TRUE; } } void runner(int count) /* Try to run a couple of jobs. 
*/ { while (--count >= 0) if (!runNextJob()) break; } struct machine *machineNew(char *name, char *tempDir, struct machSpec *m) /* Create a new machine structure. */ { struct machine *mach; AllocVar(mach); mach->name = cloneString(name); mach->tempDir = cloneString(tempDir); AllocVar(mach->node); mach->node->val = mach; mach->machSpec = m; mach->jobs = newDlList(); return mach; } void machineFree(struct machine **pMach) /* Delete machine structure. */ { struct machine *mach = *pMach; if (mach != NULL) { freeMem(mach->node); freeMem(mach->name); freeMem(mach->tempDir); machSpecFree(&mach->machSpec); freeDlList(&mach->jobs); freez(pMach); } } struct machine *doAddMachine(char *name, char *tempDir, char *ipStr, struct machSpec *m) /* Add machine to pool. If you don't know ip yet just pass * in 0 for that argument. */ { struct machine *mach; mach = machineNew(name, tempDir, m); safecpy(mach->ipStr, sizeof mach->ipStr, ipStr); dlAddTail(freeMachines, mach->node); slAddHead(&machineList, mach); needsPlanning = TRUE; return mach; } void addMachine(char *line) /* Process message to add machine to pool. 
*/ { char *name = nextWord(&line); if (hashLookup(machineHash, name)) /* ignore duplicate machines */ { warn("machine already added: %s", name); return; } char *param2 = nextWord(&line); struct machSpec *m = NULL; AllocVar(m); if (!line) { /* for backwards compatibility, allow running without full spec, * just copy the machSpec of the first machine on the list */ *m = *machineList->machSpec; m->name = cloneString(name); m->tempDir = cloneString(param2); if (!m->tempDir) { freeMem(m); warn("incomplete addMachine request"); return; } } else { m->name = cloneString(name); m->cpus = atoi(param2); m->ramSize = atoi(nextWord(&line)); m->tempDir = cloneString(nextWord(&line)); m->localDir = cloneString(nextWord(&line)); m->localSize = atoi(nextWord(&line)); m->switchName = cloneString(nextWord(&line)); if (!m->switchName) { freeMem(m); warn("incomplete addMachine request"); return; } } doAddMachine(name, m->tempDir, "0", m); // "0" means no ipStr here runner(1); } struct machine *findMachine(char *name) /* Find named machine. */ { struct machine *mach; for (mach = machineList; mach != NULL; mach = mach->next) { if (sameString(mach->name, name)) return mach; } return NULL; } struct job *jobFind(struct dlList *list, int id) /* Find node of job with given id on list. Return NULL if * not found. */ { struct dlNode *el; struct job *job; for (el = list->head; !dlEnd(el); el = el->next) { job = el->val; if (job->id == id) return job; } return NULL; } struct job *findWaitingJob(int id) /* Find job that's waiting (as opposed to running). Return * NULL if it can't be found. */ { /* If it's not running look in user job queues. 
*/
struct user *user;
struct job *job = NULL;
for (user = userList; user != NULL; user = user->next)
    {
    struct dlNode *node;
    for (node = user->curBatches->head; !dlEnd(node); node = node->next)
	{
	struct batch *batch = node->val;
	if ((job = jobFind(batch->jobQueue, id)) != NULL)
	    break;
	}
    if (job != NULL)
        break;
    }
return job;
}

void requeueJob(struct job *job)
/* Move job from running queue back to a user pending
 * queue. This happens when a node is down or when
 * it missed the message about a job. */
{
struct batch *batch = job->batch;
struct user *user = batch->user;
job->machine = NULL;
/* Splice the job off whatever lists it is on and back onto its batch queue. */
dlRemove(job->node);
dlAddTail(batch->jobQueue, job->node);
dlRemove(job->jobNode);
dlRemove(job->hangNode);
/* Keep the running/queued bookkeeping in sync with the list moves above. */
batch->runningCount -= 1;
batch->queuedCount += 1;
user->runningCount -= 1;
dlRemove(batch->node);
dlAddHead(user->curBatches, batch->node);
dlRemove(user->node);
dlAddHead(queuedUsers, user->node);
if (batch->planCount == 0)
    needsPlanning = TRUE;
updateUserPriority(user);
updateUserMaxJob(user);
updateUserSickNodes(user);
}

void requeueAllJobs(struct machine *mach, boolean doDead)
/* Requeue all jobs on machine.  With doDead, also remember the job ids on the
 * machine's deadJobIds list so a later resurrection can be reconciled. */
{
struct dlNode *next = NULL;
struct dlNode *jobNode = NULL;
/* next is captured before requeueJob because requeueJob unlinks jobNode. */
for (jobNode = mach->jobs->head; !dlEnd(jobNode); jobNode = next)
    {
    struct job *job = jobNode->val;
    next = jobNode->next;
    if (doDead)
	{
	struct slInt *i = slIntNew(job->id);
	slAddHead( &mach->deadJobIds, i );
	}
    /* this affects the mach->jobs list itself by removing this node */
    requeueJob(job);
    }
}

boolean removeMachine(char *machName, char *user, char *reason)
/* Remove machine from pool.
*/ { struct machine *mach; if ((mach = findMachine(machName))) { // logged as an error because it's important for admins to know that there is an // error with this machine logError("hub: user %s removed machine %s because: %s",user,machName,reason); requeueAllJobs(mach, FALSE); dlRemove(mach->node); slRemoveEl(&machineList, mach); hashRemove(machineHash, mach->name); machineFree(&mach); return TRUE; } else { logDebug("hub: user %s wanted to removed machine %s because: %s but machine was not found",user,machName,reason); return FALSE; } } void removeMachineAcknowledge(char *line, struct paraMessage *pm) /* Remove machine and send response back. */ { char *machName = nextWord(&line); char *user = nextWord(&line); char *reason = line; machName = trimSpaces(machName); char *retVal = "ok"; if (!removeMachine(machName, user, reason)) retVal = "Machine not found."; pmSendString(pm, rudpOut, retVal); pmSendString(pm, rudpOut, ""); } void machineDown(struct machine *mach) /* Mark machine as down and move it to dead list. */ { dlRemove(mach->node); mach->lastChecked = time(NULL); mach->isDead = TRUE; dlAddTail(deadMachines, mach->node); } void buryMachine(struct machine *machine) /* Reassign jobs that machine is processing and bury machine * in dead list. */ { requeueAllJobs(machine, TRUE); machineDown(machine); } void nodeDown(char *line) /* Deal with a node going down - move it to dead list and * put job back on job list. */ { struct machine *mach; char *machName = nextWord(&line); if ((mach = findMachine(machName)) != NULL) buryMachine(mach); runner(1); } char *exeFromCommand(char *cmd) /* Return executable name (without path) given command line. */ { static char exe[128]; char *s,*e; int i, size; int lastSlash = -1; /* Isolate first space-delimited word between s and e. */ s = skipLeadingSpaces(cmd); e = skipToSpaces(cmd); if (e == NULL) e = s + strlen(s); size = e - s; /* Find last '/' in this word if any, and reposition s after it. 
*/ for (i=0; i<size; ++i) { if (s[i] == '/') lastSlash = i; } if (lastSlash > 0) s += lastSlash + 1; /* Copy whats left to string to return . */ size = e - s; if (size >= sizeof(exe)) size = sizeof(exe)-1; memcpy(exe, s, size); exe[size] = 0; return exe; } struct job *jobNew(char *cmd, char *userName, char *dir, char *in, char *out, float cpus, long long ram, char *results, boolean forQueue) /* Create a new job structure */ { struct job *job; struct user *user = findUser(userName); struct batch *batch = findBatch(user, results, FALSE); if (forQueue && (batch->continuousCrashCount >= sickBatchThreshold)) { warn("not adding job [%s] for %s, sick batch %s", cmd, userName, batch->name); unactivateBatchIfEmpty(batch); /* handle side-effect of findBatch call above */ return NULL; } AllocVar(job); AllocVar(job->jobNode); job->jobNode->val = job; AllocVar(job->node); job->node->val = job; job->id = ++nextJobId; job->exe = cloneString(exeFromCommand(cmd)); job->cmd = cloneString(cmd); job->batch = batch; job->dir = hashStoreName(stringHash, dir); job->in = cloneString(in); job->out = cloneString(out); job->cpus = cpus; job->ram = ram; AllocVar(job->hangNode); job->hangNode->val = job; return job; } void jobFree(struct job **pJob) /* Free up a job. */ { struct job *job = *pJob; if (job != NULL) { freeMem(job->jobNode); freeMem(job->node); freeMem(job->exe); freeMem(job->cmd); freeMem(job->in); freeMem(job->out); freeMem(job->err); freeMem(job->hangNode); freez(pJob); } } boolean sendViaSpoke(struct machine *machine, char *message) /* Send a message to machine via spoke. */ { struct dlNode *node = dlPopHead(freeSpokes); struct spoke *spoke; if (node == NULL) { logDebug("hub: out of spokes!"); return FALSE; } dlAddTail(busySpokes, node); spoke = node->val; spokeSendMessage(spoke, machine, message); return TRUE; } void checkDeadNodesASAP() /* Check dead nodes ASAP, some may have been fixed. 
 * It tweaks the time since last check on all dead machines
 * so that grave digger will send them resurrect messages
 * to see if they are alive. */
{
struct dlNode *mNode;
struct machine *machine;
for (mNode = deadMachines->head; !dlEnd(mNode); mNode = mNode->next)
    {
    machine = mNode->val;
    /* Back-date lastChecked so graveDigger's period test fires immediately. */
    machine->lastChecked = now - MINUTE * machineCheckPeriod;
    }
}

void checkPeriodically(struct dlList *machList, int period, char *checkMessage,
	int spokesToUse)
/* Periodically send checkup messages to machines on list.
 * The list is rotated (pop head, push tail) so checks round-robin. */
{
struct dlNode *mNode;
struct machine *machine;
char message[512];
int i;
safef(message, sizeof(message), "%s", checkMessage);
for (i=0; i<spokesToUse; ++i)
    {
    /* If we have some free spokes and some busy machines, and
     * the busy machines haven't been checked for a while, go
     * check them. */
    if (dlEmpty(freeSpokes) || dlEmpty(machList))
        break;
    machine = machList->head->val;
    if (now - machine->lastChecked < period)
        break;
    machine->lastChecked = now;
    mNode = dlPopHead(machList);
    dlAddTail(machList, mNode);
    sendViaSpoke(machine, message);
    logDebug("hub: sending resurrect message to %s",machine->name);
    }
}

void hangman(int spokesToUse)
/* Check that jobs are alive, sense if nodes are dead. Also send message for
 * busy nodes to check in for specific jobs, in case we missed one of their earlier
 * jobDone messages.
*/
{
int i, period = jobCheckPeriod*MINUTE;
struct dlNode *hangNode;
struct job *job;
struct machine *machine;

for (i=0; i<spokesToUse; ++i)
    {
    if (dlEmpty(freeSpokes) || dlEmpty(hangJobs))
        break;
    job = hangJobs->head->val;
    if (now - job->lastChecked < period)
        break;
    job->lastChecked = now;
    /* Rotate job to tail so the next oldest is checked next round. */
    hangNode = dlPopHead(hangJobs);
    dlAddTail(hangJobs, hangNode);
    machine = job->machine;
    if (now - job->lastClockIn >= MINUTE * assumeDeadPeriod)
	{
	warn("hub: node %s running %d looks dead, burying", machine->name, job->id);
	buryMachine(machine);
	break;	/* jobs list has been freed by bury, break immediately */
	}
    else
	{
	char message[512];
	safef(message, sizeof(message), "check %d", job->id);
	sendViaSpoke(machine, message);
	}
    }
}

void graveDigger(int spokesToUse)
/* Check out dead nodes. Try and resurrect them periodically. */
{
checkPeriodically(deadMachines, MINUTE * machineCheckPeriod, "resurrect",
	spokesToUse);
}

void flushResults(char *batchName)
/* Flush all results files. batchName can be NULL for all.
 * NOTE(review): rq->name == batchName is a pointer compare — names appear
 * to be interned/shared, not copied; confirm before passing arbitrary strings. */
{
struct resultQueue *rq;
for (rq = resultQueues; rq != NULL; rq = rq->next)
    {
    if (!batchName || (rq->name == batchName))
	if (rq->f != NULL)
	   fflush(rq->f);
    }
}

void changeFileOwner(char *fileName, char *newOwner)
/* Attempt to change ownership of file.  Failures are reported via perror
 * and otherwise ignored (best effort). */
{
struct passwd *pwd = getpwnam(newOwner);
if (pwd == NULL)
    {
    perror("getpwnam");
    return;
    }
if (chown(fileName, pwd->pw_uid, -1) == -1)
    perror("chown");
}

void writeResults(char *fileName, char *userName, char *machineName,
	int jobId, char *exe, time_t submitTime, time_t startTime,
	char *errFile, char *cmd,
	char *status, char *uTime, char *sTime)
/* Write out job results to output queue. This
 * will create the output queue if it doesn't yet
 * exist.
*/
{
struct resultQueue *rq;
for (rq = resultQueues; rq != NULL; rq = rq->next)
    if (sameString(fileName, rq->name))
        break;
if (rq == NULL)
    {
    /* First result for this file: open an append-mode queue entry. */
    AllocVar(rq);
    slAddHead(&resultQueues, rq);
    rq->name = fileName;
    rq->f = fopen(rq->name, "a");
    if (rq->f == NULL)
        warn("hub: couldn't open results file %s", rq->name);
    rq->lastUsed = now;
    changeFileOwner(fileName, userName);
    }
if (rq->f != NULL)
    {
    fprintf(rq->f, "%s %s %d %s %s %s %lu %lu %lu %s %s '%s'\n",
	status, machineName, jobId, exe,
	uTime, sTime,
	submitTime, startTime, now,
	userName, errFile, cmd);
    fflush(rq->f);
    rq->lastUsed = now;
    }
}

void writeJobResults(struct job *job, char *status, char *uTime, char *sTime)
/* Write out job results to output queue. This
 * will create the output queue if it doesn't yet
 * exist.  Also maintains per-batch/per-user done/crash statistics and
 * sick-node tracking based on exit status ("0" = success). */
{
struct batch *batch = job->batch;
if (sameString(status, "0"))
    {
    ++finishedJobCount;
    ++batch->doneCount;
    batch->doneTime += (now - job->startTime);
    ++batch->user->doneCount;
    batch->continuousCrashCount = 0;
    /* remember the continuous number of times this batch has crashed on this node */
    hashRemove(batch->sickNodes, job->machine->name);
    hashRemove(batch->user->sickNodes, job->machine->name);
    }
else
    {
    ++crashedJobCount;
    ++batch->crashCount;
    ++batch->continuousCrashCount;
    /* remember the continuous number of times this batch has crashed on this node */
    hashIncInt(batch->sickNodes, job->machine->name);
    updateUserSickNode(batch->user, job->machine->name);
    }
writeResults(batch->name, batch->user->name, job->machine->name,
	     job->id, job->exe, job->submitTime, job->startTime,
	     job->err, job->cmd, status, uTime, sTime);
}

void resultQueueFree(struct resultQueue **pRq)
/* Free up a results queue, closing file if open.  NULL-safe. */
{
struct resultQueue *rq = *pRq;
if (rq != NULL)
    {
    carefulCloseWarn(&rq->f);
    freez(pRq);
    }
}

void sweepResultsWithRemove(char *name)
/* Get rid of result queues that haven't been accessed for
 * a while. Also remove any matching name if not NULL.
 * Flushes all results.
 * NOTE(review): name == rq->name is a pointer compare on interned names. */
{
struct resultQueue *newList = NULL, *rq, *next;
for (rq = resultQueues; rq != NULL; rq = next)
    {
    next = rq->next;
    if ((now - rq->lastUsed > 1*MINUTE)
     || (name && name == rq->name))
	{
	logDebug("hub: closing results file %s", rq->name);
	resultQueueFree(&rq);
	}
    else
	{
	slAddHead(&newList, rq);
	}
    }
slReverse(&newList);
resultQueues = newList;
flushResults(NULL);
}

void saveJobId()
/* Save job ID.  Aborts on write error to avoid silently reusing ids. */
{
rewind(jobIdFile);
writeOne(jobIdFile, nextJobId);
fflush(jobIdFile);
if (ferror(jobIdFile))
    errnoAbort("can't write job id file %s", jobIdFileName);
}

void openJobId()
/* Open file with jobID in it and read jobId. Bump it
 * by 100000 in case we crashed to avoid reusing job
 * id's, but do reuse every 2 billion. Let command line
 * overwrite this though . */
{
jobIdFile = fopen(jobIdFileName, "r+");
if (jobIdFile != NULL)
    {
    (void)readOne(jobIdFile, nextJobId);
    nextJobId += 100000;
    }
else
    jobIdFile = mustOpen(jobIdFileName, "w");
if (nextJobId < 0)
    nextJobId = 0;   /* wrapped past 2 billion */
nextJobId = optionInt("nextJobId", nextJobId);
}

void processHeartbeat()
/* Check that system is ok. See if we can do anything useful.
 * Housekeeping (grave digging, hang checks, results sweep) only runs when
 * spokes are free, using roughly half of them. */
{
int spokesToUse;

if (needsPlanning)
    plan(NULL);
runner(30);
spokesToUse = dlCount(freeSpokes);
if (spokesToUse > 0)
    {
    spokesToUse >>= 1;
    spokesToUse -= 1;
    if (spokesToUse < 1) spokesToUse = 1;
    graveDigger(spokesToUse);
    hangman(spokesToUse);
    sweepResultsWithRemove(NULL);
    saveJobId();
    }
}

boolean sendKillJobMessage(struct machine *machine, int jobId)
/* Send message to compute node to kill job there.
 * FALSE only when no spoke was available to carry the message. */
{
char message[64];
safef(message, sizeof(message), "kill %d", jobId);
logDebug("hub: %s %s", machine->name, message);
if (!sendViaSpoke(machine, message))
    {
    return FALSE;
    }
return TRUE;
}

void nodeAlive(char *line)
/* Deal with message from node that says it's alive.
 * Move it from dead to free list. The major complication
 * of this occurs if the node was running a job and it
 * didn't really go down, we just lost communication with it.
* In this case we will have restarted the job elsewhere, and * that other copy could be conflicting with the copy of * the job the node is still running. */ { char *name = nextWord(&line), *jobIdString; int jobId; struct machine *mach; struct dlNode *node; boolean hostFound = FALSE; for (node = deadMachines->head; !dlEnd(node); node = node->next) { mach = node->val; if (sameString(mach->name, name) && mach->isDead) { hostFound = TRUE; dlRemove(node); dlAddTail(freeMachines, node); needsPlanning = TRUE; mach->isDead = FALSE; if (mach->deadJobIds != NULL) { - struct dyString *dy = newDyString(0); + struct dyString *dy = dyStringNew(0); struct slInt *i = mach->deadJobIds; dyStringPrintf(dy, "hub: node %s assigned ", name); for(i = mach->deadJobIds; i; i = i->next) dyStringPrintf(dy, "%d ", i->val); dyStringPrintf(dy, "came back."); logWarn("%s", dy->string); dyStringFree(&dy); while ((jobIdString = nextWord(&line)) != NULL) { jobId = atoi(jobIdString); if ((i = slIntFind(mach->deadJobIds, jobId))) { struct job *job; warn("hub: Looks like %s is still keeping track of %d", name, jobId); if ((job = findWaitingJob(jobId)) != NULL) { warn("hub: Luckily rerun of job %d has not yet happened.", jobId); job->machine = mach; dlAddTail(mach->jobs, job->jobNode); job->lastChecked = mach->lastChecked = job->lastClockIn = now; dlRemove(job->node); dlAddTail(runningJobs, job->node); dlRemove(mach->node); dlAddTail(busyMachines, mach->node); dlAddTail(hangJobs, job->hangNode); struct batch *batch = job->batch; struct user *user = batch->user; batch->runningCount += 1; batch->queuedCount -= 1; user->runningCount += 1; } else if ((job = jobFind(runningJobs, jobId)) != NULL) { /* Job is running on resurrected machine and another. * Kill it on both since the output it created could * be corrupt at this point. Then add it back to job * queue. 
*/ warn("hub: Job %d is running on %s as well.", jobId, job->machine->name); sendKillJobMessage(mach, job->id); sendKillJobMessage(job->machine, job->id); requeueJob(job); } else { /* This case should be very rare. It should happen when * a node is out of touch for 2 hours, but when it comes * back is running a job that we reran to completion * on another node. */ warn("hub: Job %d has finished running, there is a conflict. " "Data may be corrupted, and it will take a lot of logic to fix.", jobId); } } } } slFreeList(&mach->deadJobIds); runner(1); break; } } if (!hostFound) { warn("hub 'alive $HOST' msg handler: unable to resurrect host %s, " "not find in deadMachines list.", name); } } void recycleMachine(struct machine *mach) /* Recycle machine into free list. */ { dlRemove(mach->node); dlAddTail(readyMachines, mach->node); } void recycleJob(struct job *job) /* Remove job from lists and free up memory associated with it. */ { dlRemove(job->node); jobFree(&job); } void nodeCheckIn(char *line) /* Deal with check in message from node. */ { char *machine = nextWord(&line); char *jobIdString = nextWord(&line); char *status = nextWord(&line); int jobId = atoi(jobIdString); if (status != NULL) { struct job *job = jobFind(runningJobs, jobId); if (job != NULL) { job->lastClockIn = now; if (!sameWord(job->machine->name, machine)) { logError("hub: checkIn %s %s %s should be from %s", machine, jobIdString, status, job->machine->name); } } else { logError("hub: checkIn of unknown job: %s %s %s", machine, jobIdString, status); } if (sameString(status, "free")) { /* Node thinks it's free, we think it has a job. Node * must have missed our job assignment... */ if (job != NULL) { struct machine *mach = job->machine; if (mach != NULL) { dlRemove(mach->node); dlAddTail(readyMachines, mach->node); } requeueJob(job); logDebug("hub: requeueing job in nodeCheckIn"); runner(1); } } } } void recycleSpoke(char *spokeName) /* Try to find spoke and put it back on free list. 
*/
{
struct dlNode *node;
struct spoke *spoke;
boolean foundSpoke = FALSE;

for (node = busySpokes->head; !dlEnd(node); node = node->next)
    {
    spoke = node->val;
    if (sameString(spoke->name, spokeName))
	{
	dlRemove(spoke->node);
	dlAddTail(freeSpokes, spoke->node);
	foundSpoke = TRUE;
	break;
	}
    }
if (!foundSpoke)
    warn("Couldn't free spoke %s", spokeName);
else
    runner(1);   /* a spoke just freed up, try to launch another job */
}

int addJob(char *userName, char *dir, char *in, char *out, char *results,
	float cpus, long long ram, char *command)
/* Add job to queues.  Returns job id, or 0 if the job could not be created
 * (e.g. sick batch).  Converts raw cpu/ram requests into batch units,
 * substituting defaults when zero. */
{
struct job *job;
struct user *user;
struct batch *batch;

job = jobNew(command, userName, dir, in, out, cpus, ram, results, TRUE);
if (!job)
    {
    return 0;
    }
batch = job->batch;
dlAddTail(batch->jobQueue, job->node);
++batch->queuedCount;
int oldCpu = batch->cpu;
int oldRam = batch->ram;
if (job->cpus)
    batch->cpu = (job->cpus + 0.5) / cpuUnit;  /* rounding */
else
    { /* if no cpus specified, use the default */
    batch->cpu = defaultJobCpu;
    job->cpus = defaultJobCpu * cpuUnit;
    }
if (job->ram)
    batch->ram = 1 + (job->ram - 1) / ramUnit; /* any remainder will be rounded upwards e.g. 1 to 1024m --> 1G but 1025m --> 2G if unit is 1G. 0m would just cause default ram usage. */
else
    { /* if no ram size specified, use the default */
    batch->ram = defaultJobRam;
    job->ram = defaultJobRam * ramUnit;
    }
if (oldCpu != batch->cpu || oldRam != batch->ram)
    {
    needsPlanning = TRUE;   /* resource shape of batch changed */
    }
if (batch->planCount == 0)
    {
    needsPlanning = TRUE;
    }
user = batch->user;
dlRemove(user->node);
dlAddTail(queuedUsers, user->node);
job->submitTime = time(NULL);
return job->id;
}

int addJobFromMessage(char *line, int addJobVersion)
/* Parse out addJob message and add job to queues.
 * v1: <user> <dir> <in> <out> <results> <command...>
 * v2 additionally carries <cpus> <ram> before the command.
*/
{
char *userName, *dir, *in, *out, *results, *command;
float cpus = 0;
long long ram = 0;

if ((userName = nextWord(&line)) == NULL)
    return 0;
if ((dir = nextWord(&line)) == NULL)
    return 0;
if ((in = nextWord(&line)) == NULL)
    return 0;
if ((out = nextWord(&line)) == NULL)
    return 0;
if ((results = nextWord(&line)) == NULL)
    return 0;
if (addJobVersion == 2)
    {
    char *tempCpus = NULL;
    char *tempRam = NULL;
    if ((tempCpus = nextWord(&line)) == NULL)
	return 0;
    if ((tempRam = nextWord(&line)) == NULL)
	return 0;
    cpus = sqlFloat(tempCpus);
    ram = sqlLongLong(tempRam);
    }
if (line == NULL || line[0] == 0)
    return 0;
command = line;   /* rest of line, spaces and all, is the command */
return addJob(userName, dir, in, out, results, cpus, ram, command);
}

void addJobAcknowledge(char *line, struct paraMessage *pm, int addJobVersion)
/* Add job. Line format is <user> <dir> <stdin> <stdout> <results> <command>
 * Returns job ID or 0 if a problem. Send jobId back to client. */
{
int id = addJobFromMessage(line, addJobVersion);
pmClear(pm);
pmPrintf(pm, "%d", id);
pmSend(pm, rudpOut);
runner(1);
}

int setMaxJob(char *userName, char *dir, int maxJob)
/* Set new maxJob for batch.  Returns the new maxJob or -2 on bad user/batch.
 * NOTE(review): findUser/findBatch are called before the NULL checks below;
 * presumably they tolerate or create entries — confirm in their definitions. */
{
struct user *user = findUser(userName);
struct batch *batch = findBatch(user, dir, TRUE);
if (user == NULL)
    return -2;
if (batch == NULL)
    return -2;
needsPlanning = TRUE;
batch->maxJob = maxJob;
updateUserMaxJob(user);
if (maxJob>=-1)
    logDebug("paraHub: User %s set maxJob=%d for batch %s", userName, maxJob, dir);
return maxJob;
}

int setMaxJobFromMessage(char *line)
/* Parse out setMaxJob message and set new maxJob for batch, update user-maxJob. */
{
char *userName, *dir;
int maxJob;

if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((dir = nextWord(&line)) == NULL)
    return -2;
if ((maxJob = atoi(nextWord(&line))) < -1)
    return -2;
return setMaxJob(userName, dir, maxJob);
}

void setMaxJobAcknowledge(char *line, struct paraMessage *pm)
/* Set batch maxJob. Line format is <user> <dir> <maxJob>
 * Returns new maxJob or -2 if a problem.
Send new maxJob back to client. */
{
int maxJob = setMaxJobFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d", maxJob);
pmSend(pm, rudpOut);
}

int resetCounts(char *userName, char *dir)
/* Reset done and crashed batch counts.  0 on success, -2 on bad user/batch. */
{
struct user *user = findUser(userName);
struct batch *batch = findBatch(user, dir, TRUE);
if (user == NULL)
    return -2;
if (batch == NULL)
    return -2;
batch->doneCount = 0;
batch->doneTime = 0;
batch->crashCount = 0;
logDebug("paraHub: User %s reset done and crashed counts for batch %s", userName, dir);
return 0;
}

int resetCountsFromMessage(char *line)
/* Parse out resetCounts message and reset counts for batch. */
{
char *userName, *dir;

if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((dir = nextWord(&line)) == NULL)
    return -2;
return resetCounts(userName, dir);
}

void resetCountsAcknowledge(char *line, struct paraMessage *pm)
/* Resets batch counts for done and crashed. Line format is <user> <dir>
 * Returns new maxJob or -2 if a problem. Send new maxJob back to client.
*/
{
int resetCounts = resetCountsFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d",resetCounts);
pmSend(pm, rudpOut);
}

int freeBatch(char *userName, char *batchName)
/* Free batch resources, if possible.
 * Returns 0 ok, -1 batch still busy, -2 batch not found, -3 user not found. */
{
struct user *user = findUser(userName);
if (user == NULL)
    return -3;
struct hashEl *hel = hashLookup(stringHash, batchName);
if (hel == NULL)
    return -2;
char *name = hel->name;   /* interned copy of the batch name */
struct batch *batch = findBatchInList(user->curBatches, name);
if (batch == NULL)
    batch = findBatchInList(user->oldBatches, name);
if (batch == NULL)
    return -2;
/* make sure nothing running and queue empty */
if (batch->runningCount > 0)
    return -1;
if (!dlEnd(batch->jobQueue->head))
    return -1;
sweepResultsWithRemove(name);
logDebug("paraHub: User %s freed batch %s", userName, batchName);
/* remove batch from batchList */
slRemoveEl(&batchList, batch);
/* remove from user cur/old batches */
dlRemove(batch->node);
/* free batch and its members */
freeMem(batch->node);
hashRemove(stringHash, name);
freeDlList(&batch->jobQueue);
freeHash(&batch->sickNodes);
freeMem(batch);
return 0;
}

int freeBatchFromMessage(char *line)
/* Parse out freeBatch message and free batch. */
{
char *userName, *batchName;

if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((batchName = nextWord(&line)) == NULL)
    return -2;
return freeBatch(userName, batchName);
}

void freeBatchAcknowledge(char *line, struct paraMessage *pm)
/* Free batch resources. Line format is <user> <dir>
 * Returns 0 if success or some err # if a problem. Sends result back to client. */
{
int result = freeBatchFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d",result);
pmSend(pm, rudpOut);
}

int flushResultsByRequest(char *userName, char *batchName)
/* Flush results file. Return 0 if nothing running and queue empty.
 * -1 batch still active, -2 batch not found, -3 user not found.
*/
{
struct user *user = findUser(userName);
if (user == NULL)
    return -3;
struct hashEl *hel = hashLookup(stringHash, batchName);
if (hel == NULL)
    return -2;
char *name = hel->name;
struct batch *batch = findBatchInList(user->curBatches, name);
if (batch == NULL)
    batch = findBatchInList(user->oldBatches, name);
if (batch == NULL)
    return -2;
flushResults(batch->name);
logDebug("paraHub: User %s flushed results batch %s", userName, batchName);
/* return 0 if nothing running and queue empty */
if (batch->runningCount > 0)
    return -1;
if (!dlEnd(batch->jobQueue->head))
    return -1;
return 0;
}

int flushResultsFromMessage(char *line)
/* Parse out flushResults message and flush the results file. */
{
char *userName, *batchName;

if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((batchName = nextWord(&line)) == NULL)
    return -2;
return flushResultsByRequest(userName, batchName);
}

void flushResultsAcknowledge(char *line, struct paraMessage *pm)
/* Flush results file. Line format is <user> <dir>
 * Returns 0 if success or some err # if a problem. Sends result back to client. */
{
int result = flushResultsFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d",result);
pmSend(pm, rudpOut);
}

int clearSickNodes(char *userName, char *dir)
/* Clear sick nodes for batch.  0 on success, -2 on bad user/batch. */
{
struct user *user = findUser(userName);
struct batch *batch = findBatch(user, dir, TRUE);
if (user == NULL)
    return -2;
if (batch == NULL)
    return -2;
hashFree(&batch->sickNodes);
batch->sickNodes = newHashExt(6, FALSE);
batch->continuousCrashCount = 0;  /* reset so user can retry */
needsPlanning = TRUE;
updateUserSickNodes(user);
logDebug("paraHub: User %s cleared sick nodes for batch %s", userName, dir);
return 0;
}

int clearSickNodesFromMessage(char *line)
/* Parse out clearSickNodes message and call clear nodes.
*/
{
char *userName, *dir;
if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((dir = nextWord(&line)) == NULL)
    return -2;
return clearSickNodes(userName, dir);
}

void clearSickNodesAcknowledge(char *line, struct paraMessage *pm)
/* Clear sick nodes from batch. Line format is <user> <dir>
 * Returns 0 or -2 if a problem back to client. */
{
int result = clearSickNodesFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d", result);
pmSend(pm, rudpOut);
}

int showSickNodes(char *userName, char *dir, struct paraMessage *pm)
/* Show sick nodes for batch.  Streams one "<node> <failures>" message per
 * sick node plus a totals line; -2 on bad user/batch. */
{
int machineCount = 0, sickCount = 0;
struct user *user = findUser(userName);
struct batch *batch = findBatch(user, dir, TRUE);
if (user == NULL)
    return -2;
if (batch == NULL)
    return -2;
logDebug("paraHub: User %s ran showSickNodes for batch %s", userName, dir);
struct hashEl *el, *list = hashElListHash(batch->sickNodes);
slSort(&list, hashElCmp);
for (el = list; el != NULL; el = el->next)
    {
    int failures = ptToInt(el->val);
    if (failures >= sickNodeThreshold)   /* only report nodes past threshold */
	{
	++machineCount;
	sickCount += failures;
	pmClear(pm);
	pmPrintf(pm, "%s %d", el->name, ptToInt(el->val));
	pmSend(pm, rudpOut);
	}
    }
hashElFreeList(&list);
pmClear(pm);
pmPrintf(pm, "total sick machines: %d failures: %d", machineCount, sickCount);
pmSend(pm, rudpOut);
return 0;
}

int showSickNodesFromMessage(char *line, struct paraMessage *pm)
/* Parse out showSickNodes message and print sick nodes for batch. */
{
char *userName, *dir;
if ((userName = nextWord(&line)) == NULL)
    return -2;
if ((dir = nextWord(&line)) == NULL)
    return -2;
return showSickNodes(userName, dir, pm);
}

void showSickNodesAcknowledge(char *line, struct paraMessage *pm)
/* Show sick nodes from batch. Line format is <user> <dir>
 * Returns just empty line if a problem back to client.
*/
{
showSickNodesFromMessage(line,pm);
pmClear(pm);
pmSend(pm, rudpOut);   /* empty message terminates the stream */
}

int setPriority(char *userName, char *dir, int priority)
/* Set new priority for batch.  Returns new priority, or 0 on bad user/batch. */
{
struct user *user = findUser(userName);
struct batch *batch = findBatch(user, dir, TRUE);
if (user == NULL)
    return 0;
if (batch == NULL)
    return 0;
needsPlanning = TRUE;
batch->priority = priority;
updateUserPriority(user);
if ((priority>=1)&&(priority<NORMAL_PRIORITY))
    logDebug("paraHub: User %s set priority=%d for batch %s", userName, priority, dir);
return priority;
}

int setPriorityFromMessage(char *line)
/* Parse out setPriority message and set new priority for batch, update user-priority. */
{
char *userName, *dir;
int priority;

if ((userName = nextWord(&line)) == NULL)
    return 0;
if ((dir = nextWord(&line)) == NULL)
    return 0;
if ((priority = atoi(nextWord(&line))) < 1)
    return 0;
return setPriority(userName, dir, priority);
}

void setPriorityAcknowledge(char *line, struct paraMessage *pm)
/* Set batch priority. Line format is <user> <dir> <priority>
 * Returns new priority or 0 if a problem. Send new priority back to client. */
{
int priority = setPriorityFromMessage(line);
pmClear(pm);
pmPrintf(pm, "%d", priority);
pmSend(pm, rudpOut);
}

void respondToPing(struct paraMessage *pm)
/* Someone want's to know we're alive.  Also piggy-backs a heartbeat. */
{
pmSendString(pm, rudpOut, "ok");
processHeartbeat();
}

void finishJob(struct job *job)
/* Recycle job memory and the machine it's running on.
*/
{
struct machine *mach = job->machine;
struct batch *batch = job->batch;
struct user *user = batch->user;

if (mach != NULL)
    {
    /* see if the node appears to be sick for this batch */
    if (hashIntValDefault(batch->sickNodes, mach->name, 0) >= sickNodeThreshold)
	{
	/* skip adding back to the mach->plannedBatches list */
	needsPlanning = TRUE;
	}
    else if (!job->oldPlan)
	{
	/* add its batch to end of list so it gets run again on same machine */
	struct slRef *el = slRefNew(batch);
	slAddTail(&mach->plannedBatches, el);
	}
    recycleMachine(mach);
    /* NOTE I moved the following two lines inside the if (mach != NULL) block
     * because this may fix the problem where we were seeing users get duplicate
     * jobDone messages or something like that causing users to get
     * e.g. -200 user->runningCount which then made them hog the whole cluster. */
    batch->runningCount -= 1;
    user->runningCount -= 1;
    }
dlRemove(job->jobNode);
dlRemove(job->hangNode);
recycleJob(job);
}

boolean removeRunningJob(struct job *job)
/* Remove job - if it's running kill it, remove from job list.
 * FALSE if no spoke was free to carry the kill message. */
{
if (!sendKillJobMessage(job->machine, job->id))
    return FALSE;
finishJob(job);
return TRUE;
}

void removePendingJob(struct job *job)
/* Remove job from pending queue. */
{
struct batch *batch = job->batch;
recycleJob(job);
unactivateBatchIfEmpty(batch);
}

boolean removeJobId(int id)
/* Remove job of a given id, whether running or still waiting. */
{
struct job *job = jobFind(runningJobs, id);
if (job != NULL)
    {
    logDebug("Removing %s's %s", job->batch->user->name, job->cmd);
    if (!removeRunningJob(job))
        return FALSE;
    }
else
    {
    job = findWaitingJob(id);
    if (job != NULL)
	{
	logDebug("Pending job %s", job->cmd);
	removePendingJob(job);
	}
    }
return TRUE;
}

void removeJobAcknowledge(char *names, struct paraMessage *pm)
/* Remove job of a given name(s). */
{
char *name;
char *retVal = "ok";

while ((name = nextWord(&names)) != NULL)
    {
    /* It is possible for this remove to fail if we
     * run out of spokes at the wrong time.
Currently
     * the para client will just report the problem. */
    if (!removeJobId(atoi(name)))
	{
	retVal = "err";
	break;
	}
    }
pmSendString(pm, rudpOut, retVal);
}

void chillABatch(struct batch *batch)
/* Stop launching jobs from a batch, but don't disturb
 * running jobs. */
{
struct user *user = batch->user;
struct dlNode *el, *next;
for (el = batch->jobQueue->head; !dlEnd(el); el = next)
    {
    struct job *job = el->val;
    next = el->next;
    recycleJob(job);	/* This free's el too! */
    }
batch->queuedCount = 0;
batch->planCount = 0;
dlRemove(batch->node);
dlAddTail(user->oldBatches, batch->node);
needsPlanning = TRUE;
updateUserPriority(user);
updateUserMaxJob(user);
updateUserSickNodes(user);
}

void chillBatch(char *line, struct paraMessage *pm)
/* Parse user and batch names from message,
 * call chillABatch to clear the queue,
 * and send response ok or err to client. */
{
char *userName = nextWord(&line);
char *batchName = nextWord(&line);
char *res = "err";
if (batchName != NULL)
    {
    struct user *user = hashFindVal(userHash, userName);
    if (user != NULL)
	{
	struct batch *batch;
	batchName = hashStoreName(stringHash, batchName);
	batch = findBatchInList(user->curBatches, batchName);
	if (batch != NULL)
	    {
	    chillABatch(batch);
	    }
	res = "ok";
	}
    }
pmSendString(pm, rudpOut, res);
}

void jobDone(char *line)
/* Handle job is done message.
 * Message: <jobId> <status> <uTime> <sTime>; ignored if incomplete. */
{
struct job *job;
char *id = nextWord(&line);
char *status = nextWord(&line);
char *uTime = nextWord(&line);
char *sTime = nextWord(&line);

if (sTime != NULL)
    {
    job = jobFind(runningJobs, atoi(id));
    if (job != NULL)
	{
	struct machine *machine = job->machine;
	if (machine != NULL)
	    {
	    machine->lastChecked = now;
	    if (sameString(status, "0"))
		machine->goodCount += 1;
	    else
		machine->errCount += 1;
	    }
	writeJobResults(job, status, uTime, sTime);
	struct batch *batch = job->batch;
	finishJob(job);
	/* is the batch sick?
	 */
	if (batch->continuousCrashCount >= sickBatchThreshold)
	    {
	    chillABatch(batch);
	    }
	runner(1);
	}
    }
}

void listMachines(struct paraMessage *pm)
/* Write list of machines to fd. Format is one machine per message
 * followed by a blank message. */
{
struct machine *mach;
for (mach = machineList; mach != NULL; mach = mach->next)
    {
    struct dlNode *jobNode = mach->jobs->head;
    do
	{
	/* this list may output multiple rows per machine, one for each running job */
	pmClear(pm);
	pmPrintf(pm, "%-10s good %d, err %d, ", mach->name, mach->goodCount, mach->errCount);
	if (dlEmpty(mach->jobs))
	    {
	    if (mach->isDead)
		pmPrintf(pm, "dead");
	    else
		pmPrintf(pm, "idle");
	    }
	else
	    {
	    struct job *job = jobNode->val;
	    pmPrintf(pm, "running %-10s %s ", job->batch->user->name, job->cmd);
	    jobNode = jobNode->next;
	    }
	pmSend(pm, rudpOut);
	} while (!(dlEmpty(mach->jobs) || dlEnd(jobNode)));
    }
pmSendString(pm, rudpOut, "");
}

int countUserActiveBatches(struct user *user)
/* Count active batches for user. */
{
int count = dlCount(user->curBatches);  /* Start with batches with pending jobs. */
struct dlNode *node;
/* Add in batches with running but no pending jobs. */
for (node = user->oldBatches->head; !dlEnd(node); node = node->next)
    {
    struct batch *batch = node->val;
    if (batch->runningCount > 0)
        ++count;
    }
return count;
}

void listUsers(struct paraMessage *pm)
/* Write list of users to fd. Format is one user per line
 * followed by a blank line.
*/
{
struct user *user;
for (user = userList; user != NULL; user = user->next)
    {
    int totalBatch = dlCount(user->curBatches) + dlCount(user->oldBatches);
    pmClear(pm);
    pmPrintf(pm, "%s ", user->name);
    pmPrintf(pm,
    	"%d jobs running, %d waiting, %d finished, %d of %d batches active"
	", priority=%d"
	", maxJob=%d"
	, user->runningCount, userQueuedCount(user), user->doneCount,
	countUserActiveBatches(user), totalBatch, user->priority
	, user->maxJob
	);
    pmSend(pm, rudpOut);
    }
pmSendString(pm, rudpOut, "");
}

void writeOneBatchInfo(struct paraMessage *pm, struct user *user, struct batch *batch)
/* Write out info on one batch. */
{
char shortBatchName[512];
splitPath(batch->name, shortBatchName, NULL, NULL);
pmClear(pm);
pmPrintf(pm, "%-8s %4d %6d %6d %5d %3d %3d %3d %4.1fg %4d %3d %s",
	user->name, batch->runningCount,
	batch->queuedCount, batch->doneCount, batch->crashCount,
	batch->priority, batch->maxJob, batch->cpu,
	((float)batch->ram*ramUnit)/(1024*1024*1024),
	batch->planCount, (avgBatchTime(batch)+30)/60, shortBatchName);
pmSend(pm, rudpOut);
}

void listSomeBatches(struct paraMessage *pm, int runThreshold)
/* Write list of batches. Format is one batch per
 * line followed by a blank line.  runThreshold filters old batches by
 * their running-job count (1 = active only, 0 = all). */
{
struct user *user;
pmSendString(pm, rudpOut, "#user run wait done crash pri max cpu ram plan min batch");
for (user = userList; user != NULL; user = user->next)
    {
    struct dlNode *bNode;
    for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next)
	{
	writeOneBatchInfo(pm, user, bNode->val);
	}
    for (bNode = user->oldBatches->head; !dlEnd(bNode); bNode = bNode->next)
	{
	struct batch *batch = bNode->val;
	if (batch->runningCount >= runThreshold)
	    writeOneBatchInfo(pm, user, batch);
	}
    }
pmSendString(pm, rudpOut, "");
}

void listBatches(struct paraMessage *pm)
/* Write list of all active batches. Format is one batch per
 * line followed by a blank line. */
{
listSomeBatches(pm, 1);
}

void listAllBatches(struct paraMessage *pm)
/* Write list of batches including inactive ones.
Format is one batch per
 * line followed by a blank line. */
{
listSomeBatches(pm, 0);
}

void appendLocalTime(struct paraMessage *pm, time_t t)
/* Append time t converted to day/time format to dy. */
{
struct tm *tm;
tm = localtime(&t);
pmPrintf(pm, "%04d/%02d/%02d %02d:%02d:%02d",
	1900+tm->tm_year, 1+tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec);
}

char *upToFirstDot(char *s, bool dotQ)
/* Return string up to first dot.  Static buffer, overwritten per call.
 * (int vs sizeof compare below is safe: size is always >= 0 here.) */
{
static char ret[128];
int size;
char *e = strchr(s, '.');
if (e == NULL)
    size = strlen(s);
else
    size = e - s;
if (size >= sizeof(ret)-2)  /* Leave room for .q */
    size = sizeof(ret)-3;
memcpy(ret, s, size);
ret[size] = 0;
if (dotQ)
    strcat(ret, ".q");
return ret;
}

boolean oneJobList(struct paraMessage *pm, struct dlList *list,
	boolean sinceStart, boolean extended)
/* Write out one job list. Return FALSE if there is a problem. */
{
struct dlNode *el;
struct job *job;
char *machName;
for (el = list->head; !dlEnd(el); el = el->next)
    {
    job = el->val;
    if (job->machine != NULL)
        machName = upToFirstDot(job->machine->name, FALSE);
    else
        machName = "none";
    pmClear(pm);
    pmPrintf(pm, "%-4d %-10s %-10s ", job->id, machName, job->batch->user->name);
    if (sinceStart)
        appendLocalTime(pm, job->startTime);
    else
        appendLocalTime(pm, job->submitTime);
    pmPrintf(pm, " %s", job->cmd);
    if (extended)
	pmPrintf(pm, " %s", job->batch->name);
    if (!pmSend(pm, rudpOut))
        return FALSE;
    }
return TRUE;
}

void listJobs(struct paraMessage *pm, boolean extended)
/* Write list of jobs. Format is one job per message
 * followed by a blank message.
*/ { struct user *user; struct dlNode *bNode; struct batch *batch; if (!oneJobList(pm, runningJobs, TRUE, extended)) return; for (user = userList; user != NULL; user = user->next) { for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next) { batch = bNode->val; if (!oneJobList(pm, batch->jobQueue, FALSE, extended)) return; } } pmSendString(pm, rudpOut, ""); } boolean onePstatList(struct paraMessage *pm, struct dlList *list, boolean running, boolean extended, int *resultCount) /* Write out one job list in pstat format. Return FALSE if there is * a problem. */ { struct dlNode *node; struct job *job; time_t t; char *machName; char *state = (running ? "r" : "q"); int count = 0; char buf[rudpMaxSize]; char *terminator = ""; if (extended) terminator = "\n"; pmClear(pm); for (node = list->head; !dlEnd(node); node = node->next) { ++count; job = node->val; if (job->machine != NULL) machName = job->machine->name; else machName = "none"; if (running) t = job->startTime; else t = job->submitTime; if (!running && extended) safef(buf, sizeof(buf), "%s %d\n", state, job->id); else safef(buf, sizeof(buf), "%s %d %s %s %lu %s%s", state, job->id, job->batch->user->name, job->exe, t, machName, terminator); if ((!extended && pm->size > 0) || (pm->size + strlen(buf) > rudpMaxSize)) { if (!pmSend(pm, rudpOut)) { *resultCount += count; return FALSE; } pmClear(pm); } pmPrintf(pm, "%s", buf); } if (pm->size > 0) { if (!pmSend(pm, rudpOut)) { *resultCount += count; return FALSE; } } *resultCount += count; return TRUE; } void pstat(char *line, struct paraMessage *pm, boolean extended) /* Write list of jobs in pstat format. * Extended pstat2 format means we only show queued jobs for the * specific batch, but we still need to return total queue size * and also we can include a status about batch failure. 
* Older versions of para will not call extended but should still work.*/ { struct user *user; struct dlNode *bNode; struct batch *batch = NULL; char *userName, *dir; struct user *thisUser = NULL; struct batch *thisBatch = NULL; int count = 0; userName = nextWord(&line); dir = nextWord(&line); if (userName) thisUser = findUser(userName); if (dir) thisBatch = findBatch(thisUser, dir, TRUE); if (thisBatch) flushResults(thisBatch->name); if (!onePstatList(pm, runningJobs, TRUE, extended, &count)) return; for (user = userList; user != NULL; user = user->next) { for (bNode = user->curBatches->head; !dlEnd(bNode); bNode = bNode->next) { batch = bNode->val; if ((thisUser == NULL || thisUser == user) && (thisBatch == NULL || thisBatch == batch)) { if (!onePstatList(pm, batch->jobQueue, FALSE, extended, &count)) return; } else count += batch->queuedCount; } } if (extended) { pmClear(pm); pmPrintf(pm, "Total Jobs: %d", count); if (!pmSend(pm, rudpOut)) return; if (thisBatch && (thisBatch->continuousCrashCount >= sickBatchThreshold)) { pmClear(pm); pmPrintf(pm, "Sick Batch: consecutive crashes (%d) >= sick batch threshold (%d)", thisBatch->continuousCrashCount, sickBatchThreshold); if (!pmSend(pm, rudpOut)) return; } if (thisBatch) { off_t resultsSize = fileSize(thisBatch->name); if (resultsSize != -1) // file exists { pmClear(pm); pmPrintf(pm, "Results Size: %lld", (long long) resultsSize); if (!pmSend(pm, rudpOut)) return; } } } pmSendString(pm, rudpOut, ""); } int sumPendingJobs() /* Return sum of all pending jobs for all users. 
*/ { struct user *user; int count = 0; for (user = userList; user != NULL; user = user->next) count += userQueuedCount(user); return count; } int countActiveUsers() /* Return count of users with jobs running or in queue */ { struct user *user; int count = 0; for (user = userList; user != NULL; user = user->next) { if (userIsActive(user)) ++count; } return count; } int countActiveBatches() /* Return count of active batches */ { int count = 0; struct user *user; for (user = userList; user != NULL; user = user->next) count += countUserActiveBatches(user); return count; } int getCpus(struct dlList *list) /* Return total CPU resources in list. */ { int count = 0; struct dlNode *node = NULL; for (node = list->head; !dlEnd(node); node = node->next) { struct machine *mach = node->val; count += mach->machSpec->cpus; } return count; } int getBusyCpus(struct dlList *list) /* Return total CPU resources in list. */ { int count = 0; struct dlNode *node = NULL; for (node = list->head; !dlEnd(node); node = node->next) { struct machine *machine = node->val; /* all the cpus now in-use */ struct dlNode *jobNode = NULL; for (jobNode = machine->jobs->head; !dlEnd(jobNode); jobNode = jobNode->next) { struct job *job = jobNode->val; struct batch * batch =job->batch; count += batch->cpu; } } return count; } void status(struct paraMessage *pm) /* Write summary status. Format is one line per message * followed by a blank message. 
*/ { char buf[256]; int totalCpus = getCpus(freeMachines)+getCpus(readyMachines)+getCpus(blockedMachines)+getCpus(busyMachines); int busyCpus = getBusyCpus(readyMachines)+getBusyCpus(blockedMachines)+getBusyCpus(busyMachines); safef(buf, sizeof(buf), "CPUs total: %d", totalCpus); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "CPUs free: %d", totalCpus - busyCpus); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "CPUs busy: %d", busyCpus); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Nodes total: %d", dlCount(freeMachines)+dlCount(busyMachines)+dlCount(readyMachines)+ dlCount(blockedMachines)+dlCount(deadMachines)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Nodes dead: %d", dlCount(deadMachines)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Nodes sick?: %d", listSickNodes(NULL)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Jobs running: %d", dlCount(runningJobs)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Jobs waiting: %d", sumPendingJobs()); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Jobs finished: %d", finishedJobCount); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Jobs crashed: %d", crashedJobCount); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Spokes free: %d", dlCount(freeSpokes)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Spokes busy: %d", dlCount(busySpokes)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Spokes dead: %d", dlCount(deadSpokes)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Active batches: %d", countActiveBatches()); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Total batches: %d", slCount(batchList)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Active users: %d", countActiveUsers()); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Total users: %d", slCount(userList)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Days up: %f", (now - 
startupTime)/(3600.0 * 24.0)); pmSendString(pm, rudpOut, buf); safef(buf, sizeof(buf), "Version: %s", version); pmSendString(pm, rudpOut, buf); pmSendString(pm, rudpOut, ""); } void addSpoke() /* Start up a new spoke and add it to free list. */ { struct spoke *spoke; spoke = spokeNew(); if (spoke != NULL) { slAddHead(&spokeList, spoke); dlAddTail(freeSpokes, spoke->node); } } void killSpokes() /* Kill all spokes. */ { struct spoke *spoke, *next; for (spoke = spokeList; spoke != NULL; spoke = next) { next = spoke->next; dlRemove(spoke->node); spokeFree(&spoke); } } void startSpokes() /* Start default number of spokes. */ { int i; for (i=0; i<initialSpokes; ++i) addSpoke(); } void startMachines(char *fileName) /* If they give us a beginning machine list use it here. */ { struct lineFile *lf = lineFileOpen(fileName, TRUE); char *row[7]; boolean firstTime = TRUE; while (lineFileRow(lf, row)) { struct machSpec *ms; ms = machSpecLoad(row); char ipStr[NI_MAXHOST]; lookupIp(ms->name, ipStr, sizeof ipStr); if (hashLookup(machineHash, ms->name)) errAbort("machine list contains duplicate: %s", ms->name); struct machine *machine = doAddMachine(ms->name, ms->tempDir, ipStr, ms); hashStoreName(machineHash, ms->name); // TODO Add a command-line param for these that overrides default? 
/* use first machine in spec list as model node */ if (firstTime) { firstTime = FALSE; cpuUnit = 1; /* 1 CPU */ if (!optionExists("ramUnit")) ramUnit = ((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus; defaultJobCpu = 1; /* number of cpuUnits in default job usage */ /* number of ramUnits in default job usage, resolves to just 1 currently */ if (!optionExists("defaultJobRam")) defaultJobRam = (((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus) / ramUnit; } int c = 0, r = 0; readTotalMachineResources(machine, &c, &r); maxCpuInCluster = max(maxCpuInCluster, c); maxRamInCluster = max(maxRamInCluster, r); } lineFileClose(&lf); } struct existingResults /* Keep track of old results we need to integrate into */ { struct existingResults *next; char *fileName; /* Name of file this is in, not allocated here */ struct hash *hash; /* Hash keyed by ascii jobId indicated job results * already recorded. */ }; void existingResultsFree(struct existingResults **pEr) /* Free up existing results structure */ { struct existingResults *er = *pEr; if (er != NULL) { freeHash(&er->hash); freez(pEr); } } void existingResultsFreeList(struct existingResults **pList) /* Free list of existingResults */ { struct existingResults *el, *next; for (el = *pList; el != NULL; el = next) { next = el->next; existingResultsFree(&el); } *pList = NULL; } void readResults(char *fileName, struct hash *hash) /* Read jobId's of results into hash */ { struct lineFile *lf = lineFileMayOpen(fileName, TRUE); char *row[3]; char *line; int wordCount; if (lf == NULL) { warn("Couldn't open results file %s", fileName); return; } while (lineFileNext(lf, &line, NULL)) { wordCount = chopLine(line, row); if (wordCount == 0 || row[0][0] == '#') continue; if (wordCount < 3) { warn("Short line %d of %s", lf->lineIx, lf->fileName); continue; } if (!isdigit(row[2][0])) { warn("Expecting number field 3 line %d of %s", lf->lineIx, lf->fileName); break; } hashAdd(hash, 
row[2], NULL); } lineFileClose(&lf); } struct existingResults *getExistingResults(char *fileName, struct hash *erHash, struct existingResults **pErList) /* Get results from hash if we've seen them before, otherwise * read them in, save in hash, and return them. */ { struct existingResults *er = hashFindVal(erHash, fileName); if (er == NULL) { AllocVar(er); slAddHead(pErList, er); hashAddSaveName(erHash, fileName, er, &er->fileName); er->hash = newHashExt(18, FALSE); readResults(fileName, er->hash); } return er; } void addRunningJob(struct runJobMessage *rjm, char *resultFile, struct machine *mach) /* Add job that is already running to queues. */ { if (dlCount(mach->jobs) > mach->machSpec->cpus) warn("%s seems to have more jobs running than it has cpus", mach->name); else { struct job *job = jobNew(rjm->command, rjm->user, rjm->dir, rjm->in, rjm->out, rjm->cpus, rjm->ram, resultFile, FALSE); if (!job) return; struct batch *batch = job->batch; struct user *user = batch->user; job->id = atoi(rjm->jobIdString); ++batch->runningCount; ++user->runningCount; dlRemove(batch->node); dlAddTail(user->oldBatches, batch->node); dlAddTail(mach->jobs, job->jobNode); job->machine = mach; dlAddTail(runningJobs, job->node); dlRemove(mach->node); dlAddTail(readyMachines, mach->node); dlAddTail(hangJobs, job->hangNode); mach->lastChecked = job->lastChecked = job->submitTime = job->startTime = job->lastClockIn = now; } } void pljErr(struct machine *mach, int no) /* Print out error message in the middle of routine below. */ { warn("%s: truncated listJobs response %d", mach->name, no); } void getExeOnly(char *command, char exe[256]) /* Extract executable file (not including path) from command line. */ { /* Extract name of executable file with no path. 
*/ char *dupeCommand = cloneString(command); char *exePath = firstWordInLine(dupeCommand); char exeFile[128], exeExt[64]; splitPath(exePath, NULL, exeFile, exeExt); /* We cannot use sizeof(exe) because an array on a stack * is just a pointer, and so pointer-size is all that sizeof returns * for exe. */ safef(exe, 256, "%s%s", exeFile, exeExt); freez(&dupeCommand); } void writeExistingResults(char *fileName, char *line, struct machine *mach, struct runJobMessage *rjm) { char err[512], exe[256]; int jobId = atoi(rjm->jobIdString); char *status = nextWord(&line); char *uTime = nextWord(&line); char *sTime = nextWord(&line); if (sTime == NULL) { warn("Bad line format in writeExistingResults for %s", mach->name); return; } getExeOnly(rjm->command, exe); fillInErrFile(err, jobId, mach->tempDir); fileName = hashStoreName(stringHash, fileName); writeResults(fileName, rjm->user, mach->name, jobId, exe, now, now, err, rjm->command, status, uTime, sTime); } boolean processListJobs(struct machine *mach, struct paraMessage *pm, struct rudp *ru, struct hash *erHash, struct existingResults **pErList, int *pRunning, int *pFinished) /* Process response to list jobs message. Read jobs node is running and * has recently finished. Add running ones to job list. Add finished * ones to results file if necessary. * * Format of message is * running count * one line for each running job. * recent count * two lines for each recent job. 
*/ { int running, recent, i, finCount = 0; struct runJobMessage rjm; char resultsFile[512]; struct paraMultiMessage pmm; /* ensure the multi-message response comes from the correct ip and has no duplicate msgs*/ pmmInit(&pmm, pm); if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { warn("%s: no listJobs response", mach->name); return FALSE; } running = atoi(pm->data); for (i=0; i<running; ++i) { if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { pljErr(mach, 1); return FALSE; } if (!parseRunJobMessage(pm->data, &rjm)) { pljErr(mach, 2); return FALSE; } snprintf(resultsFile, sizeof(resultsFile), "%s/%s", rjm.dir, "para.results"); addRunningJob(&rjm, resultsFile, mach); } *pRunning += running; if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { pljErr(mach, 3); return FALSE; } recent = atoi(pm->data); for (i=0; i<recent; ++i) { struct existingResults *er; char *startLine = NULL; if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { pljErr(mach, 4); return FALSE; } startLine = cloneString(pm->data);; if (!parseRunJobMessage(startLine, &rjm)) { pljErr(mach, 5); freez(&startLine); return FALSE; } if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { pljErr(mach, 6); freez(&startLine); return FALSE; } /* Do not duplicate a result. Check if it already is in para.results */ safef(resultsFile, sizeof(resultsFile), "%s/%s", rjm.dir, "para.results"); er = getExistingResults(resultsFile, erHash, pErList); if (!hashLookup(er->hash, rjm.jobIdString)) { writeExistingResults(resultsFile, pm->data, mach, &rjm); ++finCount; } freez(&startLine); } *pFinished += finCount; return TRUE; } void checkForJobsOnNodes() /* Poll nodes and see if they have any jobs for us. 
*/ { struct machine *mach; int running = 0, finished = 0; struct hash *erHash = newHashExt(8, FALSE); /* A hash of existingResults */ struct existingResults *erList = NULL; logDebug("Checking for jobs already running on nodes"); for (mach = machineList; mach != NULL; mach = mach->next) { struct paraMessage pm; struct rudp *ru = rudpNew(rudpOut->socket); /* Get own resend timing */ logDebug("check for jobs on %s", mach->name); pmInitFromName(&pm, mach->name, paraNodePortStr); if (!pmSendString(&pm, ru, "listJobs")) { machineDown(mach); continue; } if (!processListJobs(mach, &pm, rudpOut, erHash, &erList, &running, &finished)) machineDown(mach); rudpFree(&ru); } /* Clean up time. */ existingResultsFreeList(&erList); hashFree(&erHash); needsPlanning = TRUE; /* Report results. */ logDebug("%d running jobs, %d jobs that finished while hub was down", running, finished); } void startHub(char *machineList) /* Do hub daemon - set up socket, and loop around on it until we get a quit. */ { struct sockaddr_storage sai; char *line, *command; struct rudp *rudpIn = NULL; /* Note startup time. */ findNow(); startupTime = now; /* Find name and IP address of our machine. */ hubHost = getMachine(); if (optionExists("log")) logOpenFile("paraHub", optionVal("log", NULL)); else logOpenSyslog("paraHub", optionVal("logFacility", NULL)); logSetMinPriority(optionVal("logMinPriority", "info")); logInfo("starting paraHub on %s", hubHost); /* Set up various lists. */ hubMessageQueueInit(); stringHash = newHash(0); setupLists(); machineHash = newHash(0); startMachines(machineList); openJobId(); logInfo("next job ID is %d.", nextJobId); rudpOut = rudpMustOpen(); if (!optionExists("noResume")) checkForJobsOnNodes(); /* Initialize socket etc. */ ZeroVar(&sai); if (!internetFillInAddress6n4(NULL, paraHubPortStr, AF_INET6, SOCK_DGRAM, &sai, FALSE)) errAbort("NULL host addrinfo lookup failed trying to bind listener."); rudpIn = rudpMustOpenBound(&sai); /* Start up daemons. 
*/ sockSuckStart(rudpIn); startHeartbeat(); startSpokes(); logDebug("sockSuck,Heartbeat,Spokes have been started"); /* Bump up our priority to just shy of real-time. */ (void) nice(-40); // ignore return value /* Main event loop. */ for (;;) { struct paraMessage *pm = hubMessageGet(); findNow(); line = pm->data; logDebug("hub: %s", line); command = nextWord(&line); if (command == NULL) warn("Empty command"); else if (sameWord(command, "jobDone")) jobDone(line); else if (sameWord(command, "recycleSpoke")) recycleSpoke(line); else if (sameWord(command, "heartbeat")) processHeartbeat(); else if (sameWord(command, "setPriority")) setPriorityAcknowledge(line, pm); else if (sameWord(command, "setMaxJob")) setMaxJobAcknowledge(line, pm); else if (sameWord(command, "resetCounts")) resetCountsAcknowledge(line, pm); else if (sameWord(command, "freeBatch")) freeBatchAcknowledge(line, pm); else if (sameWord(command, "flushResults")) flushResultsAcknowledge(line, pm); else if (sameWord(command, "showSickNodes")) showSickNodesAcknowledge(line, pm); else if (sameWord(command, "clearSickNodes")) clearSickNodesAcknowledge(line, pm); else if (sameWord(command, "addJob")) addJobAcknowledge(line, pm, 1); else if (sameWord(command, "addJob2")) addJobAcknowledge(line, pm, 2); else if (sameWord(command, "nodeDown")) nodeDown(line); else if (sameWord(command, "alive")) nodeAlive(line); else if (sameWord(command, "checkIn")) nodeCheckIn(line); else if (sameWord(command, "checkDeadNodesASAP")) checkDeadNodesASAP(); else if (sameWord(command, "removeJob")) removeJobAcknowledge(line, pm); else if (sameWord(command, "chill")) chillBatch(line, pm); else if (sameWord(command, "ping")) respondToPing(pm); else if (sameWord(command, "addMachine")) addMachine(line); else if (sameWord(command, "removeMachine")) removeMachineAcknowledge(line, pm); else if (sameWord(command, "listJobs")) listJobs(pm, FALSE); else if (sameWord(command, "listJobsExtended")) listJobs(pm, TRUE); else if (sameWord(command, 
"listMachines")) listMachines(pm); else if (sameWord(command, "listUsers")) listUsers(pm); else if (sameWord(command, "listBatches")) listBatches(pm); else if (sameWord(command, "listAllBatches")) listAllBatches(pm); else if (sameWord(command, "listSick")) listSickNodes(pm); else if (sameWord(command, "status")) status(pm); else if (sameWord(command, "pstat")) pstat(line, pm, FALSE); else if (sameWord(command, "pstat2")) pstat(line, pm, TRUE); else if (sameWord(command, "addSpoke")) addSpoke(); else if (sameWord(command, "plan")) plan(pm); else if (sameWord(command, "quit")) break; else warn("Unrecognized command %s", command); pmFree(&pm); } endHeartbeat(); killSpokes(); saveJobId(); #ifdef SOON #endif /* SOON */ } void fillInSubnet() /* Parse subnet paramenter if any into subnet variable. */ { char hubSubnetStr[1024]; char *sns = optionVal("subnet", NULL); char *localHostSubnet = "127.0.0.1,::1/128"; /* Address for local host */ if (sns) safef(hubSubnetStr, sizeof hubSubnetStr, "%s,%s", localHostSubnet, sns); else safef(hubSubnetStr, sizeof hubSubnetStr, "%s", localHostSubnet); hubSubnet = internetParseSubnetCidr(hubSubnetStr); } int main(int argc, char *argv[]) /* Process command line. */ { optionInit(&argc, argv, optionSpecs); if (argc < 2) usage(); if (optionExists("ramUnit")) { ramUnit = paraParseRam(optionVal("ramUnit", "")); if (ramUnit == -1) errAbort("Invalid RAM expression '%s' in '-ramUnit=' option", optionVal("ramUnit", "")); } if (optionExists("defaultJobRam")) { defaultJobRam = optionInt("defaultJobRam", defaultJobRam); if (defaultJobRam < 1) errAbort("Invalid defaultJobRam specified in option -defaultJobRam=%d", defaultJobRam); } jobCheckPeriod = optionInt("jobCheckPeriod", jobCheckPeriod); machineCheckPeriod = optionInt("machineCheckPeriod", machineCheckPeriod); initialSpokes = optionInt("spokes", initialSpokes); fillInSubnet(); paraDaemonize("paraHub"); startHub(argv[1]); return 0; }