5359edc160de518d8e43fdd3448365c15b912c3c
galt
  Mon Jul 22 11:48:10 2019 -0700
Added IPv6 support. Listening processes use the hybrid dual-stack feature of the OS to simplify the implementation and use a single listening socket. Works with both TCP and UDP. Parasol is working. geoIp is also updated and ready for IPv6. This should be invisible to most users, while providing connections via IPv6 where available. Supports both IPv4 and IPv6.

diff --git src/parasol/paraNode/paraNode.c src/parasol/paraNode/paraNode.c
index ade2c75..a7cbe4b 100644
--- src/parasol/paraNode/paraNode.c
+++ src/parasol/paraNode/paraNode.c
@@ -1,917 +1,910 @@
 /* paraNode - parasol node server. */
 
 #include "paraCommon.h"
 #include <grp.h>
 #include "errAbort.h"
 #include "dystring.h"
 #include "dlist.h"
 #include "hash.h"
 #include "localmem.h"
 #include "options.h"
 #include "subText.h"
 #include "paraLib.h"
 #include "net.h"
 #include "portable.h"
 #include "rudp.h"
 #include "paraMessage.h"
 #include "internet.h"
 #include "log.h"
 #include <sys/resource.h>
 
 /* Command line option specifications.  Each entry pairs an option name
  * with the kind of value it takes; the {NULL, 0} entry ends the table.
  * The table is handed to optionInit() in main(). */
 static struct optionSpec optionSpecs[] = {
     {"logFacility", OPTION_STRING},
     {"logMinPriority", OPTION_STRING},
     {"log", OPTION_STRING},
     {"debug", OPTION_BOOLEAN},
     {"hub", OPTION_STRING},
     {"umask", OPTION_INT},
     {"userPath", OPTION_STRING},
     {"sysPath", OPTION_STRING},
     {"env", OPTION_STRING|OPTION_MULTI},  /* may be given more than once */
     {"randomDelay", OPTION_INT},
     {"cpu", OPTION_INT},
     /* NOTE(review): "localhost" is accepted here but is not described
      * in usage() - confirm where it is consumed. */
     {"localhost", OPTION_STRING},
     {NULL, 0}
 };
 
 char *version = PARA_VERSION;   /* Version number. */
 
 void usage()
 /* Print the usage message, with the version number filled in,
  * by way of errAbort. */
 {
 errAbort(
     "paraNode - version %s\n"
     "Parasol node server.\n"
     "usage:\n"
     "    paraNode start\n"
     "options:\n"
     "    -logFacility=facility  Log to the specified syslog facility - default local0.\n"
     "    -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n"
     "     defaults to \"warn\"\n"
     "    -log=file  Log to file instead of syslog.\n"
     "    -debug  Don't daemonize\n"
     "    -hub=host  Restrict access to connections from hub.\n"
     "    -umask=000  Set umask to run under - default 002.\n"
     "    -userPath=bin:bin/i386  User dirs to add to path.\n"
     "    -sysPath=/sbin:/local/bin  System dirs to add to path.\n"
     "    -env=name=value - add environment variable to jobs.  Maybe repeated.\n"
     "    -randomDelay=N  Up to this many milliseconds of random delay before\n"
     "        starting a job.  This is mostly to avoid swamping NFS with\n"
     "        file opens when loading up an idle cluster.  Also it limits\n"
     "        the impact on the hub of very short jobs. Default 5000.\n"
     "    -cpu=N  Number of CPUs to use - default 1.\n",
     version);
 }
 
 
 /* Variables that can be overridden from the command line. */
 char *hubName;			/* Name of hub machine (-hub), may be NULL. */
 int umaskVal = 0002;		/* File creation mask (-umask). */
 int maxProcs = 1;		/* Number of processors allowed to use (-cpu). */
 char *userPath = "";		/* User dirs to add to path (-userPath). */
 char *sysPath = "";		/* System dirs to add to path (-sysPath). */
 struct slName *envExtra = NULL; /* name=value strings to add to job environment (-env) */ 
 int randomDelay = 5000;		/* Upper bound, in milliseconds, on random delay before job startup. */
 
 /* Other globals. */
 char *hostName;			/* Name of this machine. */
