e72cf0100e06d6fadb7282d4e7eb2c592f320951
galt
  Mon Jul 4 13:08:35 2011 -0700
Adding parallel-fetch loading of remote bigDataUrl tracks using pthreads
diff --git src/hg/hgTracks/hgTracks.c src/hg/hgTracks/hgTracks.c
index 50f8763..9c1bb2f 100644
--- src/hg/hgTracks/hgTracks.c
+++ src/hg/hgTracks/hgTracks.c
@@ -1,21 +1,22 @@
 /* hgTracks - the original, and still the largest module for the UCSC Human Genome
  * Browser main cgi script.  Currently contains most of the track framework, though
  * there's quite a bit of other framework type code in simpleTracks.c.  The main
  * routine got moved to create a new entry point to the bulk of the code for the
  * hgRenderTracks web service.  See mainMain.c for the main used by the hgTracks CGI. */
 
+#include <pthread.h>
 #include "common.h"
 #include "hCommon.h"
 #include "linefile.h"
 #include "portable.h"
 #include "memalloc.h"
 #include "localmem.h"
 #include "obscure.h"
 #include "dystring.h"
 #include "hash.h"
 #include "jksql.h"
 #include "gfxPoly.h"
 #include "memgfx.h"
 #include "hvGfx.h"
 #include "psGfx.h"
 #include "cheapcgi.h"
@@ -4334,30 +4335,206 @@
         // Unfortunately, since supertracks are not in trackList, this occurs on superChildren,
         // So now we need to find the supertrack and take changed cart values of its children
         struct slRef *childRef;
         for(childRef = track->tdb->parent->children;childRef != NULL;childRef = childRef->next)
             {
             struct trackDb * childTdb = childRef->val;
             struct track *child = hashFindVal(trackHash, childTdb->track);
             char *cartVis = cartOptionalString(cart,child->track);
             if (cartVis)
                 child->visibility = hTvFromString(cartVis);
             }
         }
     }
 }
 
