5359edc160de518d8e43fdd3448365c15b912c3c galt Mon Jul 22 11:48:10 2019 -0700 Added ipv6 support. Listening processes use hybrid dual stack feature of OS to simplify implementation and use a single listening socket. Works with both TCP and UDP. Parasol working. geoIp also updated and ready for IPv6. Should be invisible to most users, while providing connections via ipv6 where available. Supports both ipv4 and ipv6. diff --git src/parasol/paraNode/paraNode.c src/parasol/paraNode/paraNode.c index ade2c75..a7cbe4b 100644 --- src/parasol/paraNode/paraNode.c +++ src/parasol/paraNode/paraNode.c @@ -64,32 +64,32 @@ ); } /* Command line overwriteable variables. */ char *hubName; /* Name of hub machine, may be NULL. */ int umaskVal = 0002; /* File creation mask. */ int maxProcs = 1; /* Number of processers allowed to use. */ char *userPath = ""; /* User stuff to add to path. */ char *sysPath = ""; /* System stuff to add to path. */ struct slName *envExtra = NULL; /* Add to environment */ int randomDelay = 5000; /* How much to delay job startup. */ /* Other globals. */ char *hostName; /* Name of this machine. */ -in_addr_t hubIp; /* Hub IP address. */ -in_addr_t localIp; /* localhost IP address. */ +char hubIp[NI_MAXHOST]; /* Hub IP address as a string. */ +char localIp[NI_MAXHOST]; /* localhost IP address as a string. */ int busyProcs = 0; /* Number of processers in use. */ struct rudp *mainRudp; /* Rudp wrapper around main socket. */ struct paraMessage pmIn; /* Input message */ double ticksToHundreths; /* Conversion factor from system ticks * to 100ths of second. */ struct job /* Info on one job in this node. */ { struct job *next; /* Next job. */ int jobId; /* Job ID for hub. */ int pid; /* Process ID of running job. */ char *startMessage; /* Full message that started this job. */ char *doneMessage; /* Full message that ended this job if any. */ struct dlNode *node; /* Node for list this is on. 
*/ @@ -446,76 +446,65 @@ if (grandChildId >= 0) { signal(SIGTERM, termHandler); cid = waitpid(grandChildId, &status, 0); if (cid < 0) errnoAbort("wait on grandchild failed"); times(&tms); uTime = ticksToHundreths*tms.tms_cutime; sTime = ticksToHundreths*tms.tms_cstime; } ru = rudpOpen(); if (ru != NULL) { ru->maxRetries = 20; - pmInit(&pm, localIp, paraNodePort); + pmInit(&pm, localIp, paraNodePortStr); pmPrintf(&pm, "jobDone %s %s %d %lu %lu", managingHost, jobIdString, status, uTime, sTime); pmSend(&pm, ru); rudpClose(&ru); } } } void clearZombies() /* Clear any zombie processes */ { int stat; for (;;) { if (waitpid(-1, &stat, WNOHANG) <= 0) break; } } -in_addr_t lookupIp(char *host) -/* Return IP address of host. */ +void lookupIp(char *host, char *ipStr, int ipStrSize) +/* convert host into IP address string. */ { -static char *lastHost = NULL; -static in_addr_t lastAddress; - -if (lastHost != NULL && sameString(lastHost, host)) - return lastAddress; -freez(&lastHost); -lastHost = cloneString(host); -lastAddress = internetHostIp(host); -return lastAddress; +struct sockaddr_storage sai; +if (!internetFillInAddress6n4(host, NULL, AF_UNSPEC, SOCK_DGRAM, &sai, FALSE)) + errAbort("host %s lookup failed.", host); +getAddrAsString6n4(&sai, ipStr, ipStrSize); } void tellManagerJobIsDone(char *managingHost, char *jobIdString, char *line) /* Try and send message to host saying job is done. 
*/ { struct paraMessage pm; -bits32 ip; -if (!internetDottedQuadToIp(managingHost, &ip)) - { - warn("%s doesn't seem to be in dotted quad form\n", managingHost); - return; - } -pmInit(&pm, ip, paraHubPort); +pmInitFromName(&pm, managingHost, paraHubPortStr); pmPrintf(&pm, "jobDone %s %s", jobIdString, line); if (!pmSend(&pm, mainRudp)) warn("Couldn't send message to %s to say %s is done\n", managingHost, jobIdString); } void jobFree(struct job **pJob) /* Free up memory associated with job */ { struct job *job = *pJob; if (job != NULL) { freeMem(job->startMessage); freeMem(job->doneMessage); freeMem(job->node); freez(pJob); @@ -545,122 +534,126 @@ job->doneMessage = cloneString(line); dlRemove(job->node); if (dlCount(jobsFinished) >= 4*maxProcs) { struct dlNode *node = dlPopTail(jobsFinished); struct job *oldJob = node->val; jobFree(&oldJob); } dlAddHead(jobsFinished, job->node); --busyProcs; } tellManagerJobIsDone(managingHost, jobIdString, line); } } -void doCheck(char *line, struct sockaddr_in *hubIp) +void doCheck(char *line, struct sockaddr_storage *ipAddress) /* Send back check result - either a check in message or * jobDone. 
*/ { char *jobIdString = nextWord(&line); if (jobIdString != NULL) { int jobId = atoi(jobIdString); struct job *job = findRunningJob(jobId); struct paraMessage pm; - pmInit(&pm, ntohl(hubIp->sin_addr.s_addr), paraHubPort); + + char ipStr[NI_MAXHOST]; + getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr); + pmInit(&pm, ipStr, paraHubPortStr); if (job != NULL) pmPrintf(&pm, "checkIn %s %s running", hostName, jobIdString); else { struct job *job = findFinishedJob(jobId); if (job == NULL) pmPrintf(&pm, "checkIn %s %s free", hostName, jobIdString); else pmPrintf(&pm, "jobDone %s %s", jobIdString, job->doneMessage); } pmSend(&pm, mainRudp); } } -void doResurrect(char *line, struct sockaddr_in *hubIp) +void doResurrect(char *line, struct sockaddr_storage *ipAddress) /* Send back I'm alive message */ { struct paraMessage pm; struct dlNode *node; int jobsReported = 0; -pmInit(&pm, ntohl(hubIp->sin_addr.s_addr), paraHubPort); +char ipStr[NI_MAXHOST]; +getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr); +pmInit(&pm, ipStr, paraHubPortStr); pmPrintf(&pm, "alive %s", hostName); for (node = jobsRunning->head; !dlEnd(node); node = node->next) { struct job *job = node->val; pmPrintf(&pm, " %d", job->jobId); ++jobsReported; } for (node = jobsFinished->head; !dlEnd(node); node = node->next) { struct job *job = node->val; if (jobsReported >= maxProcs) break; pmPrintf(&pm, " %d", job->jobId); ++jobsReported; } pmSend(&pm, mainRudp); } -void doRun(char *line, struct sockaddr_in *hubIp) +void doRun(char *line, struct sockaddr_storage *ipAddress) /* Execute command. 
*/ { char *jobMessage = cloneString(line); static char *args[1024]; int argCount; -char hubDottedQuad[17]; +char ipStr[NI_MAXHOST]; +getAddrAsString6n4(ipAddress, ipStr, sizeof ipStr); nextRandom(); if (line == NULL) warn("Executing nothing..."); -else if (!internetIpToDottedQuad(ntohl(hubIp->sin_addr.s_addr), hubDottedQuad)) - warn("Can't convert ipToDottedQuad"); else { struct runJobMessage rjm; if (parseRunJobMessage(line, &rjm)) { int jobId = atoi(rjm.jobIdString); if (findRunningJob(jobId) == NULL && findFinishedJob(jobId) == NULL) { if (busyProcs < maxProcs) { int childPid; argCount = chopLine(rjm.command, args); if (argCount >= ArraySize(args)) warn("Too many arguments to run"); else { args[argCount] = NULL; if ((childPid = forkOrDie()) == 0) { /* Do JOB_ID substitutions */ struct subText *st = subTextNew("$JOB_ID", rjm.jobIdString); int i; rjm.in = subTextString(st, rjm.in); rjm.out = subTextString(st, rjm.out); rjm.err = subTextString(st, rjm.err); for (i=0; ijobId = atoi(rjm.jobIdString); job->pid = childPid; job->startMessage = jobMessage; jobMessage = NULL; /* No longer own memory. */ job->node = dlAddValTail(jobsRunning, job); ++busyProcs; } @@ -797,121 +790,121 @@ pmPrintf(&pmIn, "%s", job->startMessage); if (!pmSend(&pmIn, mainRudp)) return; pmClear(&pmIn); pmPrintf(&pmIn, "%s", job->doneMessage); if (!pmSend(&pmIn, mainRudp)) return; } } void paraNode() /* paraNode - a net server. */ { char *line; char *command; -struct sockaddr_in sai; +struct sockaddr_storage sai; /* We have to know who we are... */ hostName = getMachine(); initRandom(); getTicksToHundreths(); /* log init */ if (optionExists("log")) logOpenFile("paraNode", optionVal("log", NULL)); else logOpenSyslog("paraNode", optionVal("logFacility", NULL)); logSetMinPriority(optionVal("logMinPriority", "info")); logInfo("starting paraNode on %s", hostName); /* Make job lists. */ jobsRunning = newDlList(); jobsFinished = newDlList(); /* Set up socket and self to listen to it. 
*/ ZeroVar(&sai); -sai.sin_family = AF_INET; -sai.sin_port = htons(paraNodePort); -sai.sin_addr.s_addr = INADDR_ANY; + +if (!internetFillInAddress6n4(NULL, paraNodePortStr, AF_INET6, SOCK_DGRAM, &sai, FALSE)) + errAbort("NULL host addrinfo lookup failed trying to bind listener."); + mainRudp = rudpMustOpenBound(&sai); mainRudp->maxRetries = 12; /* Event loop. */ findNow(); for (;;) { /* Get next incoming message and optionally check to make * sure that it's from a host we trust, and check signature * on first bit of incoming data. */ if (pmReceive(&pmIn, mainRudp)) { findNow(); - if (hubName == NULL || ntohl(pmIn.ipAddress.sin_addr.s_addr) == hubIp - || ntohl(pmIn.ipAddress.sin_addr.s_addr) == localIp) + char pmIpStr[NI_MAXHOST]; + getAddrAsString6n4(&pmIn.ipAddress, pmIpStr, sizeof pmIpStr); + + if (hubName == NULL || sameString(pmIpStr, hubIp) || sameString(pmIpStr, localIp)) { /* Host and signature look ok, read a string and * parse out first word as command. */ line = pmIn.data; - logDebug("message from %s: \"%s\"", - paraFormatIp(ntohl(pmIn.ipAddress.sin_addr.s_addr)), - line); + logDebug("message from %s: \"%s\"", pmIpStr, line); command = nextWord(&line); if (command != NULL) { if (sameString("quit", command)) break; else if (sameString("run", command)) doRun(line, &pmIn.ipAddress); else if (sameString("jobDone", command)) jobDone(line); else if (sameString("status", command)) doStatus(); else if (sameString("kill", command)) doKill(line); else if (sameString("check", command)) doCheck(line, &pmIn.ipAddress); else if (sameString("resurrect", command)) doResurrect(line, &pmIn.ipAddress); else if (sameString("listJobs", command)) listJobs(); else if (sameString("fetch", command)) doFetch(line); else logWarn("invalid command: \"%s\"", command); } logDebug("done command"); } else { - logWarn("command from unauthorized host %s", - paraFormatIp(ntohl(pmIn.ipAddress.sin_addr.s_addr))); + logWarn("command from unauthorized host %s", pmIpStr); } } } 
rudpClose(&mainRudp); } int main(int argc, char *argv[]) /* Process command line. */ { optionInit(&argc, argv, optionSpecs); if (argc != 2) usage(); maxProcs = optionInt("cpu", 1); umaskVal = optionInt("umask", 0002); userPath = optionVal("userPath", userPath); sysPath = optionVal("sysPath", sysPath); envExtra = optionMultiVal("env", NULL); randomDelay = optionInt("randomDelay", randomDelay); /* Look up IP addresses. */ -localIp = lookupIp("localhost"); +lookupIp("localhost", localIp, sizeof localIp); hubName = optionVal("hub", NULL); if (hubName != NULL) - hubIp = lookupIp(hubName); + lookupIp(hubName, hubIp, sizeof hubIp); paraDaemonize("paraNode"); paraNode(); return 0; }