f727e7c1325f863157c57d88ba0a1c3cf65b1478 galt Sat Jan 11 16:19:08 2025 -0800
fix for overflowing packet issue when paraHub restarts but most jobs are run on just one node.

diff --git src/parasol/paraNode/paraNode.c src/parasol/paraNode/paraNode.c
index 82c9c89..3aba23e 100644
--- src/parasol/paraNode/paraNode.c
+++ src/parasol/paraNode/paraNode.c
@@ -562,54 +562,78 @@
     pmInit(&pm, ipStr, paraHubPortStr);
     if (job != NULL)
         pmPrintf(&pm, "checkIn %s %s running", hostName, jobIdString);
     else
         {
         struct job *job = findFinishedJob(jobId);
         if (job == NULL)
             pmPrintf(&pm, "checkIn %s %s free", hostName, jobIdString);
         else
             pmPrintf(&pm, "jobDone %s %s", jobIdString, job->doneMessage);
         }
     pmSend(&pm, mainRudp);
     }
 }
 
+void doResurrectFullCheck(struct paraMessage *pm, char *ipStr, struct job *job, boolean *firstTime, int *jobsReported)
+/* Add job's id to the "alive" message being built in pm.  A new message is
+ * started first when *firstTime is set.  When less than 20 of rudpMaxSize is
+ * left the message is sent and *firstTime is set so the next call starts
+ * another.  *jobsReported counts the jobs added over all messages. */
+{
+if (*firstTime)
+    {
+    pmInit(pm, ipStr, paraHubPortStr);
+    pmPrintf(pm, "alive %s", hostName);
+    *firstTime = FALSE;
+    }
+pmPrintf(pm, " %d", job->jobId);
+(*jobsReported)++;
+
+if ((rudpMaxSize - pm->size) < 20)
+    {
+    pmSend(pm, mainRudp);
+    *firstTime = TRUE;
+    }
+}
+
 void doResurrect(char *line, struct sockaddr_storage *ipAddress)
 /* Send back I'm alive message */
 {
 struct paraMessage pm;
 struct dlNode *node;
 int jobsReported = 0;
 char ipStr[NI_MAXHOST];
 getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr);
 pmInit(&pm, ipStr, paraHubPortStr);
 pmPrintf(&pm, "alive %s", hostName);
+/* The first message is started above so a node with no jobs still answers;
+ * firstTime only becomes TRUE after a full message has been flushed. */
+boolean firstTime = FALSE;
 for (node = jobsRunning->head; !dlEnd(node); node = node->next)
     {
     struct job *job = node->val;
-    pmPrintf(&pm, " %d", job->jobId);
-    ++jobsReported;
+    doResurrectFullCheck(&pm, ipStr, job, &firstTime, &jobsReported);
     }
 for (node = jobsFinished->head; !dlEnd(node); node = node->next)
     {
     struct job *job = node->val;
     if (jobsReported >= maxProcs)
         break;
-    pmPrintf(&pm, " %d", job->jobId);
-    ++jobsReported;
+    doResurrectFullCheck(&pm, ipStr, job, &firstTime, &jobsReported);
     }
-pmSend(&pm, mainRudp);
+if (!firstTime)
+    pmSend(&pm, mainRudp);
 }
 
 void doRun(char *line, struct sockaddr_storage *ipAddress)
 /* Execute command. */
 {
 char *jobMessage = cloneString(line);
 static char *args[1024];
 int argCount;
 char ipStr[NI_MAXHOST];
 getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr);
 
 nextRandom();
 if (line == NULL)
     warn("Executing nothing...");