+
+struct paraFetchData  /* A track queued for loading by a remoteParallelLoad() worker thread. */
+    {
+    struct paraFetchData *next;  /* Next in pfdList, pfdRunning, pfdDone or pfdNeverStarted. */
+    struct track *track;         /* Track whose checkMaxWindowToDraw()/loadItems() the worker runs. */
+    boolean done;                /* Set TRUE once loadItems() returns or errAborts. */
+    };
+
+static boolean isTrackForParallelLoad(struct track *track)
+/* Is this a bigWig/bigBed/bam/vcfTabix leaf track (no subtracks) whose bigDataUrl is remote (contains "://")? */
+{
+char *url = trackDbSetting(track->tdb, "bigDataUrl");
+char *type = track->tdb->type;
+boolean remoteCapable = startsWithWord("bigWig", type) || startsWithWord("bigBed", type)
+    || startsWithWord("bam", type) || startsWithWord("vcfTabix", type);
+if (!remoteCapable || url == NULL || strstr(url, "://") == NULL)
+    return FALSE;
+return track->subtracks == NULL;
+}
+
+static void queueParallelLoad(struct track *track, struct paraFetchData **ppfdList)
+/* Allocate a paraFetchData for track, prepend it to *ppfdList, and set
+ * track->parallelLoading (doTrackForm's serial load loop checks this flag
+ * to skip tracks that the worker threads will load). */
+{
+struct paraFetchData *pfd;
+AllocVar(pfd);
+pfd->track = track;  // need pointer to be stable
+slAddHead(ppfdList, pfd);
+track->parallelLoading = TRUE;
+}
+
+static void findLeavesForParallelLoad(struct track *trackList, struct paraFetchData **ppfdList)
+/* Find leaves of track tree that are remote network resources for parallel-fetch loading.
+ * Examines each non-hidden track in trackList and its subtracks (one level deep) that pass
+ * isSubtrackVisible(); every track accepted by isTrackForParallelLoad() is prepended to
+ * *ppfdList and flagged with parallelLoading.  An empty trackList adds nothing. */
+{
+struct track *track;
+for (track = trackList; track != NULL; track = track->next)
+    {
+    if (track->visibility == tvHide)
+        continue;  // hidden tracks and their subtracks are skipped
+
+    // the track itself (isTrackForParallelLoad rejects tracks that have subtracks)
+    if (isTrackForParallelLoad(track))
+        queueParallelLoad(track, ppfdList);
+
+    // its visible subtracks, if any
+    struct track *subtrack;
+    for (subtrack = track->subtracks; subtrack != NULL; subtrack = subtrack->next)
+        {
+        if (isSubtrackVisible(subtrack) && isTrackForParallelLoad(subtrack))
+            queueParallelLoad(subtrack, ppfdList);
+        }
+    }
+}
+
+static pthread_mutex_t pfdMutex = PTHREAD_MUTEX_INITIALIZER;  // protects pfdList, pfdRunning, pfdDone and pfdNeverStarted while workers run
+static struct paraFetchData *pfdList = NULL, *pfdRunning = NULL, *pfdDone = NULL, *pfdNeverStarted = NULL;  // waiting, being loaded, finished, and not started before the wait timed out
+
+static void *remoteParallelLoad(void *threadParam)
+/* Worker thread body: repeatedly move a track from pfdList to pfdRunning, load it
+ * under errCatch, then move it to pfdDone; return when pfdList is empty.
+ * threadParam is not used: the thread detaches itself via pthread_self().
+ * NOTE(review): remoteParallelLoadWait() may time out and set networkErrMsg on a
+ * track still in pfdRunning while this thread is inside that track's loadItems(). */
+{
+(void)threadParam;  // don't read the creator's pthread_t; it may not be stored yet
+struct paraFetchData *pfd = NULL;
+pthread_detach(pthread_self());  // this thread will never join back with its progenitor
+    // Canceled threads might leave locks behind,
+    // so the threads are detached and will be neither joined nor canceled.
+for (;;)
+    {
+    // move the next track from the waiting queue to the running queue
+    pthread_mutex_lock( &pfdMutex );
+    pfd = slPopHead(&pfdList);
+    if (pfd != NULL)
+        slAddHead(&pfdRunning, pfd);
+    pthread_mutex_unlock( &pfdMutex );
+    if (pfd == NULL)
+        return NULL;  // no waiting tracks left (or the wait timed out and emptied pfdList)
+
+    // record per-track load time when measureTiming is set
+    long thisTime = 0, lastTime = 0;
+    if (measureTiming)
+        lastTime = clock1000();
+
+    /* protect against errAbort */
+    struct errCatch *errCatch = errCatchNew();
+    if (errCatchStart(errCatch))
+        {
+        pfd->done = FALSE;
+        checkMaxWindowToDraw(pfd->track);
+        pfd->track->loadItems(pfd->track);
+        pfd->done = TRUE;
+        }
+    errCatchEnd(errCatch);
+    if (errCatch->gotError)
+        {
+        // save the error message on the track; the track still counts as finished
+        pfd->track->networkErrMsg = cloneString(errCatch->message->string);
+        pfd->done = TRUE;
+        }
+    errCatchFree(&errCatch);
+
+    if (measureTiming)
+        {
+        thisTime = clock1000();
+        pfd->track->loadTime = thisTime - lastTime;
+        }
+
+    // move the finished track from the running queue to the done queue
+    pthread_mutex_lock( &pfdMutex );
+    slRemoveEl(&pfdRunning, pfd);  // this list will not be huge
+    slAddHead(&pfdDone, pfd);
+    pthread_mutex_unlock( &pfdMutex );
+
+    }
+}
+
+static int remoteParallelLoadWait(int maxTimeInSeconds)
+/* Poll every 50 milliseconds until all queued tracks have finished (completed or
+ * errAborted) or maxTimeInSeconds have elapsed, as measured with clock1000().  Tracks
+ * that never started or are still running then get a networkErrMsg, and pfdList is
+ * emptied so no more waiting tracks are started.  Return the number of tracks with errors. */
+{
+int maxTimeInMilliseconds = 1000 * maxTimeInSeconds;
+struct paraFetchData *pfd;
+int errCount = 0;
+long startTime = clock1000();
+for (;;)
+    {
+    // check before sleeping so an already-finished queue costs no extra delay
+    pthread_mutex_lock( &pfdMutex );
+    boolean done = (pfdList == NULL && pfdRunning == NULL);
+    pthread_mutex_unlock( &pfdMutex );
+    if (done)
+        break;
+    // use the real clock: sleep overshoot and lock waits would make a tick counter run slow
+    if (clock1000() - startTime >= maxTimeInMilliseconds)
+        break;
+    sleep1000(50); // milliseconds
+    }
+pthread_mutex_lock( &pfdMutex );
+pfdNeverStarted = pfdList;
+pfdList = NULL;  // stop the workers from starting any more waiting track loads
+for (pfd = pfdNeverStarted; pfd; pfd = pfd->next)
+    {
+    // track was never even started
+    char temp[256];
+    safef(temp, sizeof temp, "Ran out of time (%d milliseconds) unable to process  %s", maxTimeInMilliseconds, pfd->track->track);
+    pfd->track->networkErrMsg = cloneString(temp);
+    ++errCount;
+    }
+for (pfd = pfdRunning; pfd; pfd = pfd->next)
+    {
+    // unfinished track
+    char temp[256];
+    safef(temp, sizeof temp, "Timeout %d milliseconds exceeded processing %s", maxTimeInMilliseconds, pfd->track->track);
+    pfd->track->networkErrMsg = cloneString(temp);
+    ++errCount;
+    }
+for (pfd = pfdDone; pfd; pfd = pfd->next)
+    {
+    // some done tracks may have errors
+    if (pfd->track->networkErrMsg)
+        ++errCount;
+    }
+pthread_mutex_unlock( &pfdMutex );
+return errCount;
+}
+
+
+
 void doTrackForm(char *psOutput, struct tempName *ideoTn)
 /* Make the tracks display form with the zoom/scroll buttons and the active
  * image.  If the ideoTn parameter is not NULL, it is filled in if the
  * ideogram is created.  */
 {
 struct group *group;
 struct track *track;
 char *freezeName = NULL;
 boolean hideAll = cgiVarExists("hgt.hideAll");
 boolean defaultTracks = cgiVarExists("hgt.reset");
 boolean showedRuler = FALSE;
 boolean showTrackControls = cartUsualBoolean(cart, "trackControlsOnMain", TRUE);
 long thisTime = 0, lastTime = 0;
 char *clearButtonJavascript;
 
@@ -4437,58 +4614,96 @@
     if(imgBoxPortalDefine(theImgBox,&winStart,&winEnd,&(tl.picWidth),0))
         {
         winBaseCount = winEnd - winStart;
         insideWidth = tl.picWidth-gfxBorder-insideX;
         }
     #endif//def IMAGEv2_DRAG_SCROLL
     }
 
 char *jsCommand = cartCgiUsualString(cart, hgtJsCommand, "");
 if (!isEmpty(jsCommand))
    {
    cartRemove(cart, hgtJsCommand);
    jsCommandDispatch(jsCommand, trackList);
    }
 
