e72cf0100e06d6fadb7282d4e7eb2c592f320951 galt Mon Jul 4 13:08:35 2011 -0700 Adding parallel-fetch loading of remote bigDataUrl tracks using pthreads diff --git src/hg/hgTracks/hgTracks.c src/hg/hgTracks/hgTracks.c index 50f8763..9c1bb2f 100644 --- src/hg/hgTracks/hgTracks.c +++ src/hg/hgTracks/hgTracks.c @@ -1,21 +1,22 @@ /* hgTracks - the original, and still the largest module for the UCSC Human Genome * Browser main cgi script. Currently contains most of the track framework, though * there's quite a bit of other framework type code in simpleTracks.c. The main * routine got moved to create a new entry point to the bulk of the code for the * hgRenderTracks web service. See mainMain.c for the main used by the hgTracks CGI. */ +#include <pthread.h> #include "common.h" #include "hCommon.h" #include "linefile.h" #include "portable.h" #include "memalloc.h" #include "localmem.h" #include "obscure.h" #include "dystring.h" #include "hash.h" #include "jksql.h" #include "gfxPoly.h" #include "memgfx.h" #include "hvGfx.h" #include "psGfx.h" #include "cheapcgi.h" @@ -4334,30 +4335,206 @@ // Unfortunately, since supertracks are not in trackList, this occurs on superChildren, // So now we need to find the supertrack and take changed cart values of its children struct slRef *childRef; for(childRef = track->tdb->parent->children;childRef != NULL;childRef = childRef->next) { struct trackDb * childTdb = childRef->val; struct track *child = hashFindVal(trackHash, childTdb->track); char *cartVis = cartOptionalString(cart,child->track); if (cartVis) child->visibility = hTvFromString(cartVis); } } } } + +struct paraFetchData + { + struct paraFetchData *next; + struct track *track; + boolean done; + }; + +static boolean isTrackForParallelLoad(struct track *track) +/* Is this a track that should be loaded in parallel ? 
*/ +{ +char *bdu = trackDbSetting(track->tdb, "bigDataUrl"); +return (startsWithWord("bigWig" , track->tdb->type) + || startsWithWord("bigBed" , track->tdb->type) + || startsWithWord("bam" , track->tdb->type) + || startsWithWord("vcfTabix", track->tdb->type)) + && (bdu && strstr(bdu,"://")) + && (track->subtracks == NULL); +} + +static void findLeavesForParallelLoad(struct track *trackList, struct paraFetchData **ppfdList) +/* Find leaves of track tree that are remote network resources for parallel-fetch loading */ +{ +struct track *track; +if (!trackList) + return; +for (track = trackList; track != NULL; track = track->next) + { + + if (track->visibility != tvHide) + { + if (isTrackForParallelLoad(track)) + { + struct paraFetchData *pfd; + AllocVar(pfd); + pfd->track = track; // need pointer to be stable + slAddHead(ppfdList, pfd); + track->parallelLoading = TRUE; + } + struct track *subtrack; + for (subtrack=track->subtracks; subtrack; subtrack=subtrack->next) + { + if (isSubtrackVisible(subtrack)) + { + if (isTrackForParallelLoad(subtrack)) + { + struct paraFetchData *pfd; + AllocVar(pfd); + pfd->track = subtrack; // need pointer to be stable + slAddHead(ppfdList, pfd); + subtrack->parallelLoading = TRUE; + } + } + } + } + } +} + +static pthread_mutex_t pfdMutex = PTHREAD_MUTEX_INITIALIZER; +static struct paraFetchData *pfdList = NULL, *pfdRunning = NULL, *pfdDone = NULL, *pfdNeverStarted = NULL; + +static void *remoteParallelLoad(void *threadParam) +/* Each thread loads tracks in parallel until all work is done. */ +{ +pthread_t *pthread = threadParam; +struct paraFetchData *pfd = NULL; +pthread_detach(*pthread); // this thread will never join back with its progenitor + // Canceled threads might leave locks behind, + // so the threads are detached and will be neither joined nor canceled. 
+boolean allDone = FALSE; +while(1) + { + pthread_mutex_lock( &pfdMutex ); + if (!pfdList) + { + allDone = TRUE; + } + else + { // move it from the waiting queue to the running queue + pfd = slPopHead(&pfdList); + slAddHead(&pfdRunning, pfd); + } + pthread_mutex_unlock( &pfdMutex ); + if (allDone) + return NULL; + + long thisTime = 0, lastTime = 0; + + if (measureTiming) + lastTime = clock1000(); + + /* protect against errAbort */ + struct errCatch *errCatch = errCatchNew(); + if (errCatchStart(errCatch)) + { + pfd->done = FALSE; + checkMaxWindowToDraw(pfd->track); + pfd->track->loadItems(pfd->track); + pfd->done = TRUE; + } + errCatchEnd(errCatch); + if (errCatch->gotError) + { + pfd->track->networkErrMsg = cloneString(errCatch->message->string); + pfd->done = TRUE; + } + errCatchFree(&errCatch); + + if (measureTiming) + { + thisTime = clock1000(); + pfd->track->loadTime = thisTime - lastTime; + } + + pthread_mutex_lock( &pfdMutex ); + slRemoveEl(&pfdRunning, pfd); // this list will not be huge + slAddHead(&pfdDone, pfd); + pthread_mutex_unlock( &pfdMutex ); + + } +} + +static int remoteParallelLoadWait(int maxTimeInSeconds) +/* Wait, checking to see if finished (completed or errAborted). + * If timed-out or never-ran, record error status. + * Return error count. 
*/ +{ +int maxTimeInMilliseconds = 1000 * maxTimeInSeconds; +struct paraFetchData *pfd; +int errCount = 0; +int waitTime = 0; +while(1) + { + sleep1000(50); // milliseconds + waitTime += 50; + boolean done = TRUE; + pthread_mutex_lock( &pfdMutex ); + if (pfdList || pfdRunning) + done = FALSE; + pthread_mutex_unlock( &pfdMutex ); + if (done) + break; + if (waitTime >= maxTimeInMilliseconds) + break; + } +pthread_mutex_lock( &pfdMutex ); +pfdNeverStarted = pfdList; +pfdList = NULL; // stop the workers from starting any more waiting track loads +for (pfd = pfdNeverStarted; pfd; pfd = pfd->next) + { + // track was never even started + char temp[256]; + safef(temp, sizeof temp, "Ran out of time (%d milliseconds) unable to process %s", maxTimeInMilliseconds, pfd->track->track); + pfd->track->networkErrMsg = cloneString(temp); + ++errCount; + } +for (pfd = pfdRunning; pfd; pfd = pfd->next) + { + // unfinished track + char temp[256]; + safef(temp, sizeof temp, "Timeout %d milliseconds exceeded processing %s", maxTimeInMilliseconds, pfd->track->track); + pfd->track->networkErrMsg = cloneString(temp); + ++errCount; + } +for (pfd = pfdDone; pfd; pfd = pfd->next) + { + // some done tracks may have errors + if (pfd->track->networkErrMsg) + ++errCount; + } +pthread_mutex_unlock( &pfdMutex ); +return errCount; +} + + + void doTrackForm(char *psOutput, struct tempName *ideoTn) /* Make the tracks display form with the zoom/scroll buttons and the active * image. If the ideoTn parameter is not NULL, it is filled in if the * ideogram is created. 
*/ { struct group *group; struct track *track; char *freezeName = NULL; boolean hideAll = cgiVarExists("hgt.hideAll"); boolean defaultTracks = cgiVarExists("hgt.reset"); boolean showedRuler = FALSE; boolean showTrackControls = cartUsualBoolean(cart, "trackControlsOnMain", TRUE); long thisTime = 0, lastTime = 0; char *clearButtonJavascript; @@ -4437,58 +4614,96 @@ if(imgBoxPortalDefine(theImgBox,&winStart,&winEnd,&(tl.picWidth),0)) { winBaseCount = winEnd - winStart; insideWidth = tl.picWidth-gfxBorder-insideX; } #endif//def IMAGEv2_DRAG_SCROLL } char *jsCommand = cartCgiUsualString(cart, hgtJsCommand, ""); if (!isEmpty(jsCommand)) { cartRemove(cart, hgtJsCommand); jsCommandDispatch(jsCommand, trackList); } + /* Tell tracks to load their items. */ + +/* adjust visibility */ for (track = trackList; track != NULL; track = track->next) { /* adjust track visibility based on supertrack just before load loop */ if (tdbIsSuperTrackChild(track->tdb)) limitSuperTrackVis(track); /* remove cart priority variables if they are set to the default values in the trackDb */ if(!hTrackOnChrom(track->tdb, chromName)) { track->limitedVis = tvHide; track->limitedVisSet = TRUE; } - else if (track->visibility != tvHide) + } +/* pre-load remote tracks in parallel */ +int ptMax = atoi(cfgOptionDefault("parallelFetch.threads", "20")); // default number of threads for parallel fetch. 
+pthread_t *threads = NULL; +if (ptMax > 0) // parallelFetch.threads=0 to disable parallel fetch + { + findLeavesForParallelLoad(trackList, &pfdList); + /* launch parallel threads */ + ptMax = min(ptMax, slCount(pfdList)); + if (ptMax > 0) + { + AllocArray(threads, ptMax); + int pt; + for (pt = 0; pt < ptMax; ++pt) + { + int rc = pthread_create(&threads[pt], NULL, remoteParallelLoad, &threads[pt]); + if (rc) + { + errAbort("Unexpected error %d from pthread_create(): %s",rc,strerror(rc)); + } + } + } + } +/* load regular tracks */ +for (track = trackList; track != NULL; track = track->next) + { + if (track->visibility != tvHide) + { + if (!track->parallelLoading) { if (measureTiming) lastTime = clock1000(); + checkMaxWindowToDraw(track); track->loadItems(track); if (measureTiming) { thisTime = clock1000(); track->loadTime = thisTime - lastTime; } } } + } +if (ptMax > 0) + { + /* wait for remote parallel load to finish */ + remoteParallelLoadWait(atoi(cfgOptionDefault("parallelFetch.timeout", "90"))); // wait up to default 90 seconds. + } printTrackInitJavascript(trackList); /* Generate two lists of hidden variables for track group visibility. Kludgy, but required b/c we have two different navigation forms on this page, but we want open/close changes in the bottom form to be submitted even if the user submits via the top form. 
*/ struct dyString *trackGroupsHidden1 = newDyString(1000); struct dyString *trackGroupsHidden2 = newDyString(1000); for (group = groupList; group != NULL; group = group->next) { if (group->trackList != NULL) { int looper; for(looper=1;looper<=2;looper++) @@ -4744,30 +4959,33 @@ hButtonWithMsg("hgt.reset", "default tracks","Display only default tracks"); hPrintf(" "); hButtonWithMsg("hgt.defaultImgOrder", "default order","Display current tracks in their default order"); // if (showTrackControls) - always show "hide all", Hiram 2008-06-26 { hPrintf(" "); hButtonWithMsg("hgt.hideAll", "hide all","Hide all currently visibile tracks"); } hPrintf(" "); hPrintf("<INPUT TYPE='button' VALUE='%s' onClick='document.customTrackForm.submit();return false;' title='%s'>", hasCustomTracks ? CT_MANAGE_BUTTON_LABEL : CT_ADD_BUTTON_LABEL, hasCustomTracks ? "Manage your custom tracks" : "Add your own custom tracks"); hPrintf(" "); + hPrintf("<INPUT TYPE='button' VALUE='import tracks' onClick='document.trackHubForm.submit();return false;' title='Import tracks from hubs'>"); + + hPrintf(" "); hButtonWithMsg("hgTracksConfigPage", "configure","Configure image and track selection"); hPrintf(" "); if (!hIsGsidServer()) { hButtonWithMsg("hgt.toggleRevCmplDisp", "reverse", revCmplDisp?"Show forward strand at this location":"Show reverse strand at this location"); hPrintf(" "); } hButtonWithMsg("hgt.refresh", "refresh","Refresh image"); hPrintf("<BR>\n"); if( chromosomeColorsMade ) @@ -4969,30 +5187,35 @@ if (track->visibility != tvHide) { if (track->freeItems != NULL) track->freeItems(track); lmCleanup(&track->lm); } } #endif /* SLOW */ hPrintf("</FORM>\n"); /* hidden form for custom tracks CGI */ hPrintf("<FORM ACTION='%s' NAME='customTrackForm'>", hgCustomName()); cartSaveSession(cart); hPrintf("</FORM>\n"); +/* hidden form for track hub CGI */ +hPrintf("<FORM ACTION='%s' NAME='trackHubForm'>", hgHubConnectName()); +cartSaveSession(cart); +hPrintf("</FORM>\n"); + 
pruneRedundantCartVis(trackList); } static void toggleRevCmplDisp() /* toggle the reverse complement display mode */ { // forces complement bases to match display revCmplDisp = !revCmplDisp; cartSetBooleanDb(cart, database, REV_CMPL_DISP, revCmplDisp); cartSetBooleanDb(cart, database, COMPLEMENT_BASES_VAR, revCmplDisp); } void zoomToSize(int newSize) /* Zoom so that center stays in same place, * but window is new size. If necessary move