f183ef95b2e4a00177a3e21a47b6272da746ce4c chmalee Mon Dec 13 15:21:15 2021 -0800 hgTracks track search on hubs: Angie noticed that calling pthread_join will wait for a still running thread to finish, which nullifies the point of having a timeout, while also discovering that if we have fewer threads than hubs to work through, we will never work through some hubs at all because of a missing loop in addUnconnectedHubSearchResults(). This commit rectifies both problems diff --git src/hg/hgTracks/searchTracks.c src/hg/hgTracks/searchTracks.c index 624f634..4609c28 100644 --- src/hg/hgTracks/searchTracks.c +++ src/hg/hgTracks/searchTracks.c @@ -256,71 +256,77 @@ pthread_t *threadId; // so we can stop the thread if it has been taking too long long searchTime; // how many milliseconds did it take to search this hub boolean done; }; // helper variables for connecting to hubs in parallel pthread_mutex_t pfdMutex = PTHREAD_MUTEX_INITIALIZER; struct paraFetchData *pfdListInitial = NULL; struct paraFetchData *pfdList = NULL; struct paraFetchData *pfdRunning = NULL; struct paraFetchData *pfdDone = NULL; void *addUnconnectedHubSearchResults(void *threadParam) /* Add a not yet connected hub to the search results */ { +pthread_t *thread = threadParam; struct paraFetchData *pfd = NULL; +// this thread will just happily keep working until waitForSearchResults() finishes, +// moving it's completed work onto pfdDone, so we can safely detach +pthread_detach(*thread); +boolean allDone = FALSE; +while(1) + { pthread_mutex_lock(&pfdMutex); // the wait function will set pfdList = NULL, so don't start up any more // stuff if that happens: -boolean allDone = FALSE; if (!pfdList) allDone = TRUE; else { pfd = slPopHead(&pfdList); pfd->threadId = threadParam; slAddHead(&pfdRunning, pfd); } pthread_mutex_unlock(&pfdMutex); if (allDone) return NULL; struct hubSearchTracks *hst = pfd->hst; struct track *tracksToAdd = NULL; long startTime = clock1000(); struct errCatch *errCatch = errCatchNew(); if (errCatchStart(errCatch)) { + pfd->done = FALSE; struct trackDb *tdbList = hubAddTracks(hst->hub, database); if (measureTiming) measureTime("After connecting to hub %s: '%d': ", hst->hubUrl, hst->hubId); // get composite and subtracks into trackList hubTdbListToTrackList(tdbList, &tracksToAdd, hst->searchedTracks); hubTdbListAddSupers(tdbList, &tracksToAdd, hst->searchedTracks); - pthread_mutex_lock(&pfdMutex); - pfd->tlist = tracksToAdd; pfd->done = TRUE; - if (measureTiming) - { + pfd->tlist = tracksToAdd; pfd->searchTime = clock1000() - startTime;; - measureTime("Finished finding tracks for hub '%s': ", pfd->hubName); } + errCatchEnd(errCatch); + pthread_mutex_lock(&pfdMutex); slRemoveEl(&pfdRunning, pfd); slAddHead(&pfdDone, pfd); + if (measureTiming) + measureTime("Finished finding tracks for hub '%s': ", pfd->hubName); pthread_mutex_unlock(&pfdMutex); } -errCatchEnd(errCatch); //always return NULL for pthread_create() return NULL; } static void hubSearchHashToPfdList(char *descSearch, struct hash *searchResultsHash, struct hash *hubLookup, struct sqlConnection *conn) /* getHubSearchResults() returned a hash of search hits to various hubs, convert that * into something we can work on in parallel */ { struct hubSearchTracks *ret = NULL; struct hashCookie cookie = hashFirst(searchResultsHash); struct hash *hubUrlsToTrackList = hashNew(0); struct hashEl *hel = NULL; struct dyString *trackName = dyStringNew(0); struct slName *connectedHubs = hubConnectHubsInCart(cart); @@ -413,38 +419,39 @@ if (lockStatus == 0) // we aquired the lock pthread_mutex_unlock(&pfdMutex); break; } if (lockStatus == 0) // release the lock if we got it pthread_mutex_unlock(&pfdMutex); } // now that we've waited the maximum time we need to kill // any running threads and add the results of any threads // that ran successfully lockStatus = pthread_mutex_trylock(&pfdMutex); struct paraFetchData *neverRan = pfdList; if (lockStatus == 0) { + // prevent still running threads from continuing + pfdList = NULL; if (measureTiming) fprintf(stdout, "<span class='timing'>Successfully aquired lock, adding any succesful thread data\n<br></span>"); for (pfd = pfdDone; pfd != NULL; pfd = pfd->next) { struct track *t; for (t = pfd->tlist; t != NULL; t = t->next) refAdd(&tracks, t); - pthread_join(*pfd->threadId, NULL); if (measureTiming) measureTime("'%s' search times", pfd->hubName); } for (pfd = pfdRunning; pfd != NULL; pfd = pfd->next) { pthread_cancel(*pfd->threadId); if (measureTiming) measureTime("'%s' search times: timed out", pfd->hubName); } for (pfd = neverRan; pfd != NULL; pfd = pfd->next) if (measureTiming) measureTime("'%s' search times: never ran", pfd->hubName); } else {