-in_addr_t hubIp;		/* Hub IP address. */
-in_addr_t localIp;		/* localhost IP address. */
+char   hubIp[NI_MAXHOST];	/* Hub IP address as a string; filled in only when -hub is given. */
+char localIp[NI_MAXHOST];	/* localhost IP address as a string. */
 int busyProcs = 0;		/* Number of processors in use. */
 struct rudp *mainRudp;		/* Rudp wrapper around main socket. */ 
 struct paraMessage pmIn;	/* Input message; also reused to build replies. */
 double ticksToHundreths;	/* Conversion factor from system ticks
                                  * to 100ths of a second. */
 
 struct job
 /* Info on one job in this node.  A job sits on exactly one of the two
  * lists below at a time: jobsRunning while it executes, then
  * jobsFinished once its jobDone message has been handled. */
     {
     struct job *next;	/* Next job. */
     int jobId;		/* Job ID for hub. */
     int pid;		/* Process ID of running job (the child forked in doRun). */
     char *startMessage;	/* Full message that started this job. */
     char *doneMessage;  /* Full message that ended this job if any. */
     struct dlNode *node; /* Node for list this is on. */
     };
 struct dlList *jobsRunning;		/* List of currently running jobs. */
 struct dlList *jobsFinished;	/* List of recently finished jobs, newest at head. */
 
 struct job *findJobOnList(struct dlList *list, int jobId)
 /* Walk list from the head looking for a job whose jobId matches.
  * Returns the first match, or NULL when nothing on the list matches. */
 {
 struct dlNode *cur = list->head;
 while (!dlEnd(cur))
     {
     struct job *candidate = cur->val;
     if (candidate->jobId == jobId)
         return candidate;
     cur = cur->next;
     }
 return NULL;
 }
 
 struct job *findRunningJob(int jobId)
 /* Look jobId up on the list of running jobs.  Returns the job,
  * or NULL if it is not currently running. */
 {
 struct job *found = findJobOnList(jobsRunning, jobId);
 return found;
 }
 
 struct job *findFinishedJob(int jobId)
 /* Look jobId up on the list of recently finished jobs.  Returns the
  * job, or NULL if it is not on that list. */
 {
 struct job *found = findJobOnList(jobsFinished, jobId);
 return found;
 }
 
 extern char **environ;	/* The environment strings. */
 
 char **hashToEnviron(struct hash *hash)
 /* Build an environ-style array from hash: one freshly cloned
  * "name=value" string per hash element.  The array is allocated with
  * one slot more than there are elements. */
 {
 struct hashEl *elList = hashElListHash(hash);
 int count = slCount(elList);
 struct dyString *buf = newDyString(512);
 struct hashEl *hel;
 char **envArray;
 int ix = 0;
 
 AllocArray(envArray, count+1);
 for (hel = elList; ix < count; hel = hel->next)
     {
     dyStringClear(buf);
     dyStringAppend(buf, hel->name);
     dyStringAppend(buf, "=");
     dyStringAppend(buf, hel->val);
     envArray[ix] = cloneString(buf->string);
     ++ix;
     }
 freeDyString(&buf);
 hashElFreeList(&elList);
 return envArray;
 }
 
 struct hash *environToHash(char **env)
 /* Convert a NULL-terminated array of "name=value" strings into a hash
  * keyed by name, with a cloned copy of the value.  Entries that have
  * no '=' are left out. */
 {
 struct hash *hash = newHash(7);
 char **pEntry;
 for (pEntry = env; *pEntry != NULL; ++pEntry)
     {
     /* Work on a copy so the caller's string is not cut in two. */
     char *copy = cloneString(*pEntry);
     char *eq = strchr(copy, '=');
     if (eq != NULL)
         {
         *eq = 0;
         hashAdd(hash, copy, cloneString(eq+1));
         }
     freez(&copy);
     }
 return hash;
 }
 
 unsigned randomNumber;	/* Last random number generated. */
 
 void initRandom()
 /* Seed the random number generator from the low 20 bits of a CRC
  * of the host name. */
 {
 srand(hashCrc(hostName) & 0xfffff);
 }
 
 void nextRandom()
 /* Draw the next random number and keep it in randomNumber.  This is
  * executed in the main process in response to a message, rather than
  * in the children, which only read the stored value. */
 {
 int drawn = rand();
 randomNumber = drawn;
 }
 
 void randomSleep()
 /* Sleep for randomNumber modulo randomDelay milliseconds (so 0-5
  * seconds with the default randomDelay).  This helps keep jobs from
  * hitting the hub with jobDone messages, and the file server with
  * file opens, all at once.  Does nothing when randomDelay is not
  * positive. */
 {
 int sleepTime;
 if (randomDelay <= 0)
     return;
 sleepTime = randomNumber%randomDelay;
 sleep1000(sleepTime);
 }
 
 
 void hashUpdate(struct hash *hash, char *name, char *val)
 /* Store a clone of val under name.  When name is already in hash the
  * old value is freed and replaced, otherwise a new entry is added. */
 {
 struct hashEl *existing = hashLookup(hash, name);
 char *copy = cloneString(val);
 if (existing != NULL)
     {
     freez(&existing->val);
     existing->val = copy;
     return;
     }
 hashAdd(hash, name, copy);
 }
 
 void changeUid(char *name, char **retDir)
 /* Look up user name in the password database.  If we are running as
  * root, switch supplementary groups, group id and user id to that
  * user's.  In every case, when retDir is not NULL, point *retDir at
  * the user's home directory string from the passwd entry. */
 {
 struct passwd *entry = getpwnam(name);
 if (entry == NULL)
     errnoAbort("can't obtain user passwd entry for user %s", name);
 
 if (geteuid() == 0)
     {
     /* Groups are changed first, while we still have root privilege. */
     if (initgroups(name, entry->pw_gid) < 0)
         errnoAbort("initgroups failed");
     if (setgid(entry->pw_gid) < 0)
         errnoAbort("setgid failed");
     if (setuid(entry->pw_uid) < 0)
         errnoAbort("setuid failed");
     }
 
 if (retDir == NULL)
     return;
 *retDir = entry->pw_dir;
 }
 
 static int grandChildId = 0;
 
 void termHandler()
 /* SIGTERM handler: pass the signal on to the grandchild's process
  * group, then forget the id so the signal is only forwarded once. */
 {
 if (grandChildId == 0)
     return;
 kill(-grandChildId, SIGTERM);
 grandChildId = 0;
 }
 
 void updatePath(struct hash *hash, char *userPath, 
 	char *homeDir, char *sysPath)
 /* Rebuild the PATH entry of hash.  The new value is, in order: each
  * colon separated element of userPath with homeDir/ put in front of
  * it, then sysPath, then the PATH already in hash (or /bin:/usr/bin
  * if that is missing or empty). */
 {
 struct dyString *newPath = newDyString(1024);
 char *pathCopy = cloneString(userPath);	/* Scratch copy we can chop up. */
 char *dir = pathCopy;
 char *inherited;
 
 /* User directories, each made relative to homeDir. */
 while (dir != NULL && dir[0] != 0)
     {
     char *rest = NULL;
     char *colon = strchr(dir, ':');
     if (colon != NULL)
         {
         *colon = 0;
         rest = colon + 1;
         }
     dyStringAppend(newPath, homeDir);
     dyStringAppend(newPath, "/");
     dyStringAppend(newPath, dir);
     dyStringAppend(newPath, ":");
     dir = rest;
     }
 
 /* System directories, followed by a separator if they lack one. */
 if (sysPath != NULL && sysPath[0] != 0)
     {
     dyStringAppend(newPath, sysPath);
     if (lastChar(sysPath) != ':')
         dyStringAppend(newPath, ":");
     }
 
 /* Whatever path we inherited, or a minimal default. */
 inherited = hashFindVal(hash, "PATH");
 if (inherited == NULL || inherited[0] == 0)
     inherited = "/bin:/usr/bin";
 dyStringAppend(newPath, inherited);
 
 hashUpdate(hash, "PATH", newPath->string);
 freez(&pathCopy);
 dyStringFree(&newPath);
 }
 
 void addEnvExtra(struct hash *hash, char *nameVal)
 /* Split one name=value string at its first '=' and store the pair in
  * hash.  Aborts if there is no '='.  nameVal is restored to its
  * original contents before returning. */
 {
 char *sep = strchr(nameVal, '=');
 char *value;
 if (sep == NULL)
     errAbort("invalid -env argument, expected -env=name=value, got -env=%s", nameVal);
 value = sep + 1;
 sep[0] = '\0';		/* Temporarily end the string after the name. */
 hashUpdate(hash, nameVal, value);
 sep[0] = '=';
 }
 
 void addEnvExtras(struct hash *hash)
 /* add environment extras */
 {
 struct slName *nameVal;
 for (nameVal = envExtra; nameVal != NULL; nameVal = nameVal->next)
     addEnvExtra(hash, nameVal->name);
 }
 
 void getTicksToHundreths()
 /* Set ticksToHundreths to the number of hundredths of a second per
  * system tick.  CLK_TCK is used when it is defined; otherwise the
  * tick rate is asked for with sysconf, since CLK_TCK is not
  * available under recent Linux versions. */
 {
 #ifdef CLK_TCK
 double ticksPerSecond = CLK_TCK;
 #else
 double ticksPerSecond = sysconf(_SC_CLK_TCK);
 #endif /* CLK_TCK */
 ticksToHundreths = 100.0/ticksPerSecond;
 }
 
 static void replaceStdFd(int fd, int stdFd, char *stdName)
 /* Make stdFd refer to the same open file as fd, then close fd so that
  * the job does not inherit a stray extra descriptor. */
 {
 if (dup2(fd, stdFd) < 0)
     errnoAbort("can't dup2 %s", stdName);
 if (fd != stdFd)
     close(fd);
 }
 
 void setupProcStdio(char *in, char *out, char *err)
 /* Set up stdio for the job process (after fork).
  *   in  - file to read stdin from; must already exist.
  *   out - file to receive stdout; created if missing.
  *   err - file to receive stderr; created if missing.
  * Aborts via errnoAbort if any file can't be opened or redirected.
  * The node's main socket is closed first, as the job has no use for
  * it.  Note that existing out/err files are opened without truncation. */
 {
 int newStdin, newStdout, newStderr;
 
 close(mainRudp->socket);
 
 /* Do stderr first so that later error messages land in the job's
  * err file. */
 newStderr = open(err, O_WRONLY | O_CREAT, 0666);
 if (newStderr < 0)
     errnoAbort("can't open job stderr file %s", err);
 replaceStdFd(newStderr, STDERR_FILENO, "stderr");
 
 newStdin = open(in, O_RDONLY);
 if (newStdin < 0)
     errnoAbort("can't open job stdin file %s", in);
 replaceStdFd(newStdin, STDIN_FILENO, "stdin");
 
 newStdout = open(out, O_WRONLY | O_CREAT, 0666);
 if (newStdout < 0)
     errnoAbort("can't open job stdout file %s", out);
 replaceStdFd(newStdout, STDOUT_FILENO, "stdout");
 }
 
 void execProc(char *managingHost, char *jobIdString, char *reserved,
 	char *user, char *dir, char *in, char *out, char *err, long long memLimit,
 	char *exe, char **params)
 /* This routine is the child process of doExec.
  * It spawns a grandchild that actually does the
  * work and waits on it.  It sends message to the
  * main message loop here when done.
  *   managingHost - passed back in the jobDone message.
  *   jobIdString  - job id; exported to the job as JOB_ID.
  *   reserved     - not used in this routine.
  *   user, dir    - user to run as and directory to run in.
  *   in, out, err - files for the job's stdin, stdout and stderr.
  *   memLimit     - value applied to RLIMIT_DATA and RLIMIT_AS.
  *   exe, params  - program and argument vector given to execvp. */
 {
 if ((grandChildId = forkOrDie()) == 0)
     {
     /* Grandchild: set limits, identity, stdio and environment, then exec. */
     // use rlimit to limit RAM available to process so it will not hog the machine unfairly.
     if (sizeof(long) == 4 && memLimit > 4294967295LL)  // 4GB is max value on 32-bit platform.
 	{
 	logWarn("memLimit %lld exceeds address space on 32-bit machine, treating it as 4GB limit", memLimit);
 	memLimit = 4294967295LL;
 	}
     struct rlimit rlim;
     rlim.rlim_cur = rlim.rlim_max = memLimit;
     /* A failed setrlimit is only logged; the job still runs. */
     if (setrlimit(RLIMIT_DATA, &rlim) < 0)
 	logWarn("setrlimit failed with RLIMIT_DATA rlim_cur=%lld rlim_max=%lld"
 	    , (long long) rlim.rlim_cur , (long long) rlim.rlim_max); 
     // although RLIMIT_AS is not supported/enforced on all platforms,
     // it is useful for linux and some other unix OSes. 
     if (setrlimit(RLIMIT_AS, &rlim) < 0)
 	logWarn("setrlimit failed with RLIMIT_AS rlim_cur=%lld rlim_max=%lld"
 	    , (long long) rlim.rlim_cur , (long long) rlim.rlim_max); 
 
     if (0) { // change to 1 for debugging
     logInfo("memLimit=%lld", memLimit);
     /* This rlim shadows the one declared above, within this block only. */
     struct rlimit rlim; 
     int rv; 
     rv = getrlimit(RLIMIT_DATA,&rlim); 
     if ( rv == -1 ) 
 	logWarn("error getrlimit RLIMIT_DATA %s", strerror(errno)); 
     else 
     	logInfo("rlimit_data:%lu,%lu\n", (unsigned long) rlim.rlim_max, (unsigned long) rlim.rlim_cur); 
     rv = getrlimit(RLIMIT_AS,&rlim); 
     if ( rv == -1 ) 
 	logWarn("error getrlimit RLIMIT_AS %s", strerror(errno)); 
     else 
     	logInfo("rlimit_as:%lu,%lu\n", (unsigned long) rlim.rlim_max, (unsigned long) rlim.rlim_cur); 
     }
 
 
     char *homeDir = "";
 
     /* Change to given user (if root) */
     changeUid(user, &homeDir);
 
     /* create output files just after becoming user so that errors in the rest
      * of this proc will go to the err file and be available via para
      * problems */
     setupProcStdio(in, out, err);
 
     if (chdir(dir) < 0)
         errnoAbort("can't chdir to %s", dir);
     setsid();
     // setpgid(0,0);
     umask(umaskVal); 
 
 
     /* Update environment. */
         {
 	struct hash *hash = environToHash(environ);
 	hashUpdate(hash, "JOB_ID", jobIdString);
 	hashUpdate(hash, "USER", user);
 	hashUpdate(hash, "HOME", homeDir);
 	hashUpdate(hash, "HOST", hostName);
 	hashUpdate(hash, "PARASOL", "7");
 	updatePath(hash, userPath, homeDir, sysPath);
         addEnvExtras(hash);
 	environ = hashToEnviron(hash);
 	freeHashAndVals(&hash);
 	}
 
     randomSleep();	/* Sleep a random bit before executing this thing
                          * to help spread out i/o when a big batch of jobs
 			 * hit idle cluster */
     execvp(exe, params);
     /* Only reached if execvp failed. */
     errnoAbort("execvp'ing %s", exe);
     }
 else
     {
     /* Wait on executed job and send jobID and status back to 
      * main process. */
     int status = -1;
     int cid;
     struct paraMessage pm;
     struct rudp *ru = NULL;
     struct tms tms;
     unsigned long uTime = 0;
     unsigned long sTime = 0;
 
     /* NOTE(review): presumably forkOrDie never returns a negative id,
      * which would make this test always true here - confirm. */
     if (grandChildId >= 0)
 	{
 	signal(SIGTERM, termHandler);
 	cid = waitpid(grandChildId, &status, 0);
         if (cid < 0)
             errnoAbort("wait on grandchild failed");
 	/* User and system CPU time of waited-for children, in 100ths of a second. */
 	times(&tms);
 	uTime = ticksToHundreths*tms.tms_cutime;
 	sTime = ticksToHundreths*tms.tms_cstime;
 	}
     /* Report the result to the node's own port on localIp. */
     ru = rudpOpen();
     if (ru != NULL)
 	{
 	ru->maxRetries = 20;
-	pmInit(&pm, localIp, paraNodePort);
+	pmInit(&pm, localIp, paraNodePortStr);
 	pmPrintf(&pm, "jobDone %s %s %d %lu %lu", managingHost, 
 	    jobIdString, status, uTime, sTime);
 	pmSend(&pm, ru);
 	rudpClose(&ru);
 	}
     }
 }
 
 void clearZombies()
 /* Reap, without blocking, every child that has already exited. */
 {
 int childStatus;
 while (waitpid(-1, &childStatus, WNOHANG) > 0)
     ;
 }
 
