f727e7c1325f863157c57d88ba0a1c3cf65b1478 galt Sat Jan 11 16:19:08 2025 -0800 fix for overflowing packet issue when paraHub restarts but most jobs are run on just one node. diff --git src/parasol/lib/paraMessage.c src/parasol/lib/paraMessage.c index 598fe20..1b368ef 100644 --- src/parasol/lib/paraMessage.c +++ src/parasol/lib/paraMessage.c @@ -1,373 +1,373 @@ /* paraMessage - routines to pack and unpack messages in * the parasol system, and also to send them via sockets. */ #include "paraCommon.h" #include "paraLib.h" #include "internet.h" #include "rudp.h" #include "paraMessage.h" #include "errAbort.h" #include "log.h" static void pmInitExt(struct paraMessage *pm, char *hostStr, char *portStr, boolean ipOnly) /* Initialize message (that might be on stack). ipStr is host ip as string. portStr is port number as string */ { ZeroVar(pm); if (!hostStr && !portStr) // special case used by spoke etc for message that goes on queue but not sent via network. return; // ipOnly = TRUE = only allow ipv4 or ipv6 addresses, not names // ipOnly = FALSE = names allowed too. if (!internetFillInAddress6n4(hostStr, portStr, AF_UNSPEC, SOCK_DGRAM, &pm->ipAddress, ipOnly)) errAbort("ip address %s port %s lookup failed.", hostStr, portStr); } void pmInit(struct paraMessage *pm, char *ipStr, char *portStr) /* Initialize message (that might be on stack). ipStr is host ip as string. portStr is port number as string */ { // ipOnly = TRUE = only allow ipv4 or ipv6 addresses, not names pmInitExt(pm, ipStr, portStr, TRUE); } void pmInitFromName(struct paraMessage *pm, char *hostName, char *portStr) /* Initialize message with ascii ip address. */ { // ipOnly = FALSE = names allowed too. pmInitExt(pm, hostName, portStr, FALSE); } struct paraMessage *pmNew(char *ipStr, char *portStr) /* Create new message in memory */ { struct paraMessage *pm; AllocVar(pm); pmInit(pm, ipStr, portStr); return pm; } void pmFree(struct paraMessage **pPm) /* Free up message. */ { freez(pPm); } void pmClear(struct paraMessage *pm) /* Clear out data buffer. */ { pm->size = 0; } void pmSet(struct paraMessage *pm, char *message) /* Set message in data buffer. */ { int len = strlen(message); if (len >= sizeof(pm->data) - 1) { - warn("Message too long in pmSet, ignoring: %.20s...", message); - pmClear(pm); + errAbort("Message too long in pmSet, ignoring: %.20s...", message); + //pmClear(pm); } else { strcpy(pm->data, message); pm->size = len; } } void pmVaPrintf(struct paraMessage *pm, char *format, va_list args) /* Print message into end of data buffer. Warn if it goes * past limit. */ { int sizeLeft = sizeof(pm->data) - pm->size; int sz = vsnprintf(pm->data + pm->size, sizeLeft, format, args); /* note that some version return -1 if too small */ if ((sz < 0) || (sz >= sizeLeft)) { - warn("pmVaPrintf buffer overflow size %d, format %s", sz, format); - pmClear(pm); + errAbort("pmVaPrintf buffer overflow size %d, format %s", sz, format); + //pmClear(pm); } else pm->size += sz; } void pmPrintf(struct paraMessage *pm, char *format, ...) /* Print message into end of data buffer. Warn if it goes * past limit. */ { va_list args; va_start(args, format); pmVaPrintf(pm, format, args); va_end(args); } boolean pmSend(struct paraMessage *pm, struct rudp *ru) /* Send out message. Print warning message and return FALSE if * there is a problem. */ { return rudpSend(ru, &pm->ipAddress, pm->data, pm->size) == 0; } boolean pmSendString(struct paraMessage *pm, struct rudp *ru, char *string) /* Send out given message strng. Print warning message and return FALSE if * there is a problem. */ { pmSet(pm, string); return pmSend(pm, ru); } void pmCheckCommandSize(char *string, int len) /* Check that string of given len is not too long to fit into paraMessage. * If it is, abort with good error message assuming it was a command string */ { if (len > rudpMaxSize) { errAbort("The following string has %d bytes, but can only be %d:\n%s\n" "Please either shorten the current directory or the command line\n" "possibly by making a shell script that encapsulates a long command.\n" , len, (int)rudpMaxSize, string); } } boolean pmReceiveTimeOut(struct paraMessage *pm, struct rudp *ru, int timeOut) /* Wait up to timeOut microseconds for message. To wait forever * set timeOut to zero. */ { int size = rudpReceiveTimeOut(ru, pm->data, sizeof(pm->data)-1, &pm->ipAddress, timeOut); if (size < 0) { pmClear(pm); return FALSE; } pm->size = size; pm->data[size] = 0; return TRUE; } boolean pmReceive(struct paraMessage *pm, struct rudp *ru) /* Receive message. Print warning message and return FALSE if * there is a problem. */ { return pmReceiveTimeOut(pm, ru, 0); } static void setPort6n4(struct sockaddr_storage *sai, char *portStr) /* set port to zero */ { int port = atoi(portStr); if (sai->ss_family == AF_INET6) //ipv6 { struct sockaddr_in6 *sai6 = (struct sockaddr_in6 *)sai; sai6->sin6_port = htons(port); } else if (sai->ss_family == AF_INET) // ipv4 { struct sockaddr_in *sai4 = (struct sockaddr_in *)sai; sai4->sin_port = htons(port); } else errAbort("unknown sai->ss_family=%d in setPort6n4", sai->ss_family); } void pmmInit(struct paraMultiMessage *pmm, struct paraMessage *pm) /* Initialize structure for multi-message response */ { pmm->pm = pm; memcpy(&pmm->ipAddress, &pm->ipAddress, sizeof(struct sockaddr_storage)); setPort6n4(&pmm->ipAddress, "0"); /* we don't yet know what port the responder is going to use */ if (pm->ipAddress.ss_family != pmm->ipAddress.ss_family) // must match errAbort("unexpected error pmmInit, ss_family does not match pm family=%d, pmm family=%d ", pm->ipAddress.ss_family, pmm->ipAddress.ss_family); pmm->id = 0; } boolean pmmReceiveTimeOut(struct paraMultiMessage *pmm, struct rudp *ru, int timeOut) /* Multi-message receive * Wait up to timeOut microseconds for message. To wait forever * set timeOut to zero. For multi-message response * We know the ip, and can track the port for continuity * and the packet id for sequential series. */ { for(;;) { if (!pmReceiveTimeOut(pmm->pm, ru, timeOut)) return FALSE; char pmmIpStr[NI_MAXHOST]; char pmmpmIpStr[NI_MAXHOST]; char pmmPortStr[NI_MAXSERV]; char pmmpmPortStr[NI_MAXSERV]; getAddrAndPortAsString6n4(&pmm->ipAddress , pmmIpStr , sizeof pmmIpStr , pmmPortStr , sizeof pmmPortStr ); getAddrAndPortAsString6n4(&pmm->pm->ipAddress, pmmpmIpStr, sizeof pmmpmIpStr, pmmpmPortStr, sizeof pmmpmPortStr); if (!sameString(pmmIpStr, pmmpmIpStr)) // diff { logDebug("rudp: pmmReceiveTimeOut ignoring unwanted packet from wrong sender expected %s got %s", pmmIpStr, pmmpmIpStr); continue; } if (sameString(pmmPortStr,"0")) /* we don't yet know what port the responder is going to use */ /* Should be ASSERTABLE first packet received since pmmInit was called */ { setPort6n4(&pmm->ipAddress, pmmpmPortStr); } else { if (!sameString(pmmPortStr,pmmpmPortStr)) /* we don't yet know what port the responder is going to use */ { logDebug("rudp: pmmReceiveTimeOut ignoring unwanted packet from wrong port expected %s got %s", pmmPortStr, pmmpmPortStr); continue; } if (pmm->id == ru->lastIdReceived && ru->resend) { logDebug("rudp: pmmReceiveTimeOut ignoring duplicate packet lastIdReceived=%d", pmm->id); continue; } if (pmm->id+1 != ru->lastIdReceived) { logDebug("rudp: pmmReceiveTimeOut invalid msg id expected %d got %d", pmm->id+1, ru->lastIdReceived); continue; } } pmm->id = ru->lastIdReceived; return TRUE; } } boolean pmmReceive(struct paraMultiMessage *pmm, struct rudp *ru) /* Receive multi message. Print warning message and return FALSE if * there is a problem. */ { return pmmReceiveTimeOut(pmm, ru, 0); } void pmFetchOpenFile(struct paraMessage *pm, struct rudp *ru, char *fileName) /* Read everything you can from socket and output to file. */ { struct paraMultiMessage pmm; FILE *f = mustOpen(fileName, "w"); /* ensure the multi-message response comes from the correct ip and has no duplicate msgs*/ pmmInit(&pmm, pm); while (pmmReceive(&pmm, ru)) { if (pm->size == 0) break; mustWrite(f, pm->data, pm->size); } carefulClose(&f); } void pmFetchFile(char *host, char *sourceName, char *destName) /* Fetch small file. Only works if you are on hub if they've set up any security. */ { struct paraMessage pm; struct rudp *ru = rudpOpen(); if (ru != NULL) { pmInitFromName(&pm, host, paraNodePortStr); pmPrintf(&pm, "fetch %s %s", getUser(), sourceName); if (pmSend(&pm, ru)) pmFetchOpenFile(&pm, ru, destName); rudpClose(&ru); } } boolean pmSendStringWithRetries(struct paraMessage *pm, struct rudp *ru, char *string) /* Send out given message strng. Print warning message and return FALSE if * there is a problem. Try up to 5 times sleeping for 60 seconds in between. * This is an attempt to help automated processes. */ { int tries = 0; #define PMSENDSLEEP 60 #define PMSENDMAXTRIES 5 boolean result = FALSE; while (TRUE) { result = pmSendString(pm, ru, string); if (result) break; warn("pmSendString timed out!"); ++tries; if (tries >= PMSENDMAXTRIES) break; warn("pmSendString: will sleep %d seconds and retry", PMSENDSLEEP); sleep(PMSENDSLEEP); } return result; } char *pmHubSendSimple(char *message, char *host) /* Send message to host, no response. */ { struct rudp *ru = rudpMustOpen(); struct paraMessage pm; pmInitFromName(&pm, host, paraHubPortStr); if (!pmSendStringWithRetries(&pm, ru, message)) noWarnAbort(); rudpClose(&ru); return cloneString(pm.data); } char *pmHubSingleLineQuery(char *query, char *host) /* Send message to hub and get single line response. * This should be freeMem'd when done. */ { struct rudp *ru = rudpMustOpen(); struct paraMessage pm; pmInitFromName(&pm, host, paraHubPortStr); if (!pmSendStringWithRetries(&pm, ru, query)) noWarnAbort(); if (!pmReceive(&pm, ru)) noWarnAbort(); rudpClose(&ru); return cloneString(pm.data); } struct slName *pmHubMultilineQuery(char *query, char *host) /* Send a command with a multiline response to hub, * and return response as a list of strings. */ { struct slName *list = NULL; struct rudp *ru = rudpMustOpen(); struct paraMessage pm; struct paraMultiMessage pmm; char *row[256]; int count = 0; pmInitFromName(&pm, host, paraHubPortStr); /* ensure the multi-message response comes from the correct ip and has no duplicate msgs*/ pmmInit(&pmm, &pm); if (!pmSendStringWithRetries(&pm, ru, query)) noWarnAbort(); for (;;) { if (!pmmReceive(&pmm, ru)) break; if (pm.size == 0) break; count = chopByChar(pm.data, '\n', row, sizeof(row)); if (count > 1) --count; /* for multiline, count is inflated by one */ int i; for(i=0;i<count;++i) { slNameAddHead(&list, row[i]); } } rudpClose(&ru); slReverse(&list); return list; } struct paraPstat2Job *paraPstat2JobLoad(char **row) /* Turn an array of 5 strings into a paraPstat2Job. */ { struct paraPstat2Job *job; AllocVar(job); job->status = cloneString(row[0]); job->parasolId = cloneString(row[1]); job->user = cloneString(row[2]); job->program = cloneString(row[3]); job->host = cloneString(row[4]); return job; }