f65a2b3c5cce9a173c871fc40e41881d08f5def9
kent
  Mon Jul 8 13:52:38 2013 -0700
Adding paraFetchTempUpdateTime for tracking file progress.
diff --git src/lib/paraFetch.c src/lib/paraFetch.c
index 7b28efe..354a320 100644
--- src/lib/paraFetch.c
+++ src/lib/paraFetch.c
@@ -1,655 +1,673 @@
 /* paraFetch - fetch things from remote URLs in parallel. */
 
 #include <utime.h>
 #include "common.h"
 #include "internet.h"
 #include "errabort.h"
 #include "hash.h"
 #include "linefile.h"
 #include "net.h"
 #include "https.h"
 #include "sqlNum.h"
 #include "obscure.h"
+#include "portable.h"
 #include "paraFetch.h"
 
 
 static void paraFetchWriteStatus(char *origPath, struct parallelConn *pcList, 
     char *url, off_t fileSize, char *dateString, boolean isFinal)
 /* Record the state of a transfer in origPath.paraFetchStatus.
  * The file serves two purposes: a failed transfer can be resumed from it,
  * and an outside observer can follow progress by reading it.
  * The contents are first written to origPath.paraFetchStatusX and then renamed
  * over the real status file.  When isFinal is TRUE the status file is removed
  * again right after being written, since the transfer is finished. */
 {
 char scratchPath[1024];
 char statusPath[1024];
 safef(scratchPath, sizeof(scratchPath), "%s.paraFetchStatusX", origPath);
 safef(statusPath, sizeof(statusPath), "%s.paraFetchStatus", origPath);
 
 FILE *statusFile = mustOpen(scratchPath, "w");
 
 /* Header: one line each for the url, the total size and the date stamp. */
 fprintf(statusFile, "%s\n", url);
 fprintf(statusFile, "%lld\n", (long long)fileSize);
 fprintf(statusFile, "%s\n", dateString);
 
 /* Then one line per connection: label, range start, part size, bytes received. */
 struct parallelConn *conn = pcList;
 int partIx = 0;
 while (conn != NULL)
     {
     long long start = (long long)conn->rangeStart;
     long long size = (long long)conn->partSize;
     long long got = (long long)conn->received;
     fprintf(statusFile, "part%d %lld %lld %lld\n", partIx, start, size, got);
     conn = conn->next;
     ++partIx;
     }
 
 carefulClose(&statusFile);
 
 /* Move the completed scratch file into place under the real status name. */
 rename(scratchPath, statusPath);
 
 /* A final call only exists to get rid of the status file. */
 if (isFinal)
     unlink(statusPath);
 }
 
 