-in_addr_t lookupIp(char *host)
-/* Return IP address of host. */
+void lookupIp(char *host, char *ipStr, int ipStrSize)
+/* Look up host and write its IP address, as a string, into ipStr
+ * (a buffer of ipStrSize bytes).  Aborts if the lookup fails. */
 {
-static char *lastHost = NULL;
-static in_addr_t lastAddress;
-
-if (lastHost != NULL && sameString(lastHost, host))
-    return lastAddress;
-freez(&lastHost);
-lastHost = cloneString(host);
-lastAddress = internetHostIp(host);
-return lastAddress;
+struct sockaddr_storage sai;
+/* AF_UNSPEC: no address family is requested for the lookup. */
+if (!internetFillInAddress6n4(host, NULL, AF_UNSPEC, SOCK_DGRAM, &sai, FALSE))
+    errAbort("host %s lookup failed.", host);
+getAddrAsString6n4(&sai, ipStr, ipStrSize);
 }
 
 
 void tellManagerJobIsDone(char *managingHost, char *jobIdString, char *line)
 /* Try and send message to host saying job is done.  The message is
  * "jobDone <jobIdString> <line>", sent to the hub port on managingHost
  * over the main socket.  A failed send is only warned about. */
 {
 struct paraMessage pm;
-bits32 ip;
-if (!internetDottedQuadToIp(managingHost, &ip))
-    {
-    warn("%s doesn't seem to be in dotted quad form\n", managingHost);
-    return;
-    }
-pmInit(&pm, ip, paraHubPort);
+pmInitFromName(&pm, managingHost, paraHubPortStr);
 pmPrintf(&pm, "jobDone %s %s", jobIdString, line);
 if (!pmSend(&pm, mainRudp))
     warn("Couldn't send message to %s to say %s is done\n", managingHost, jobIdString);
 }
 
 void jobFree(struct job **pJob)
 /* Free a job together with its start message, done message and list
  * node, and set *pJob to NULL.  Does nothing when *pJob is NULL. */
 {
 struct job *doomed = *pJob;
 if (doomed == NULL)
     return;
 freeMem(doomed->startMessage);
 freeMem(doomed->doneMessage);
 freeMem(doomed->node);
 freez(pJob);
 }
 
 void jobDone(char *line)
 /* Handle a job-done message.  line holds the managing host, the job
  * id, and then the rest of the result.  If the job is on the running
  * list it is reaped, moved to the head of the recently finished list
  * (dropping the oldest entry when that list is full) and its processor
  * is released.  The result is then forwarded to the managing host.
  * Messages without a job id or without a result are ignored. */
 {
 char *managingHost = nextWord(&line);
 char *jobIdString = nextWord(&line);
 struct job *job;
 
 if (jobIdString == NULL || line == NULL || line[0] == 0)
     return;
 
 job = findRunningJob(atoi(jobIdString));
 if (job != NULL)
     {
     int status;
     int err = waitpid(job->pid, &status, 0);
     int cleanExit = (err != -1 && WIFEXITED(status) && WEXITSTATUS(status) == 0);
     if (!cleanExit)
         {
         logDebug("paraNode sheparding %s pid %d status %d err %d errno %d", 
                  jobIdString, job->pid, status, err, errno);
         }
     job->doneMessage = cloneString(line);
     dlRemove(job->node);
 
     /* Keep the finished list to at most 4 entries per processor. */
     if (dlCount(jobsFinished) >= 4*maxProcs)
         {
         struct dlNode *oldest = dlPopTail(jobsFinished);
         struct job *oldJob = oldest->val;
         jobFree(&oldJob);
         }
     dlAddHead(jobsFinished, job->node);
     --busyProcs;
     }
 tellManagerJobIsDone(managingHost, jobIdString, line);
 }
 
