5359edc160de518d8e43fdd3448365c15b912c3c galt Mon Jul 22 11:48:10 2019 -0700 Added ipv6 support. Listening processes use the hybrid dual-stack feature of the OS to simplify implementation and use a single listening socket. Works with both TCP and UDP. Parasol working. geoIp also updated and ready for IPv6. Should be invisible to most users, while providing connections via ipv6 where available. Supports both ipv4 and ipv6. diff --git src/parasol/paraHub/paraHub.c src/parasol/paraHub/paraHub.c index 1ebe5b5..3375595 100644 --- src/parasol/paraHub/paraHub.c +++ src/parasol/paraHub/paraHub.c @@ -126,30 +126,31 @@ "Where machine list is a file with the following columns:\n" " name - Network name\n" " cpus - Number of CPUs we can use\n" " ramSize - Megabytes of memory\n" " tempDir - Location of (local) temp dir\n" " localDir - Location of local data dir\n" " localSize - Megabytes of local disk\n" " switchName - Name of switch this is on\n" "\n" "options:\n" " -spokes=N Number of processes that feed jobs to nodes - default %d.\n" " -jobCheckPeriod=N Minutes between checking on job - default %d.\n" " -machineCheckPeriod=N Minutes between checking on machine - default %d.\n" " -subnet=XXX.YYY.ZZZ Only accept connections from subnet (example 192.168).\n" " Or CIDR notation (example 192.168.1.2/24).\n" + " Supports comma-separated list of IPv4 or IPv6 subnets in CIDR notation.\n" " -nextJobId=N Starting job ID number.\n" " -logFacility=facility Log to the specified syslog facility - default local0.\n" " -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n" " defaults to \"warn\"\n" " -log=file Log to file instead of syslog.\n" " -debug Don't daemonize\n" " -noResume Don't try to reconnect with jobs running on nodes.\n" " -ramUnit=N Number of bytes of RAM in the base unit used by the jobs.\n" " Default is RAM on node divided by number of cpus on node.\n" " Shorthand expressions allow t,g,m,k for tera, giga, mega, kilo.\n" " e.g. 
4g = 4 Gigabytes.\n" " -defaultJobRam=N Number of ram units in a job has no specified ram usage.\n" " Defaults to 1.\n" , version, initialSpokes, jobCheckPeriod, machineCheckPeriod @@ -217,30 +218,39 @@ freeMachines = newDlList(); readyMachines = newDlList(); blockedMachines = newDlList(); busyMachines = newDlList(); deadMachines = newDlList(); runningJobs = newDlList(); hangJobs = newDlList(); freeSpokes = newDlList(); busySpokes = newDlList(); deadSpokes = newDlList(); queuedUsers = newDlList(); unqueuedUsers = newDlList(); userHash = newHash(6); } +void lookupIp(char *host, char *ipStr, int ipStrSize) +/* convert host into IP address string. */ +{ +struct sockaddr_storage sai; +if (!internetFillInAddress6n4(host, NULL, AF_UNSPEC, SOCK_DGRAM, &sai, FALSE)) + errAbort("host %s lookup failed.", host); +getAddrAsString6n4(&sai, ipStr, ipStrSize); +} + int avgBatchTime(struct batch *batch) /**/ { if (batch->doneCount == 0) return 0; return batch->doneTime / batch->doneCount; } boolean nodeSickOnAllBatches(struct user *user, char *machineName) /* Return true if all of a user's current batches believe the machine is sick. */ { struct dlNode *node = user->curBatches->head; if (dlEnd(node)) return FALSE; for (; !dlEnd(node); node = node->next) { @@ -1122,37 +1132,37 @@ void machineFree(struct machine **pMach) /* Delete machine structure. */ { struct machine *mach = *pMach; if (mach != NULL) { freeMem(mach->node); freeMem(mach->name); freeMem(mach->tempDir); machSpecFree(&mach->machSpec); freeDlList(&mach->jobs); freez(pMach); } } -struct machine *doAddMachine(char *name, char *tempDir, bits32 ip, struct machSpec *m) +struct machine *doAddMachine(char *name, char *tempDir, char *ipStr, struct machSpec *m) /* Add machine to pool. If you don't know ip yet just pass * in 0 for that argument. 
*/ { struct machine *mach; mach = machineNew(name, tempDir, m); -mach->ip = ip; +safecpy(mach->ipStr, sizeof mach->ipStr, ipStr); dlAddTail(freeMachines, mach->node); slAddHead(&machineList, mach); needsPlanning = TRUE; return mach; } void addMachine(char *line) /* Process message to add machine to pool. */ { char *name = nextWord(&line); if (hashLookup(machineHash, name)) /* ignore duplicate machines */ { warn("machine already added: %s", name); return; } @@ -1177,31 +1187,31 @@ m->name = cloneString(name); m->cpus = atoi(param2); m->ramSize = atoi(nextWord(&line)); m->tempDir = cloneString(nextWord(&line)); m->localDir = cloneString(nextWord(&line)); m->localSize = atoi(nextWord(&line)); m->switchName = cloneString(nextWord(&line)); if (!m->switchName) { freeMem(m); warn("incomplete addMachine request"); return; } } -doAddMachine(name, m->tempDir, 0, m); +doAddMachine(name, m->tempDir, "0", m); // "0" means no ipStr here runner(1); } struct machine *findMachine(char *name) /* Find named machine. */ { struct machine *mach; for (mach = machineList; mach != NULL; mach = mach->next) { if (sameString(mach->name, name)) return mach; } return NULL; } @@ -2988,36 +2998,36 @@ { int i; for (i=0; iname); + char ipStr[NI_MAXHOST]; + lookupIp(ms->name, ipStr, sizeof ipStr); if (hashLookup(machineHash, ms->name)) errAbort("machine list contains duplicate: %s", ms->name); - struct machine *machine = doAddMachine(ms->name, ms->tempDir, ip, ms); + struct machine *machine = doAddMachine(ms->name, ms->tempDir, ipStr, ms); hashStoreName(machineHash, ms->name); // TODO Add a command-line param for these that overrides default? 
/* use first machine in spec list as model node */ if (firstTime) { firstTime = FALSE; cpuUnit = 1; /* 1 CPU */ if (!optionExists("ramUnit")) ramUnit = ((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus; defaultJobCpu = 1; /* number of cpuUnits in default job usage */ /* number of ramUnits in default job usage, resolves to just 1 currently */ if (!optionExists("defaultJobRam")) defaultJobRam = (((long long)machine->machSpec->ramSize * 1024 * 1024) / machine->machSpec->cpus) / ramUnit; } @@ -3200,31 +3210,31 @@ * ones to results file if necessary. * * Format of message is * running count * one line for each running job. * recent count * two lines for each recent job. */ { int running, recent, i, finCount = 0; struct runJobMessage rjm; char resultsFile[512]; struct paraMultiMessage pmm; /* ensure the multi-message response comes from the correct ip and has no duplicate msgs*/ -pmmInit(&pmm, pm, pm->ipAddress.sin_addr); +pmmInit(&pmm, pm); if (!pmmReceiveTimeOut(&pmm, ru, 2000000)) { warn("%s: no listJobs response", mach->name); return FALSE; } running = atoi(pm->data); for (i=0; idata, &rjm)) @@ -3280,55 +3290,55 @@ void checkForJobsOnNodes() /* Poll nodes and see if they have any jobs for us. */ { struct machine *mach; int running = 0, finished = 0; struct hash *erHash = newHashExt(8, FALSE); /* A hash of existingResults */ struct existingResults *erList = NULL; logDebug("Checking for jobs already running on nodes"); for (mach = machineList; mach != NULL; mach = mach->next) { struct paraMessage pm; struct rudp *ru = rudpNew(rudpOut->socket); /* Get own resend timing */ logDebug("check for jobs on %s", mach->name); - pmInitFromName(&pm, mach->name, paraNodePort); + pmInitFromName(&pm, mach->name, paraNodePortStr); if (!pmSendString(&pm, ru, "listJobs")) { machineDown(mach); continue; } if (!processListJobs(mach, &pm, rudpOut, erHash, &erList, &running, &finished)) machineDown(mach); rudpFree(&ru); } /* Clean up time. 
*/ existingResultsFreeList(&erList); hashFree(&erHash); needsPlanning = TRUE; /* Report results. */ logDebug("%d running jobs, %d jobs that finished while hub was down", running, finished); } void startHub(char *machineList) /* Do hub daemon - set up socket, and loop around on it until we get a quit. */ { -struct sockaddr_in sai; +struct sockaddr_storage sai; char *line, *command; struct rudp *rudpIn = NULL; /* Note startup time. */ findNow(); startupTime = now; /* Find name and IP address of our machine. */ hubHost = getMachine(); if (optionExists("log")) logOpenFile("paraHub", optionVal("log", NULL)); else logOpenSyslog("paraHub", optionVal("logFacility", NULL)); logSetMinPriority(optionVal("logMinPriority", "info")); logInfo("starting paraHub on %s", hubHost); @@ -3337,33 +3347,34 @@ hubMessageQueueInit(); stringHash = newHash(0); setupLists(); machineHash = newHash(0); startMachines(machineList); openJobId(); logInfo("next job ID is %d.", nextJobId); rudpOut = rudpMustOpen(); if (!optionExists("noResume")) checkForJobsOnNodes(); /* Initialize socket etc. */ ZeroVar(&sai); -sai.sin_family = AF_INET; -sai.sin_port = htons(paraHubPort); -sai.sin_addr.s_addr = INADDR_ANY; + +if (!internetFillInAddress6n4(NULL, paraHubPortStr, AF_INET6, SOCK_DGRAM, &sai, FALSE)) + errAbort("NULL host addrinfo lookup failed trying to bind listener."); + rudpIn = rudpMustOpenBound(&sai); /* Start up daemons. */ sockSuckStart(rudpIn); startHeartbeat(); startSpokes(); logDebug("sockSuck,Heartbeat,Spokes have been started"); /* Bump up our priority to just shy of real-time. */ (void) nice(-40); // ignore return value /* Main event loop. */ for (;;) { @@ -3444,33 +3455,39 @@ break; else warn("Unrecognized command %s", command); pmFree(&pm); } endHeartbeat(); killSpokes(); saveJobId(); #ifdef SOON #endif /* SOON */ } void fillInSubnet() /* Parse subnet paramenter if any into subnet variable. 
*/ { +char hubSubnetStr[1024]; char *sns = optionVal("subnet", NULL); -hubSubnet = internetParseSubnetCidr(sns); -localHostSubnet = internetParseSubnetCidr("127.0.0.1"); /* Address for local host */ +char *localHostSubnet = "127.0.0.1,::1/128"; /* Address for local host */ +if (sns) + safef(hubSubnetStr, sizeof hubSubnetStr, "%s,%s", localHostSubnet, sns); +else + safef(hubSubnetStr, sizeof hubSubnetStr, "%s", localHostSubnet); + +hubSubnet = internetParseSubnetCidr(hubSubnetStr); } int main(int argc, char *argv[]) /* Process command line. */ { optionInit(&argc, argv, optionSpecs); if (argc < 2) usage(); if (optionExists("ramUnit")) { ramUnit = paraParseRam(optionVal("ramUnit", "")); if (ramUnit == -1) errAbort("Invalid RAM expression '%s' in '-ramUnit=' option", optionVal("ramUnit", "")); } if (optionExists("defaultJobRam"))