4898794edd81be5285ea6e544acbedeaeb31bf78 max Tue Nov 23 08:10:57 2021 -0800 Fixing pointers to README file for license in all source code files. refs #27614 diff --git src/hg/encode3/eap/eapDaemon/eapDaemon.c src/hg/encode3/eap/eapDaemon/eapDaemon.c index df468b4..821976e 100644 --- src/hg/encode3/eap/eapDaemon/eapDaemon.c +++ src/hg/encode3/eap/eapDaemon/eapDaemon.c @@ -1,338 +1,338 @@ /* eapDaemon - Run jobs remotely via parasol based on jobs in table.. */ /* Copyright (C) 2014 The Regents of the University of California - * See README in this or parent directory for licensing information. */ + * See kent/LICENSE or http://genome.ucsc.edu/license/ for licensing information. */ #include <sys/wait.h> #include "common.h" #include "linefile.h" #include "hash.h" #include "options.h" #include "errAbort.h" #include "portable.h" #include "rbTree.h" #include "obscure.h" #include "net.h" #include "log.h" #include "sqlNum.h" #include "../../encodeDataWarehouse/inc/encodeDataWarehouse.h" #include "../../encodeDataWarehouse/inc/edwLib.h" #include "verbose.h" #include "../../../../parasol/inc/jobResult.h" #include "../../../../parasol/inc/paraMessage.h" #include "eapDb.h" #include "eapLib.h" /* Variables set from command line */ char *clDatabase = NULL; char *clTable = NULL; char *clParaHost = NULL; void usage() /* Explain usage and exit. */ { errAbort( "eapDaemon - Run jobs remotely via parasol based on jobs in table.\n" "usage:\n" " eapDaemon count\n" "where:\n" " count - number of simultanious jobs to run\n" "options:\n" " -database=%s - mySQL database where eapJob table lives\n" " -table=%s - table in eapJob format to use\n" " -debug - don't fork, close stdout, and daemonize self\n" " -log=logFile - send error messages and warnings of daemon itself to logFile\n" " There are not many of these. Error mesages from jobs daemon runs end up\n" " in errorMessage fields of database.\n" " -logFacility - sends error messages and such to system log facility instead.\n" " -paraHost - machine running parasol (paraHub in particular)\n" , edwDatabase, eapJobTable ); } /* Command line validation table. */ static struct optionSpec options[] = { {"database", OPTION_STRING}, {"table", OPTION_STRING}, {"log", OPTION_STRING}, {"logFacility", OPTION_STRING}, {"debug", OPTION_BOOLEAN}, {"paraHost", OPTION_STRING}, {NULL, 0}, }; int cmpByParasolId(void *a, void *b) /* Set up rbTree so as to work on strings. */ { struct eapJob *aJob = a, *bJob = b; return strcmp(aJob->parasolId, bJob->parasolId); } struct resultsQueue /* A file with job results in it */ { struct resultsQueue *next; char *fileName; /* Results file shared with parasol. */ long long pos; /* Current position in file. */ boolean believePos; /* Set to true if you actually trust position */ }; struct resultsQueue *resultsQueueNew(char *name, boolean seekToEnd) /* Return a new results queue */ { struct resultsQueue *queue; AllocVar(queue); queue->fileName = cloneString(name); if (fileExists(queue->fileName)) { if (seekToEnd) queue->pos = fileSize(queue->fileName); } else { char dir[PATH_LEN],root[FILENAME_LEN],ext[FILEEXT_LEN]; splitPath(name, dir, root, ext); makeDirsOnPath(dir); FILE *f = mustOpen(name, "a"); carefulClose(&f); } return queue; } struct resultsQueue *allQueues = NULL; struct resultsQueue *findOrCreateQueue(char *name, boolean seekToEndOnNew) /* Find name in existing list of results queue or if it's not there make it. */ { struct resultsQueue *queue; for (queue = allQueues; queue != NULL; queue = queue->next) { if (sameString(queue->fileName, name)) return queue; } queue = resultsQueueNew(name, seekToEndOnNew); slAddHead(&allQueues, queue); return queue; } void sendToParasol(struct eapJob *job, struct resultsQueue *queue) /* Add job to current parasol queue. Sets job->parasolId and saves it * to database. */ { /* Prepare parasol add job command and send it to hub. */ int cpu = (job->cpusRequested ? job->cpusRequested : 1); long long ram = 8LL * 1024 * 1024 * 1024; struct dyString *cmd = dyStringNew(1024); dyStringPrintf(cmd, "addJob2 %s %s /dev/null /dev/null %s %f %lld %s", getUser(), getCurrentDir(), queue->fileName, (double)cpu, ram, job->commandLine); char *jobIdString = pmHubSingleLineQuery(cmd->string, clParaHost); verbose(1, "%s: %s\n", jobIdString, cmd->string); /* Check for sick batch result */ if (sameString(jobIdString, "0")) { warn("command: %s", cmd->string); errAbort("Looks like paraHub has decided we're sick. It's usually right."); } /* Save to data structure in memory and to database. */ job->parasolId = cloneString(jobIdString); struct sqlConnection *conn = sqlConnect(clDatabase); char query[512]; sqlSafef(query, sizeof(query), "update %s set startTime=%lld, parasolId='%s' where id=%u", clTable, edwNow(), jobIdString, job->id); sqlUpdate(conn, query); sqlDisconnect(&conn); dyStringFree(&cmd); } void finishJob(struct eapJob *job, struct jobResult *jr) /* Move parasol job result into eapJob table */ { int status = -1; if (WIFEXITED(jr->status)) status = WEXITSTATUS(jr->status); /* Save end time and process status and errFile to database. */ struct sqlConnection *conn = sqlConnect(clDatabase); char query[512]; sqlSafef(query, sizeof(query), "update %s set endTime=%lld, returnCode=%d, stderr='%s:%s' where id=%u", clTable, edwNow(), status, jr->host, jr->errFile, job->id); sqlUpdate(conn, query); sqlDisconnect(&conn); } int finishJobsWithResults(struct rbTree *running, struct resultsQueue *queueList) /* Go through all results queues and look for jobs that are in our running container * that have finished. For these jobs transport results to database and remove them * from container. Returns the number of finished jobs. */ { int finishedCount = 0; struct resultsQueue *queue; for (queue = queueList; queue != NULL; queue = queue->next) { struct lineFile *lf = lineFileOpen(queue->fileName, TRUE); lineFileSeek(lf, queue->pos, SEEK_SET); for (;;) { char *line; if (!lineFileNextReal(lf, &line)) break; char *row[JOBRESULT_NUM_COLS]; int wordsRead = chopByWhite(line, row, ArraySize(row)); if (wordsRead < ArraySize(row)) { if (queue->believePos) errAbort("%s seems screwed up, expecting %d words in line got %d", lf->fileName, (int)ArraySize(row), wordsRead); else { continue; // Forgive bad first line in case initial position, based on file size // is not at an even line boundary. } } queue->believePos = TRUE; struct jobResult *jr; jr = jobResultLoad(row); struct eapJob jobKey = {.parasolId = jr->jobId,}; struct eapJob *job = rbTreeFind(running, &jobKey); if (job != NULL) { finishJob(job, jr); rbTreeRemove(running, job); eapJobFree(&job); finishedCount += 1; } jobResultFree(&jr); } queue->pos = lineFileTell(lf); lineFileClose(&lf); } return finishedCount; } int checkFreeThreads(struct rbTree *running, struct resultsQueue *queueList, boolean mustWait) /* Wait on the queue for one of our jobs to come in. */ { int freeCount = 0; int pollTime = 3; // Poll every 3 seconds for file to open. while (freeCount == 0) { freeCount = finishJobsWithResults(running, queueList); verbose(2, "waiting for %d\n", pollTime); if (!mustWait) break; if (freeCount == 0) sleep(pollTime); } return freeCount; } void eapDaemon(char *countString) /* eapDaemon - Run jobs remotely via parasol based on jobs in table.. */ { verbose(1, "Starting eapDaemon v16 on %s %s with %s threads.\n", clDatabase, clTable, countString); int maxThreads = sqlUnsigned(countString); /* On start up we first try to connect to any jobs that were started but not finished * at time daemon died last time around. */ /* Look for any jobs mentioned in table as started but not finished */ char query[256]; struct sqlConnection *conn = sqlConnect(clDatabase); sqlSafef(query, sizeof(query), "select * from %s where startTime > 0 and endTime = 0 order by id", clTable); struct eapJob *oldJob, *oldJobList = eapJobLoadByQuery(conn, query); verbose(1, "Got %d old jobs in %s to reconnect to\n", slCount(oldJobList), clTable); /* Set up rbTree as a convenient quick access container for jobs */ struct rbTree *running = rbTreeNew(cmpByParasolId); /* Add old jobs to container and get associated queues */ for (oldJob = oldJobList; oldJob != NULL; oldJob = oldJob->next) { char *commandName = eapStepFromCommandLine(oldJob->commandLine); char queueName[PATH_LEN]; safef(queueName, sizeof(queueName), "%s/%s/results", eapParaDirs(conn), commandName); findOrCreateQueue(queueName, FALSE); rbTreeAdd(running, oldJob); freez(&commandName); } oldJobList = NULL; // We've transfered ownership of jobs in list to container int oldFinishedCount = finishJobsWithResults(running, allQueues); // This updates database /* Figure out jobs that are *still* running in one of our batches. These * we'll add to our count of running threads. */ oldJobList = eapJobLoadByQuery(conn, query); // Reload hopefully diminished old job list struct hash *currentlyRunningHash = eapParasolRunningHash(clParaHost, NULL); int oldRunningCount = 0; for (oldJob = oldJobList; oldJob != NULL; oldJob = oldJob->next) { if (hashLookup(currentlyRunningHash, oldJob->parasolId) != NULL) oldRunningCount += 1; } hashFree(¤tlyRunningHash); eapJobFree(&oldJobList); sqlDisconnect(&conn); verbose(1, "Reconnected to %d finished and %d running old jobs\n", oldFinishedCount, oldRunningCount); /* Ok, we are done with all the old business. Now onto the main loop where * where we poll parasol for finished jobs and database for new jobs. */ int threadCount = oldRunningCount; for (;;) { /* Process finished jobs, if need be waiting for some to finish. */ boolean tooManyThreads = (threadCount >= maxThreads); int freeThreads = checkFreeThreads(running, allQueues, tooManyThreads); threadCount -= freeThreads; /* Get next job from database. */ conn = sqlConnect(clDatabase); // It may have been a while so open fresh connection sqlSafef(query, sizeof(query), "select * from %s where startTime = 0 order by id limit 1", clTable); struct eapJob *job = eapJobLoadByQuery(conn, query); if (job != NULL) { /* Cool, got a job, send it to parasol and track it in running container */ char *commandName = eapStepFromCommandLine(job->commandLine); char queueName[PATH_LEN]; safef(queueName, sizeof(queueName), "%s/%s/results", eapParaDirs(conn), commandName); struct resultsQueue *queue = findOrCreateQueue(queueName, TRUE); sendToParasol(job, queue); rbTreeAdd(running, job); threadCount += 1; freez(&commandName); } else { /* No new jobs, maybe we'll nap for 10 seconds */ sleep(10); } sqlDisconnect(&conn); } } int main(int argc, char *argv[]) /* Process command line. */ { optionInit(&argc, argv, options); if (argc != 2) usage(); clParaHost = optionVal("paraHost", eapParaHost); clDatabase = optionVal("database", edwDatabase); clTable = optionVal("table", eapJobTable); logDaemonize(argv[0]); if (optionExists("log")) verboseSetLogFile(optionVal("log", NULL)); eapDaemon(argv[1]); return 0; }