-void doCheck(char *line, struct sockaddr_in *hubIp)
+void doCheck(char *line, struct sockaddr_storage *ipAddress)
 /* Send back check result - either a check in message or
  * jobDone.  line starts with the job id being asked about; the reply
  * goes to the hub port at the address the request came from.
  * Nothing is sent when line has no job id. */
 {
 char *jobIdString = nextWord(&line);
 if (jobIdString != NULL)
     {
     int jobId = atoi(jobIdString);
     struct job *job = findRunningJob(jobId);
     struct paraMessage pm;
-    pmInit(&pm, ntohl(hubIp->sin_addr.s_addr), paraHubPort);
+
+    char     ipStr[NI_MAXHOST];
+    getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr);
+    pmInit(&pm, ipStr, paraHubPortStr);
     if (job != NULL)
 	pmPrintf(&pm, "checkIn %s %s running", hostName, jobIdString);
     else
 	{
 	/* Not running: either never seen here (free) or recently finished. */
 	struct job *job = findFinishedJob(jobId);
 	if (job == NULL)
 	    pmPrintf(&pm, "checkIn %s %s free", hostName, jobIdString);
 	else
 	    pmPrintf(&pm, "jobDone %s %s", jobIdString, job->doneMessage);
 	}
     pmSend(&pm, mainRudp);
     }
 }
 