+time_t paraFetchTempUpdateTime(char *origPath)
+/* Return last mod date of temp file - which is useful to see if process has stalled.
+ * The temp file is origPath.paraFetch.  If it is not there (for instance because the
+ * transfer has finished and the temp file has been renamed) the last mod date of
+ * origPath itself is returned instead.  Calls errAbort if neither file can be found. */
+{
+char outTemp[1024];
+struct stat st;
+safef(outTemp, sizeof(outTemp), "%s.paraFetch", origPath);
+
+/* Ask for the modification time in a single stat() per candidate rather than testing
+ * for existence first and querying afterwards: parallelFetch renames the temp file to
+ * origPath when it completes, so the temp file can vanish between two separate calls. */
+if (stat(outTemp, &st) == 0)
+    return st.st_mtime;
+if (stat(origPath, &st) == 0)
+    return st.st_mtime;
+
+errAbort("%s doesn't exist", origPath);
+return 0;   /* only reached if errAbort returns; also keeps the compiler quiet */
+}
+
+
 boolean paraFetchReadStatus(char *origPath, 
     struct parallelConn **pPcList, char **pUrl, off_t *pFileSize, 
     char **pDateString, off_t *pTotalDownloaded)
 /* Read a status file - which is just origPath plus .paraFetchStatus.  This is updated during 
  * transit by parallelFetch. Returns FALSE if status file not there - possibly because
  * transfer is finished.  Any of the return parameters (pThis and pThat) may be NULL.
  *
  * The status file holds the url, the file size and the date string on one line each,
  * followed by one "partN rangeStart partSize received" line per connection.
  * On success:
  *   *pPcList          - list of connections in file order; sd is -1 for parts that are
  *                       already complete and -4 (not tried yet) for the rest
  *   *pUrl             - cloned url string
  *   *pFileSize        - total size recorded for the file
  *   *pDateString      - cloned date string
  *   *pTotalDownloaded - sum of the received counts of all parts
  * The status file and the temp file (origPath.paraFetch) are only useful as a pair, so
  * when either is missing, or the status file is incomplete, whatever is left of the
  * pair is removed and FALSE is returned.
  * NOTE(review): the cloned strings and list elements are not freed on the failure
  * paths, nor when the matching return parameter is NULL. */
 {
 char outTemp[1024];
 char outStat[1024];
 safef(outStat, sizeof(outStat), "%s.paraFetchStatus", origPath);
 safef(outTemp, sizeof(outTemp), "%s.paraFetch", origPath);
 struct parallelConn *pcList = NULL, *pc = NULL;
 off_t totalDownloaded = 0;
 
 if (!fileExists(outStat))
     {
     unlink(outTemp);
     return FALSE;
     }
 
 if (!fileExists(outTemp))
     {
     unlink(outStat);
     return FALSE;
     }
 
 char *line;
 char *url = NULL;
 char *dateString = NULL;
 off_t fileSize = 0;
 boolean statusOk = FALSE;
 struct lineFile *lf = lineFileOpen(outStat, TRUE);
 
 /* The three header lines; a file that ends early is treated as unusable. */
 if (lineFileNext(lf, &line, NULL))
     {
     url = cloneString(line);
     if (lineFileNext(lf, &line, NULL))
         {
         fileSize = sqlLongLong(line);
         if (lineFileNext(lf, &line, NULL))
             {
             dateString = cloneString(line);
             statusOk = TRUE;
             }
         }
     }
 
 /* One line per part. */
 while (statusOk && lineFileNext(lf, &line, NULL))
     {
     nextWord(&line);   /* skip the partN label */
     char *startWord = nextWord(&line);
     char *sizeWord = nextWord(&line);
     char *receivedWord = nextWord(&line);
     /* presumably nextWord returns NULL once the line runs out of words; a short line
      * means the status file cannot be trusted. */
     if (startWord == NULL || sizeWord == NULL || receivedWord == NULL)
         {
         statusOk = FALSE;
         break;
         }
     AllocVar(pc);
     pc->next = NULL;
     pc->sd = -4;  /* no connection tried yet */
     pc->rangeStart = sqlLongLong(startWord);
     pc->partSize = sqlLongLong(sizeWord);
     pc->received = sqlLongLong(receivedWord);
     if (pc->received == pc->partSize)
         pc->sd = -1;  /* part all done already */
     totalDownloaded += pc->received;
     slAddHead(&pcList, pc);
     }
 slReverse(&pcList);
 
 /* Close the status file on every path, including the failure ones below. */
 lineFileClose(&lf);
 
 if (!statusOk || slCount(pcList) < 1)
     {
     unlink(outTemp);
     unlink(outStat);
     return FALSE;
     }
 
 if (pPcList != NULL)
     *pPcList = pcList;
 if (pUrl != NULL)
     *pUrl = url;
 if (pFileSize != NULL)
     *pFileSize = fileSize;
 if (pDateString != NULL)
     *pDateString = dateString;
 if (pTotalDownloaded != NULL)
     *pTotalDownloaded = totalDownloaded;
 
 return TRUE;
 }
 
 
 boolean parallelFetch(char *url, char *outPath, int numConnections, int numRetries, boolean newer, boolean progress)
 /* Open multiple parallel connections to URL to speed downloading */
 {
 char *origPath = outPath;
 char outTemp[1024];
 safef(outTemp, sizeof(outTemp), "%s.paraFetch", outPath);
 outPath = outTemp;
 /* get the size of the file to be downloaded */
 off_t fileSize = 0;
 off_t totalDownloaded = 0;
 ssize_t sinceLastStatus = 0;
 char *dateString = "";
 int star = 1;  
 int starMax = 20;  
 int starStep = 1;
 // TODO handle case-sensitivity of protocols input
 if (startsWith("http://",url) || startsWith("https://",url))
     {
     struct hash *hash = newHash(0);
     int status = netUrlHead(url, hash);
     if (status != 200) // && status != 302 && status != 301)
 	{
 	warn("Error code: %d, expected 200 for %s, can't proceed, sorry", status, url);
 	return FALSE;
 	}
     char *sizeString = hashFindValUpperCase(hash, "Content-Length:");
     if (sizeString)
 	{
 	fileSize = atoll(sizeString);
 	}
     else
 	{
 	warn("No Content-Length: returned in header for %s, must limit to a single connection, will not know if data is complete", url);
 	numConnections = 1;
 	fileSize = -1;
 	}
     char *ds = hashFindValUpperCase(hash, "Last-Modified:");
     if (ds)
 	dateString = cloneString(ds);
     hashFree(&hash);
     }
 else if (startsWith("ftp://",url))
     {
     long long size = 0;
     time_t t;
     boolean ok = netGetFtpInfo(url, &size, &t);
     if (!ok)
 	{
 	warn("Unable to get size info from FTP for %s, can't proceed, sorry", url);
 	return FALSE;
 	}
     fileSize = size;
 
     struct tm  *ts;
     char ftpTime[80];
  
     /* Format the time "Tue, 15 Jun 2010 06:45:08 GMT" */
     ts = localtime(&t);
     strftime(ftpTime, sizeof(ftpTime), "%a, %d %b %Y %H:%M:%S %Z", ts);
     dateString = cloneString(ftpTime);
 
     }
 else
     {
     warn("unrecognized protocol: %s", url);
     return FALSE;
     }
 
 
 verbose(2,"fileSize=%lld\n", (long long) fileSize);
 
 if (fileSize < 65536)    /* special case small file */
     numConnections = 1;
 
 if (numConnections > 50)    /* ignore high values for numConnections */
     {
     warn("Currently maximum number of connections is 50. You requested %d. Will proceed with 50 on %s", numConnections, url);
     numConnections = 50;
     }
 
 verbose(2,"numConnections=%d\n", numConnections); //debug
 
 if (numConnections < 1)
     {
     warn("number of connections must be greater than 0 for %s, can't proceed, sorry", url);
     return FALSE;
     }
 
 if (numRetries < 0)
     numRetries = 0;
 
 /* what is the size of each part */
 off_t partSize = (fileSize + numConnections -1) / numConnections;
 if (fileSize == -1) 
     partSize = -1;
 off_t base = 0;
 int c;
 
 verbose(2,"partSize=%lld\n", (long long) partSize); //debug
 
 
 /* n is the highest-numbered descriptor */
 int n = 0;
 int connOpen = 0;
 int reOpen = 0;
 
 
 struct parallelConn *restartPcList = NULL;
 char *restartUrl = NULL;
 off_t restartFileSize = 0;
 char *restartDateString = "";
 off_t restartTotalDownloaded = 0;
 boolean restartable = paraFetchReadStatus(origPath, &restartPcList, &restartUrl, &restartFileSize, &restartDateString, &restartTotalDownloaded);
 if (fileSize == -1)
     restartable = FALSE;
 
 struct parallelConn *pcList = NULL, *pc;
 
 if (restartable 
  && sameString(url, restartUrl)
  && fileSize == restartFileSize
  && sameString(dateString, restartDateString))
     {
     pcList = restartPcList;
     totalDownloaded = restartTotalDownloaded;
     }
 else
     {
 
     if (newer) // only download it if it is newer than what we already have
 	{
 	/* datestamp mtime from last-modified header */
 	struct tm tm;
 	// Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
 	// These strings are always GMT
 	if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL)
 	    {
 	    warn("unable to parse last-modified string [%s]", dateString);
 	    }
 	else
 	    {
 	    time_t t;
 	    // convert to UTC (GMT) time
 	    t = mktimeFromUtc(&tm);
 	    if (t == -1)
 		{
 		warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString);
 		}
 	    else
 		{
 		// get the file mtime
 		struct stat mystat;
 		ZeroVar(&mystat);
 		if (stat(origPath,&mystat)==0)
 		    {
 		    if (t <= mystat.st_mtime)
 			{
 			verbose(2,"Since nothing newer was found, skipping %s\n", origPath);
 			verbose(3,"t from last-modified = %ld; st_mtime = %ld\n", (long) t, (long)mystat.st_mtime);
 			return TRUE;
 			}
 		    }
 		}
 	    }
 	}
 
     /* make a list of connections */
     for (c = 0; c < numConnections; ++c)
 	{
 	AllocVar(pc);
 	pc->next = NULL;
 	pc->rangeStart = base;
 	base += partSize;
 	pc->partSize = partSize;
 	if (fileSize != -1 && pc->rangeStart+pc->partSize >= fileSize)
 	    pc->partSize = fileSize - pc->rangeStart;
 	pc->received = 0;
 	pc->sd = -4;  /* no connection tried yet */
 	slAddHead(&pcList, pc);
 	}
     slReverse(&pcList);
     }
 
 if (progress)
     {
     char nicenumber[1024]="";
     sprintWithGreekByte(nicenumber, sizeof(nicenumber), fileSize);
     printf("downloading %s ", nicenumber); fflush(stdout);
     starStep = fileSize/starMax;
     if (starStep < 1)
 	starStep = 1;
     }
 
 int out = open(outPath, O_CREAT|O_WRONLY, 0664);
 if (out < 0)
     {
     warn("Unable to open %s for write while downloading %s, can't proceed, sorry", url, outPath);
     return FALSE;
     }
 
 
 /* descriptors for select() */
 fd_set rfds;
 struct timeval tv;
 int retval;
 
 ssize_t readCount = 0;
 #define BUFSIZE 65536 * 4
 char buf[BUFSIZE];
 
 /* create paraFetchStatus right away for monitoring programs */
 paraFetchWriteStatus(origPath, pcList, url, fileSize, dateString, FALSE);
 sinceLastStatus = 0;
 
 int retryCount = 0;
 
 time_t startTime = time(NULL);
 
 #define SELTIMEOUT 5
 #define RETRYSLEEPTIME 30    
 while (TRUE)
     {
 
     verbose(2,"Top of big loop\n");
 
     if (progress)
 	{
 	while (totalDownloaded >= star * starStep)
 	    {
 	    printf("*");fflush(stdout);
 	    ++star;
 	    }
 	}
 
     /* are we done? */
     if (connOpen == 0)
 	{
 	boolean done = TRUE;
 	for(pc = pcList; pc; pc = pc->next)
 	    if (pc->sd != -1)
 		done = FALSE;
 	if (done) break;
 	}
 
     /* See if we need to open any connections, either new or retries */
     for(pc = pcList; pc; pc = pc->next)
 	{
 	if ((pc->sd == -4)  /* never even tried to open yet */
 	 || ((reOpen>0)      /* some connections have been released */
 	    && (pc->sd == -2  /* started but not finished */
 	    ||  pc->sd == -3)))  /* not even started */
 	    {
 	    char urlExt[1024];
 	    safef(urlExt, sizeof(urlExt), "%s;byterange=%llu-%llu"
 	    , url
 	    , (unsigned long long) (pc->rangeStart + pc->received)
 	    , (unsigned long long) (pc->rangeStart + pc->partSize - 1) );
 
 
 	    int oldSd = pc->sd;  /* in case we need to remember where we were */
 	    if (oldSd != -4)      /* decrement whether we succeed or not */
 		--reOpen;
 	    if (oldSd == -4) 
 		oldSd = -3;       /* ok this one just changes */
 	    if (fileSize == -1)
 		{
 		verbose(2,"opening url %s\n", url);
 		pc->sd = netUrlOpen(url);
 		}
 	    else
 		{
 		verbose(2,"opening url %s\n", urlExt);
 		pc->sd = netUrlOpen(urlExt);
 		}
 	    if (pc->sd < 0)
 		{
 		pc->sd = oldSd;  /* failed to open, can retry later */
 		}
 	    else
 		{
 		char *newUrl = NULL;
 		int newSd = 0;
 		if (startsWith("http://",url) || startsWith("https://",url))
 		    {
 		    if (!netSkipHttpHeaderLinesHandlingRedirect(pc->sd, urlExt, &newSd, &newUrl))
 			{
 			warn("Error processing http response for %s", url);
 			return FALSE;
 			}
 		    if (newUrl) 
 			{
 			/*  Update sd with newSd, replace it with newUrl, etc. */
 			pc->sd = newSd;
 			}
 		    }
 		++connOpen;
 		}
 	    }
 	}
 
 
     if (connOpen == 0)
 	{
 	warn("Unable to open any connections to download %s, can't proceed, sorry", url);
 	return FALSE;
 	}
 
 
     FD_ZERO(&rfds);
     n = 0;
     for(pc = pcList; pc; pc = pc->next)
 	{
 	if (pc->sd >= 0)
 	    {
 	    FD_SET(pc->sd, &rfds);    /* reset descriptor in readfds for select() */
 	    if (pc->sd > n)
 		n = pc->sd;
 	    }
 	}
 
 
     /* Wait up to SELTIMEOUT seconds. */
     tv.tv_sec = SELTIMEOUT;
     tv.tv_usec = 0;
     retval = select(n+1, &rfds, NULL, NULL, &tv);
     /* Don't rely on the value of tv now! */
 
     if (retval < 0)
 	{
 	perror("select retval < 0");
 	return FALSE;
 	}
     else if (retval)
 	{
 
 	verbose(2,"returned from select, retval=%d\n", retval);
 
 	for(pc = pcList; pc; pc = pc->next)
 	    {
 	    if ((pc->sd >= 0) && FD_ISSET(pc->sd, &rfds))
 		{
 
 		verbose(2,"found a descriptor with data: %d\n", pc->sd);
 
 		readCount = read(pc->sd, buf, BUFSIZE);
 
 		verbose(2,"readCount = %lld\n", (long long) readCount);
 
 		if (readCount == 0)
 		    {
 		    close(pc->sd);
 
 		    verbose(2,"closing descriptor: %d\n", pc->sd);
 		    pc->sd = -1;
 
 		    if (fileSize != -1 && pc->received != pc->partSize)	
 			{
 			pc->sd = -2;  /* conn was closed before all data was sent, can retry later */
 			return FALSE;
 			}
 		    --connOpen;
 		    ++reOpen;
 		    paraFetchWriteStatus(origPath, pcList, url, fileSize, dateString, FALSE);
 		    sinceLastStatus = 0;
 		    continue; 
 		    }
 		if (readCount < 0)
 		    {
 		    warn("error reading from socket for url %s", url);
 		    return FALSE;
 		    }
 
 		verbose(2,"rangeStart %llu  received %llu\n"
 			, (unsigned long long) pc->rangeStart
 			, (unsigned long long) pc->received );
 
 		verbose(2,"seeking to %llu\n", (unsigned long long) (pc->rangeStart + pc->received));
 
 		if (lseek(out, pc->rangeStart + pc->received, SEEK_SET) == -1)
 		    {
 		    perror("error seeking output file");
 		    warn("error seeking output file %s: rangeStart %llu  received %llu for url %s"
 			, outPath
 			, (unsigned long long) pc->rangeStart
 			, (unsigned long long) pc->received
 			, url);
 		    return FALSE;
 		    }
 		int writeCount = write(out, buf, readCount);
 		if (writeCount < 0)
 		    {
 		    warn("error writing output file %s", outPath);
 		    return FALSE;
 		    }
 		pc->received += readCount;
 		totalDownloaded += readCount;
 		sinceLastStatus += readCount;
 		if (sinceLastStatus >= 10*1024*1024)
 		    {
 		    paraFetchWriteStatus(origPath, pcList, url, fileSize, dateString, FALSE);
 		    sinceLastStatus = 0;
 		    }
 		}
 	    }
 	}
     else
 	{
 	warn("No data within %d seconds for %s", SELTIMEOUT, url);
 	/* Retry ? */
 	if (retryCount >= numRetries)
 	    {
     	    return FALSE;
 	    }
 	else
 	    {
 	    ++retryCount;
 	    /* close any open connections */
 	    for(pc = pcList; pc; pc = pc->next)
 		{
 		if (pc->sd >= 0) 
 		    {
 		    close(pc->sd);
 		    verbose(2,"closing descriptor: %d\n", pc->sd);
 		    }
 		if (pc->sd != -1) 
 		    pc->sd = -4;
 		}
 	    connOpen = 0;
 	    reOpen = 0;
 	    /* sleep for a while, maybe the server will recover */
 	    sleep(RETRYSLEEPTIME);
 	    }
 	}
 
     }
 
 close(out);
 
 /* delete the status file - by passing TRUE */
 paraFetchWriteStatus(origPath, pcList, url, fileSize, dateString, TRUE); 
 
 /* restore original file datestamp mtime from last-modified header */
 struct tm tm;
 // Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
 // These strings are always GMT
 if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL)
     {
     warn("unable to parse last-modified string [%s]", dateString);
     }
 else
     {
     time_t t;
     // convert to UTC (GMT) time
     t = mktimeFromUtc(&tm);
     if (t == -1)
 	{
 	warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString);
 	}
     else
 	{
 	// update the file mtime
 	struct utimbuf ut;
 	struct stat mystat;
 	ZeroVar(&mystat);
 	if (stat(outTemp,&mystat)==0)
 	    {
 	    ut.actime = mystat.st_atime;
 	    ut.modtime = t;
 	    if (utime(outTemp, &ut)==-1)
 		{
 		char errMsg[256];
                 safef(errMsg, sizeof(errMsg), "paraFetch: error setting modification time of %s to %s\n", outTemp, dateString);
 		perror(errMsg);
 		}
 	    }
 	}
     }
 
 /* rename the successful download to the original name */
 rename(outTemp, origPath);
 
 
 
 if (progress)
     {
     while (star <= starMax)
 	{
 	printf("*");fflush(stdout);
 	++star;
 	}
     long timeDiff = (long)(time(NULL) - startTime);
     if (timeDiff > 0)
 	{
 	printf(" %ld seconds", timeDiff);
 	float mbpersec =  ((totalDownloaded - restartTotalDownloaded)/1000000) / timeDiff;
 	printf(" %0.1f MB/sec", mbpersec);
 	}
     printf("\n");fflush(stdout);
     }
 
 if (fileSize != -1 && totalDownloaded != fileSize)
     {
     warn("Unexpected result: Total downloaded bytes %lld is not equal to fileSize %lld"
 	, (long long) totalDownloaded
 	, (long long) fileSize);
     return FALSE;
     }
 return TRUE;
 }