6e5ee11ca95cd971984038cf65bae00d9c898707
galt
  Wed Jun 4 15:40:02 2014 -0700
Since we have git, it is easy to rename errabort.c to errAbort.c without losing any history.
diff --git src/parasol/lib/paraMessage.c src/parasol/lib/paraMessage.c
index fe90ca9..d6a4336 100644
--- src/parasol/lib/paraMessage.c
+++ src/parasol/lib/paraMessage.c
@@ -1,364 +1,364 @@
 /* paraMessage - routines to pack and unpack messages in
  * the parasol system, and also to send them via sockets. */
 
 #include "paraCommon.h"
 #include "paraLib.h"
 #include "internet.h"
 #include "rudp.h"
 #include "paraMessage.h"
-#include "errabort.h"
+#include "errAbort.h"
 #include "log.h"
 
 void pmInit(struct paraMessage *pm, rudpHost ipAddress, bits16 port)
 /* Set up a message for sending to ipAddress/port.  The message may live
  * on the stack.  *pm is first cleared with ZeroVar, then the destination
  * is filled in.  ipAddress and port are supplied in host byte order and
  * are converted to network order with htonl/htons before being stored. */
 {
 ZeroVar(pm);
 pm->ipAddress.sin_addr.s_addr = htonl(ipAddress);
 pm->ipAddress.sin_port = htons(port);
 pm->ipAddress.sin_family = AF_INET;
 }
 
 void pmInitFromName(struct paraMessage *pm, char *hostName, bits16 port)
 /* Initialize message (that might be on stack) for a host given by name.
  * hostName is passed to internetHostIp() and the address it returns,
  * together with port, is handed on to pmInit(). */
 {
 pmInit(pm, internetHostIp(hostName), port);
 }
 
 
 struct paraMessage *pmNew(rudpHost ipAddress, bits16 port)
 /* Allocate a message with AllocVar, initialize it for ipAddress/port
  * with pmInit, and return it. */
 {
 struct paraMessage *newMessage;
 AllocVar(newMessage);
 pmInit(newMessage, ipAddress, port);
 return newMessage;
 }
 
 struct paraMessage *pmNewFromName(char *hostName, bits16 port)
 /* Create new message in memory for a host given by name.  hostName is
  * resolved with internetHostIp() and the result is passed to pmNew(). */
 {
 return pmNew(internetHostIp(hostName), port);
 }
 
 void pmFree(struct paraMessage **pPm)
 /* Free up message.  Takes the address of the caller's pointer and hands
  * it straight to freez().
  * NOTE(review): freez presumably also sets *pPm to NULL - confirm. */
 {
 freez(pPm);
 }
 
 void pmClear(struct paraMessage *pm)
 /* Clear out data buffer.  Only the size field is reset to zero; the bytes
  * already in pm->data are left untouched. */
 {
 pm->size = 0;
 }
 
 void pmSet(struct paraMessage *pm, char *message)
 /* Copy message into the data buffer and record its length as the message
  * size.  A message that does not fit is not copied: a warning showing its
  * first 20 characters is issued and the message is cleared instead. */
 {
 int messageLen = strlen(message);
 if (messageLen >= sizeof(pm->data) - 1)
     {
     warn("Message too long in pmSet, ignoring: %.20s...", message);
     pmClear(pm);
     return;
     }
 strcpy(pm->data, message);
 pm->size = messageLen;
 }
 
 void pmVaPrintf(struct paraMessage *pm, char *format, va_list args)
 /* Format text onto the end of the data buffer and grow the message size
  * by the number of characters written.  If the formatted text does not
  * fit in the remaining space, warn and clear the whole message. */
 {
 int room = sizeof(pm->data) - pm->size;
 int written = vsnprintf(pm->data + pm->size, room, format, args);
 /* Some vsnprintf versions return -1 rather than the needed length when
  * the buffer is too small, so a negative result also counts as overflow. */
 if ((written >= 0) && (written < room))
     pm->size += written;
 else
     {
     warn("pmVaPrintf buffer overflow size %d, format %s", written, format);
     pmClear(pm);
     }
 }
 
 void pmPrintf(struct paraMessage *pm, char *format, ...)
 /* printf-style front end to pmVaPrintf: gathers the variable arguments
  * and appends the formatted text to the end of the data buffer. */
 {
 va_list ap;
 va_start(ap, format);
 pmVaPrintf(pm, format, ap);
 va_end(ap);
 }
 
 boolean pmSend(struct paraMessage *pm, struct rudp *ru)
 /* Send the message's data buffer (pm->size bytes) to the address stored
  * in the message, using rudpSend.  Returns TRUE when rudpSend reports 0,
  * FALSE otherwise. */
 {
 int status = rudpSend(ru, &pm->ipAddress, pm->data, pm->size);
 return status == 0;
 }
 
 boolean pmSendString(struct paraMessage *pm, struct rudp *ru, char *string)
 /* Load string into the message with pmSet and send it with pmSend.
  * Returns whatever pmSend returns: TRUE on success, FALSE if there is
  * a problem. */
 {
 boolean sent;
 pmSet(pm, string);
 sent = pmSend(pm, ru);
 return sent;
 }
 
 void pmCheckCommandSize(char *string, int len)
 /* Make sure a string of the given len is no longer than rudpMaxSize.
  * When it is too long, abort with an error message that is worded on the
  * assumption that the string is a command line. */
 {
 if (len <= rudpMaxSize)
     return;
 errAbort("The following string has %d bytes, but can only be %d:\n%s\n"
          "Please either shorten the current directory or the command line\n"
          "possibly by making a shell script that encapsulates a long command.\n"
          ,  len, (int)rudpMaxSize, string);
 }
 
 boolean pmReceiveTimeOut(struct paraMessage *pm, struct rudp *ru, int timeOut)
 /* Wait up to timeOut microseconds for message.  To wait forever
  * set timeOut to zero.  On success the data is zero-terminated, the
  * size and sender address are stored in pm, and TRUE is returned.  On
  * failure the message is cleared and FALSE is returned. */
 {
 /* One byte of the buffer is held back for the terminating zero. */
 int received = rudpReceiveTimeOut(ru, pm->data, sizeof(pm->data)-1, &pm->ipAddress, timeOut);
 if (received >= 0)
     {
     pm->size = received;
     pm->data[received] = 0;
     return TRUE;
     }
 pmClear(pm);
 return FALSE;
 }
 
 boolean pmReceive(struct paraMessage *pm, struct rudp *ru)
 /* Receive a message via pmReceiveTimeOut with a timeOut of zero (wait
  * forever).  Returns TRUE on success, FALSE if there is a problem. */
 {
 boolean gotMessage = pmReceiveTimeOut(pm, ru, 0);
 return gotMessage;
 }
 
 void pmmInit(struct paraMultiMessage *pmm, struct paraMessage *pm, struct in_addr sin_addr)
 /* Prepare pmm for receiving a multi-message response into pm.  sin_addr
  * is the address the responses are expected to come from. */
 {
 pmm->id = 0;
 /* The responder's port is not known yet; zero marks it as unset. */
 pmm->ipAddress.sin_port = 0;
 pmm->ipAddress.sin_addr.s_addr = sin_addr.s_addr;
 pmm->pm = pm;
 }
 
 boolean pmmReceiveTimeOut(struct paraMultiMessage *pmm, struct rudp *ru, int timeOut)
 /* Multi-message receive
  * Wait up to timeOut microseconds for message.  To wait forever
  * set timeOut to zero.  For multi-message response
  * We know the ip, and can track the port for continuity
  * and the packet id for sequential series.
  *
  * Packets are read into pmm->pm one at a time; unwanted ones are logged
  * with logDebug and skipped.  Returns TRUE once a packet is accepted (and
  * pmm->id has been updated to ru->lastIdReceived), or FALSE as soon as
  * pmReceiveTimeOut fails.  Note that timeOut is passed afresh to every
  * pmReceiveTimeOut call, so it bounds each individual wait rather than
  * the total time spent in this function.
  */
 {
 for(;;)
     {
     if (!pmReceiveTimeOut(pmm->pm, ru, timeOut))
 	return FALSE;
     /* Skip anything that did not come from the address given to pmmInit. */
     /* NOTE(review): "%ux" prints an unsigned decimal followed by a literal
      * 'x'; hex ("%x") may have been intended - confirm. */
     if (pmm->ipAddress.sin_addr.s_addr != pmm->pm->ipAddress.sin_addr.s_addr)
 	{
 	logDebug("rudp: pmmReceiveTimeOut ignoring unwanted packet from wrong sender expected %ux got %ux",
 	    (unsigned int)pmm->ipAddress.sin_addr.s_addr, (unsigned int)pmm->pm->ipAddress.sin_addr.s_addr);
 	continue;
 	}
     if (pmm->ipAddress.sin_port == 0)  /* we don't yet know what port the responder is going to use */
        	/* Should be ASSERTABLE first packet received since pmmInit was called */
 	{
 	/* Remember the sender's port; later packets must come from it too. */
 	pmm->ipAddress.sin_port = pmm->pm->ipAddress.sin_port;
 	}
     else
 	{
 	if (pmm->ipAddress.sin_port != pmm->pm->ipAddress.sin_port)
 	    {
 	    logDebug("rudp: pmmReceiveTimeOut ignoring unwanted packet from wrong port expected %x got %x",
 		pmm->ipAddress.sin_port, pmm->pm->ipAddress.sin_port);
 	    continue;
 	    }
 	/* Same id as the last accepted packet with ru->resend set: a duplicate. */
 	if (pmm->id == ru->lastIdReceived && ru->resend)
 	    {
 	    logDebug("rudp: pmmReceiveTimeOut ignoring duplicate packet lastIdReceived=%d",	pmm->id);
 	    continue;
 	    }
 	/* Only the packet whose id directly follows the last accepted one is taken. */
 	if (pmm->id+1 != ru->lastIdReceived)
 	    {
 	    logDebug("rudp: pmmReceiveTimeOut invalid msg id expected %d got %d", pmm->id+1, ru->lastIdReceived);
 	    continue;
 	    }
 	}
     /* Accepted: record its id so the next call can check the sequence. */
     pmm->id = ru->lastIdReceived;
     return TRUE;
     }
 }
 
 boolean pmmReceive(struct paraMultiMessage *pmm, struct rudp *ru)
 /* Receive the next message of a multi-message response via
  * pmmReceiveTimeOut with a timeOut of zero (wait forever).  Returns TRUE
  * on success, FALSE if there is a problem. */
 {
 boolean gotMessage = pmmReceiveTimeOut(pmm, ru, 0);
 return gotMessage;
 }
 
 
 #if 0 /* unused static functions */
 /* Everything up to the matching #endif is excluded from compilation.
  * The two helpers write and read a bits32 in network byte order. */
 static char *addLongData(char *data, bits32 val)
 /* Add val to data stream, converting to network format.  Return
  * new position of data stream, which is sizeof(val) bytes past data. */
 {
 val = htonl(val);
 memcpy(data, &val, sizeof(val));
 return data + sizeof(val);
 }
 
 static bits32 getLongData(char **pData)
 /* Get val from data stream, converting from network to host format.
  * Update data stream position: *pData is advanced by sizeof(val) bytes. */
 {
 bits32 val;
 memcpy(&val, *pData, sizeof(val));
 *pData += sizeof(val);
 val = ntohl(val);
 return val;
 }
 #endif
 
 void pmFetchOpenFile(struct paraMessage *pm, struct rudp *ru, char *fileName)
 /* Open fileName for writing and copy into it the data of every message
  * of a multi-message response received on ru.  Copying ends at the first
  * failed receive or the first empty message; the file is then closed. */
 {
 struct paraMultiMessage pmm;
 FILE *f = mustOpen(fileName, "w");
 /* Only accept messages from the ip already stored in pm, in sequence and
  * without duplicates. */
 pmmInit(&pmm, pm, pm->ipAddress.sin_addr);
 for (;;)
     {
     if (!pmmReceive(&pmm, ru) || pm->size == 0)
         break;
     mustWrite(f, pm->data, pm->size);
     }
 carefulClose(&f);
 }
 
 void pmFetchFile(char *host, char *sourceName, char *destName)
 /* Fetch small file. Only works if you are on hub if they've set up any security.
  * Sends "fetch <user> <sourceName>" to host on paraNodePort and, if the
  * send succeeds, writes the response to destName.  Does nothing when
  * rudpOpen fails. */
 {
 struct paraMessage pm;
 struct rudp *ru = rudpOpen();
 if (ru == NULL)
     return;
 pmInitFromName(&pm, host, paraNodePort);
 pmPrintf(&pm, "fetch %s %s", getUser(), sourceName);
 if (pmSend(&pm, ru))
     pmFetchOpenFile(&pm, ru, destName);
 rudpClose(&ru);
 }
 
 boolean pmSendStringWithRetries(struct paraMessage *pm, struct rudp *ru, char *string)
 /* Send out given message string, retrying on failure.  Makes up to
  * PMSENDMAXTRIES (5) attempts, warning and sleeping PMSENDSLEEP (60)
  * seconds between one attempt and the next.  Returns the result of the
  * last pmSendString call: TRUE on success, FALSE once every attempt has
  * failed.  This is an attempt to help automated processes. */
 {
 boolean sent = FALSE;
 int attempt;
 #define PMSENDSLEEP 60
 #define PMSENDMAXTRIES 5
 for (attempt = 1; ; ++attempt)
     {
     sent = pmSendString(pm, ru, string);
     if (sent)
         return sent;
     warn("pmSendString timed out!");
     /* No sleep after the final attempt. */
     if (attempt >= PMSENDMAXTRIES)
         return sent;
     warn("pmSendString: will sleep %d seconds and retry", PMSENDSLEEP);
     sleep(PMSENDSLEEP);
     }
 }
 
 char *pmHubSendSimple(char *message, char *host)
 /* Send message to host on paraHubPort without waiting for a response.
  * Aborts via noWarnAbort() if the message cannot be sent even with
  * retries.  The returned string is a cloneString() copy of the message
  * buffer as it stands after sending, not a reply from the host. */
 {
 struct paraMessage pm;
 struct rudp *ru = rudpMustOpen();
 char *copy;
 pmInitFromName(&pm, host, paraHubPort);
 if (!pmSendStringWithRetries(&pm, ru, message))
     noWarnAbort();
 rudpClose(&ru);
 copy = cloneString(pm.data);
 return copy;
 }
 
 char *pmHubSingleLineQuery(char *query, char *host)
 /* Send query to the hub at host (paraHubPort) and return a copy of the
  * single message received in reply.  Aborts via noWarnAbort() if either
  * the send or the receive fails.
  * This should be freeMem'd when done. */
 {
 struct paraMessage pm;
 struct rudp *ru = rudpMustOpen();
 char *response;
 pmInitFromName(&pm, host, paraHubPort);
 if (!pmSendStringWithRetries(&pm, ru, query))
     noWarnAbort();
 if (!pmReceive(&pm, ru))
     noWarnAbort();
 rudpClose(&ru);
 response = cloneString(pm.data);
 return response;
 }
 
 struct slName *pmHubMultilineQuery(char *query, char *host)
 /* Send a command with a multiline response to hub,
  * and return response as a list of strings, one per line, in the order
  * the lines were received.  Aborts via noWarnAbort() if the query cannot
  * be sent.  Reading stops at the first failed receive or the first empty
  * message. */
 {
 struct slName *list = NULL;
 struct rudp *ru = rudpMustOpen();
 struct paraMessage pm;
 struct paraMultiMessage pmm;
 char *row[256];
 int count = 0;
 pmInitFromName(&pm, host, paraHubPort);
 /* ensure the multi-message response comes from the correct ip and has no duplicate msgs*/
 pmmInit(&pmm, &pm, pm.ipAddress.sin_addr);
 if (!pmSendStringWithRetries(&pm, ru, query))
     noWarnAbort();
 for (;;)
     {
     int i;
     if (!pmmReceive(&pmm, ru))
         break;
     if (pm.size == 0)
         break;
     /* The capacity handed to chopByChar must be the number of elements in
      * row, not its size in bytes; sizeof(row) alone would let a message
      * with more than 256 lines be written past the end of the array. */
     count = chopByChar(pm.data, '\n', row, sizeof(row)/sizeof(row[0]));
     if (count > 1) --count;  /* for multiline, count is inflated by one */
     /* Lines are pushed on the head of the list; it is reversed at the end. */
     for (i=0; i<count; ++i)
         {
         slNameAddHead(&list, row[i]);
         }
     }
 rudpClose(&ru);
 slReverse(&list);
 return list;
 }
 
 struct paraPstat2Job *paraPstat2JobLoad(char **row)
 /* Build a newly allocated paraPstat2Job from an array of 5 strings, in
  * the order status, parasolId, user, program, host.  Each field is a
  * cloneString() copy, so row itself is not retained. */
 {
 struct paraPstat2Job *el;
 AllocVar(el);
 el->status    = cloneString(row[0]);
 el->parasolId = cloneString(row[1]);
 el->user      = cloneString(row[2]);
 el->program   = cloneString(row[3]);
 el->host      = cloneString(row[4]);
 return el;
 }