-void doResurrect(char *line, struct sockaddr_in *hubIp)
+void doResurrect(char *line, struct sockaddr_storage *ipAddress)
 /* Send back I'm alive message: "alive <hostName>" followed by the ids
  * of all running jobs, then recently finished jobs until maxProcs ids
  * in total have been listed.  The reply goes to the hub port at the
  * sender's address.  line is not used. */
 {
 struct paraMessage pm;
 struct dlNode *node;
 int jobsReported = 0;
-pmInit(&pm, ntohl(hubIp->sin_addr.s_addr), paraHubPort);
+char     ipStr[NI_MAXHOST];
+getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr);
+pmInit(&pm, ipStr, paraHubPortStr);
 pmPrintf(&pm, "alive %s", hostName);
 for (node = jobsRunning->head; !dlEnd(node); node = node->next)
     {
     struct job *job = node->val;
     pmPrintf(&pm, " %d", job->jobId);
     ++jobsReported;
     }
 for (node = jobsFinished->head; !dlEnd(node); node = node->next)
     {
     struct job *job = node->val;
     if (jobsReported >= maxProcs)
 	break;
     pmPrintf(&pm, " %d", job->jobId);
     ++jobsReported;
     }
 pmSend(&pm, mainRudp);
 }
 
-void doRun(char *line, struct sockaddr_in *hubIp)
+void doRun(char *line, struct sockaddr_storage *ipAddress)
 /* Execute command.  line is a run-job message; it is parsed, and if the
  * job id is new and a processor is free, a child is forked to run the
  * job via execProc while the parent records the job on the running
  * list.  Problems are reported with warn and the message is dropped. */
 {
 char *jobMessage = cloneString(line);	/* Kept as the job's startMessage on success. */
 static char *args[1024];
 int argCount;
-char hubDottedQuad[17];
+char ipStr[NI_MAXHOST];
+getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr);
 
 nextRandom();
 if (line == NULL)
     warn("Executing nothing...");