+
 /* Tell tracks to load their items. */
+
+/* adjust visibility */
 for (track = trackList; track != NULL; track = track->next)
     {
     /* adjust track visibility based on supertrack just before load loop */
     if (tdbIsSuperTrackChild(track->tdb))
         limitSuperTrackVis(track);
 
     /* remove cart priority variables if they are set
        to the default values in the trackDb */
     if(!hTrackOnChrom(track->tdb, chromName))
 	{
 	track->limitedVis = tvHide;
 	track->limitedVisSet = TRUE;
 	}
-    else if (track->visibility != tvHide)
+    }
+/* pre-load remote tracks in parallel */
+int ptMax = atoi(cfgOptionDefault("parallelFetch.threads", "20"));  // default number of threads for parallel fetch.
+pthread_t *threads = NULL;
+if (ptMax > 0)     // parallelFetch.threads=0 to disable parallel fetch
+    {
+    findLeavesForParallelLoad(trackList, &pfdList);
+    /* launch parallel threads */
+    ptMax = min(ptMax, slCount(pfdList));
+    if (ptMax > 0)
+	{
+	AllocArray(threads, ptMax);
+	int pt;
+	for (pt = 0; pt < ptMax; ++pt)
+	    {
+	    int rc = pthread_create(&threads[pt], NULL, remoteParallelLoad, &threads[pt]);
+	    if (rc)
+		{
+		errAbort("Unexpected error %d from pthread_create(): %s",rc,strerror(rc));
+		}
+	    }
+	}
+    }
+/* load regular tracks */
+for (track = trackList; track != NULL; track = track->next)
+    {
+    if (track->visibility != tvHide)
+	{
+	if (!track->parallelLoading)
 	{
 	if (measureTiming)
 	    lastTime = clock1000();
+
 	checkMaxWindowToDraw(track);
 	track->loadItems(track);
 
 	if (measureTiming)
 	    {
 	    thisTime = clock1000();
 	    track->loadTime = thisTime - lastTime;
 	    }
 	}
     }
+    }
+if (ptMax > 0)
+    {
+    /* wait for remote parallel load to finish */
+    remoteParallelLoadWait(atoi(cfgOptionDefault("parallelFetch.timeout", "90")));  // wait up to default 90 seconds.
+    }
 
 printTrackInitJavascript(trackList);
 
 /* Generate two lists of hidden variables for track group visibility.  Kludgy,
    but required b/c we have two different navigation forms on this page, but
    we want open/close changes in the bottom form to be submitted even if the user
    submits via the top form. */
 struct dyString *trackGroupsHidden1 = newDyString(1000);
 struct dyString *trackGroupsHidden2 = newDyString(1000);
 for (group = groupList; group != NULL; group = group->next)
     {
     if (group->trackList != NULL)
         {
         int looper;
         for(looper=1;looper<=2;looper++)
@@ -4744,30 +4959,33 @@
     hButtonWithMsg("hgt.reset", "default tracks","Display only default tracks");
 	hPrintf("&nbsp;");
     hButtonWithMsg("hgt.defaultImgOrder", "default order","Display current tracks in their default order");
     // if (showTrackControls)  - always show "hide all", Hiram 2008-06-26
 	{
 	hPrintf("&nbsp;");
 	hButtonWithMsg("hgt.hideAll", "hide all","Hide all currently visibile tracks");
 	}
 
     hPrintf(" ");
     hPrintf("<INPUT TYPE='button' VALUE='%s' onClick='document.customTrackForm.submit();return false;' title='%s'>",
         hasCustomTracks ? CT_MANAGE_BUTTON_LABEL : CT_ADD_BUTTON_LABEL,
         hasCustomTracks ? "Manage your custom tracks" : "Add your own custom tracks");
 
     hPrintf(" ");
+    hPrintf("<INPUT TYPE='button' VALUE='import tracks' onClick='document.trackHubForm.submit();return false;' title='Import tracks from hubs'>");
+
+    hPrintf(" ");
     hButtonWithMsg("hgTracksConfigPage", "configure","Configure image and track selection");
     hPrintf(" ");
 
     if (!hIsGsidServer())
 	{
         hButtonWithMsg("hgt.toggleRevCmplDisp", "reverse",
             revCmplDisp?"Show forward strand at this location":"Show reverse strand at this location");
         hPrintf(" ");
 	}
 
     hButtonWithMsg("hgt.refresh", "refresh","Refresh image");
 
     hPrintf("<BR>\n");
 
     if( chromosomeColorsMade )
@@ -4969,30 +5187,35 @@
     if (track->visibility != tvHide)
 	{
 	if (track->freeItems != NULL)
 	    track->freeItems(track);
 	lmCleanup(&track->lm);
 	}
     }
 #endif /* SLOW */
 hPrintf("</FORM>\n");
 
 /* hidden form for custom tracks CGI */
 hPrintf("<FORM ACTION='%s' NAME='customTrackForm'>", hgCustomName());
 cartSaveSession(cart);
 hPrintf("</FORM>\n");
 
+/* hidden form for track hub CGI */
+hPrintf("<FORM ACTION='%s' NAME='trackHubForm'>", hgHubConnectName());
+cartSaveSession(cart);
+hPrintf("</FORM>\n");
+
 pruneRedundantCartVis(trackList);
 }
 
 static void toggleRevCmplDisp()
 /* toggle the reverse complement display mode */
 {
 // forces complement bases to match display
 revCmplDisp = !revCmplDisp;
 cartSetBooleanDb(cart, database, REV_CMPL_DISP, revCmplDisp);
 cartSetBooleanDb(cart, database, COMPLEMENT_BASES_VAR, revCmplDisp);
 }
 
 void zoomToSize(int newSize)
 /* Zoom so that center stays in same place,
  * but window is new size.  If necessary move