-else if (!internetIpToDottedQuad(ntohl(hubIp->sin_addr.s_addr), hubDottedQuad))
-    warn("Can't convert ipToDottedQuad");
 else
     {
     struct runJobMessage rjm;
     if (parseRunJobMessage(line, &rjm))
 	{
 	int jobId = atoi(rjm.jobIdString);
 	if (findRunningJob(jobId) == NULL && findFinishedJob(jobId) == NULL)
 	    {
 	    if (busyProcs < maxProcs)
 		{
 		int childPid;
 		argCount = chopLine(rjm.command, args);
 		/* One slot of args must stay free for the terminating NULL. */
 		if (argCount >= ArraySize(args))
 		    warn("Too many arguments to run");
 		else
 		    {
 		    args[argCount] = NULL;
 		    if ((childPid = forkOrDie()) == 0)
 			{
 			/* Do JOB_ID substitutions */
 			struct subText *st = subTextNew("$JOB_ID", rjm.jobIdString);
 			int i;
 			rjm.in = subTextString(st, rjm.in);
 			rjm.out = subTextString(st, rjm.out);
 			rjm.err = subTextString(st, rjm.err);
 			for (i=0; i<argCount; ++i)
 			    args[i] = subTextString(st, args[i]);
 
 			/* The sender's address is passed on as the managing host. */
-			execProc(hubDottedQuad, rjm.jobIdString, rjm.reserved,
+			execProc(ipStr, rjm.jobIdString, rjm.reserved,
 			    rjm.user, rjm.dir, rjm.in, rjm.out, rjm.err, rjm.ram,
 			    args[0], args);
 			exit(0);
 			}
 		    else
 			{
 			/* Parent: remember the job and count its processor as busy. */
 			struct job *job;
 			AllocVar(job);
 			job->jobId = atoi(rjm.jobIdString);
 			job->pid = childPid;
 			job->startMessage = jobMessage;
 			jobMessage = NULL;	/* No longer own memory. */
 			job->node = dlAddValTail(jobsRunning, job);
 			++busyProcs;
 			}
 		    }
 		}
 	    else
 		{
 		warn("Trying to run when busy.");
 		}
 	    }
 	else
 	    {
 	    warn("Duplicate run-job %d\n", jobId);
 	    }
 	}
     }
 /* Frees the copy unless ownership was handed to the job above. */
 freez(&jobMessage);
 }
 
 void doFetch(char *line)
 /* Fetch first part of file.  Protocol is to send the
  * file one UDP packet at a time.  A zero length packet
  * indicates end of file.
  * line holds the user name followed by the file name.  If the file
  * can't be opened, an error message packet is sent, followed by the
  * zero length packet.  Nothing is sent when line has no file name. */
 {
 char *user = cloneString(nextWord(&line));
 char *fileName = cloneString(nextWord(&line));
 if (fileName != NULL)
     {
     FILE *f = fopen(fileName, "r");
     pmClear(&pmIn);
     if (f == NULL)
 	{
 	/* Save errno before other calls get a chance to change it, and
 	 * use a separate label so user stays safe to free below. */
 	int openErrno = errno;
 	char *userLabel = (user != NULL) ? user : "<null>";
 	pmPrintf(&pmIn, "Couldn't open fetch file: \"%s\" %s for user %s",
                  fileName, strerror(openErrno), userLabel);
 	warn("Couldn't open fetch file: \"%s\" %s for user %s",
              fileName, strerror(openErrno), userLabel);
 	pmSend(&pmIn, mainRudp);
 	pmClear(&pmIn);
 	pmSend(&pmIn, mainRudp);
 	}
     else
 	{
 	int size;
 	for (;;)
 	    {
 	    size = fread(pmIn.data, 1, sizeof(pmIn.data)-1, f);
 	    /* fread reports errors through ferror, not a negative count. */
 	    if (size == 0 && ferror(f))
 		{
 		warn("Couldn't read fetch file: \"%s\" %s",
                      fileName, strerror(errno));
 		}
 	    pmIn.size = size;
 	    pmSend(&pmIn, mainRudp);
 	    if (size == 0)
 		break;
 	    }
 	fclose(f);
 	}
     }
 freez(&user);
 freez(&fileName);
 }
 
 void doKill(char *line)
 /* Kill a specific job.  line starts with the id of the job.  If that
  * job is running it is sent SIGTERM, taken off the running list and
  * freed, and its processor is released.  A missing, zero or
  * non-numeric job id only produces a warning. */
 {
 char *jobIdString = nextWord(&line);
 /* Guard against a kill message with no job id at all. */
 int jobId = (jobIdString != NULL) ? atoi(jobIdString) : 0;
 if (jobId != 0)
     {
     struct job *job = findRunningJob(jobId);
     if (job != NULL)
         {
 	kill(job->pid, SIGTERM);
 	dlRemove(job->node);
 	jobFree(&job);
 	--busyProcs;
 	}
     }
 else
     {
     warn("Nothing to kill\n");
     }
 }
 
 
 void doStatus()
 /* Reply with how many CPUs are busy and, when any are, the ids of
  * the running jobs. */
 {
 pmClear(&pmIn);
 pmPrintf(&pmIn, "%d of %d CPUs busy", busyProcs, maxProcs);
 if (busyProcs > 0)
     {
     struct dlNode *cur = jobsRunning->head;
     pmPrintf(&pmIn, ". Jobs:");
     while (!dlEnd(cur))
         {
         struct job *running = cur->val;
         pmPrintf(&pmIn, " %d", running->jobId);
         cur = cur->next;
         }
     }
 pmSend(&pmIn, mainRudp);
 }
 
 static int sendJobText(char *text)
 /* Send text back as one message.  Returns the result of pmSend. */
 {
 pmClear(&pmIn);
 pmPrintf(&pmIn, "%s", text);
 return pmSend(&pmIn, mainRudp);
 }
 
 void listJobs()
 /* Report jobs running and recently finished.  Sends a count line and
  * then the start message of each running job, followed by a count line
  * and then the start and done messages of each recently finished job.
  * Gives up as soon as any send fails. */
 {
 struct dlNode *cur;
 
 pmClear(&pmIn);
 pmPrintf(&pmIn, "%d running", dlCount(jobsRunning));
 if (!pmSend(&pmIn, mainRudp))
     return;
 cur = jobsRunning->head;
 while (!dlEnd(cur))
     {
     struct job *running = cur->val;
     if (!sendJobText(running->startMessage))
         return;
     cur = cur->next;
     }
 
 pmClear(&pmIn);
 pmPrintf(&pmIn, "%d recent", dlCount(jobsFinished));
 if (!pmSend(&pmIn, mainRudp))
     return;
 cur = jobsFinished->head;
 while (!dlEnd(cur))
     {
     struct job *finished = cur->val;
     if (!sendJobText(finished->startMessage))
         return;
     if (!sendJobText(finished->doneMessage))
         return;
     cur = cur->next;
     }
 }
 
 void paraNode()
 /* paraNode - a net server.  Sets up logging, the job lists and the
  * listening socket, then handles incoming messages one at a time
  * until a "quit" command arrives. */
 {
 char *line;
 char *command;
-struct sockaddr_in sai;
+struct sockaddr_storage sai;
 
 /* We have to know who we are... */
 hostName = getMachine();
 initRandom();
 getTicksToHundreths();
 
 /* log init */
 if (optionExists("log"))
     logOpenFile("paraNode", optionVal("log", NULL));
 else    
     logOpenSyslog("paraNode", optionVal("logFacility", NULL));
 /* NOTE(review): the default used here is "info", while usage() says
  * the default is "warn" - confirm which is intended. */
 logSetMinPriority(optionVal("logMinPriority", "info"));
 logInfo("starting paraNode on %s", hostName);
 
 /* Make job lists. */
 jobsRunning = newDlList();
 jobsFinished = newDlList();
 
 /* Set up socket and self to listen to it. */
 ZeroVar(&sai);
-sai.sin_family = AF_INET;
-sai.sin_port = htons(paraNodePort);
-sai.sin_addr.s_addr = INADDR_ANY;
+
+if (!internetFillInAddress6n4(NULL, paraNodePortStr, AF_INET6, SOCK_DGRAM, &sai, FALSE))
+    errAbort("NULL host addrinfo lookup failed trying to bind listener.");
+
 mainRudp = rudpMustOpenBound(&sai);
 mainRudp->maxRetries = 12;
 
 /* Event loop. */
 findNow();
 for (;;)
     {
     /* Get next incoming message and optionally check to make
      * sure that it's from a host we trust, and check signature
      * on first bit of incoming data. */
     if (pmReceive(&pmIn, mainRudp))
 	{
 	findNow();
-	if (hubName == NULL || ntohl(pmIn.ipAddress.sin_addr.s_addr) == hubIp 
-		|| ntohl(pmIn.ipAddress.sin_addr.s_addr) == localIp)
+	char pmIpStr[NI_MAXHOST];
+	getAddrAsString6n4(&pmIn.ipAddress, pmIpStr, sizeof pmIpStr);
+
+	/* With no -hub given, any sender is accepted; otherwise the sender's
+	 * address string must equal that of the hub or of localhost. */
+	if (hubName == NULL || sameString(pmIpStr, hubIp) || sameString(pmIpStr, localIp))
 	    {
 	    /* Host and signature look ok,  read a string and
 	     * parse out first word as command. */
 	    line = pmIn.data;
-	    logDebug("message from %s: \"%s\"",
-                     paraFormatIp(ntohl(pmIn.ipAddress.sin_addr.s_addr)),
-                     line);
+	    logDebug("message from %s: \"%s\"", pmIpStr, line);
 	    command = nextWord(&line);
 	    if (command != NULL)
 		{
 		if (sameString("quit", command))
 		    break;
 		else if (sameString("run", command))
 		    doRun(line, &pmIn.ipAddress);
 		else if (sameString("jobDone", command))
 		    jobDone(line);
 		else if (sameString("status", command))
 		    doStatus();
 		else if (sameString("kill", command))
 		    doKill(line);
 		else if (sameString("check", command))
 		    doCheck(line, &pmIn.ipAddress);
 		else if (sameString("resurrect", command))
 		    doResurrect(line, &pmIn.ipAddress);
 		else if (sameString("listJobs", command))
 		    listJobs();
 		else if (sameString("fetch", command))
 		    doFetch(line);
                 else
                     logWarn("invalid command: \"%s\"", command);
 		}
 	    logDebug("done command");
 	    }
 	else
 	    {
-	    logWarn("command from unauthorized host %s",
-                    paraFormatIp(ntohl(pmIn.ipAddress.sin_addr.s_addr)));
+	    logWarn("command from unauthorized host %s", pmIpStr);
 	    }
 	}
     }
 rudpClose(&mainRudp);
 }
 
 int main(int argc, char *argv[])
 /* Process command line: read the options into the globals, look up
  * the localhost (and, if -hub is given, hub) addresses, then
  * daemonize and run the server.  Exactly one non-option argument is
  * required; otherwise the usage message is shown. */
 {
 optionInit(&argc, argv, optionSpecs);
 if (argc != 2)
     usage();
 maxProcs = optionInt("cpu", 1);
 umaskVal = optionInt("umask", 0002);
 userPath = optionVal("userPath", userPath);
 sysPath = optionVal("sysPath", sysPath);
 envExtra = optionMultiVal("env", NULL);
 randomDelay = optionInt("randomDelay", randomDelay);
 
 /* Look up IP addresses.  This is done before paraDaemonize is
  * called. */
-localIp = lookupIp("localhost");
+lookupIp("localhost", localIp, sizeof localIp);
 hubName = optionVal("hub", NULL);
 if (hubName != NULL)
-    hubIp = lookupIp(hubName);
+    lookupIp(hubName, hubIp, sizeof hubIp);
 paraDaemonize("paraNode");
 paraNode();
 return 0;
 }