c344bba4bb978e02da57ad7be117699ee5d61255
kent
  Tue May 7 08:09:07 2013 -0700
Moving parallelFetch to a new module.
diff --git src/lib/paraFetch.c src/lib/paraFetch.c
new file mode 100644
index 0000000..bf7dbf9
--- /dev/null
+++ src/lib/paraFetch.c
@@ -0,0 +1,659 @@
+/* paraFetch - fetch things from remote URLs in parallel. */
+
+#include <utime.h>
+#include "common.h"
+#include "internet.h"
+#include "errabort.h"
+#include "hash.h"
+#include "linefile.h"
+#include "net.h"
+#include "https.h"
+#include "sqlNum.h"
+#include "obscure.h"
+
+
+struct parallelConn
+/* struct to hold information on one parallel connection */
+    {
+    struct parallelConn *next;  /* next connection */
+    int sd;                     /* socket descriptor */
+    off_t rangeStart;           /* where does the range start */
+    off_t partSize;             /* range size */
+    off_t received;             /* bytes received */
+    };
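+
+/* Sentinel values used in the sd field when no socket is open:
+ *   -1 part already done, -2 started but not finished,
+ *   -3 not even started, -4 no connection tried yet. */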
+
+void writeParaFetchStatus(char *origPath, struct parallelConn *pcList, char *url, off_t fileSize, char *dateString, boolean isFinal)
+/* Write a status file.
+ * This has two purposes.
+ * First, we can use it to resume a failed transfer.
+ * Second, we can use it to follow progress */
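+/* The status file is plain text: the url, total file size, and Last-Modified
+ * date on the first three lines, then one "partN rangeStart partSize received"
+ * line per connection.  Illustrative example (values are made up):
+ *   http://example.com/big.dat
+ *   1048576
+ *   Tue, 15 Jun 2010 06:45:08 GMT
+ *   part0 0 524288 524288
+ *   part1 524288 524288 131072
+ */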
+{
+char outTempX[1024];
+char outTemp[1024];
+safef(outTempX, sizeof(outTempX), "%s.paraFetchStatusX", origPath);
+safef(outTemp, sizeof(outTemp), "%s.paraFetchStatus", origPath);
+struct parallelConn *pc = NULL;
+
+FILE *f = mustOpen(outTempX, "w");
+int part = 0;
+fprintf(f, "%s\n", url);
+fprintf(f, "%lld\n", (long long)fileSize);
+fprintf(f, "%s\n", dateString);
+for(pc = pcList; pc; pc = pc->next)
+    {
+    fprintf(f, "part%d %lld %lld %lld\n", part
+	, (long long)pc->rangeStart
+	, (long long)pc->partSize
+	, (long long)pc->received);
+    ++part;
+    }
+
+carefulClose(&f);
+
+/* rename the successful status to the original name */
+rename(outTempX, outTemp);
+
+if (isFinal)  /* We are done and just looking to get rid of the file. */
+    unlink(outTemp);
+}
+
+
+boolean readParaFetchStatus(char *origPath, 
+    struct parallelConn **pPcList, char **pUrl, off_t *pFileSize, char **pDateString, off_t *pTotalDownloaded)
+/* Read a status file left by a previous, interrupted download.
+ * Returns FALSE (after removing any stale files) if the status file or the
+ * partial download is missing or cannot be parsed; otherwise fills in the
+ * connection list, url, file size, date string, and bytes downloaded so far. */
+{
+char outTemp[1024];
+char outStat[1024];
+safef(outStat, sizeof(outStat), "%s.paraFetchStatus", origPath);
+safef(outTemp, sizeof(outTemp), "%s.paraFetch", origPath);
+struct parallelConn *pcList = NULL, *pc = NULL;
+off_t totalDownloaded = 0;
+
+if (!fileExists(outStat))
+    {
+    unlink(outTemp);
+    return FALSE;
+    }
+
+if (!fileExists(outTemp))
+    {
+    unlink(outStat);
+    return FALSE;
+    }
+
+char *line, *word;
+struct lineFile *lf = lineFileOpen(outStat, TRUE);
+if (!lineFileNext(lf, &line, NULL))
+    {
+    unlink(outTemp);
+    unlink(outStat);
+    return FALSE;
+    }
+char *url = cloneString(line);
+if (!lineFileNext(lf, &line, NULL))
+    {
+    unlink(outTemp);
+    unlink(outStat);
+    return FALSE;
+    }
+off_t fileSize = sqlLongLong(line);
+if (!lineFileNext(lf, &line, NULL))
+    {
+    unlink(outTemp);
+    unlink(outStat);
+    return FALSE;
+    }
+char *dateString = cloneString(line);
+while (lineFileNext(lf, &line, NULL))
+    {
+    word = nextWord(&line);  /* skip the "partN" label */
+    AllocVar(pc);
+    pc->next = NULL;
+    pc->sd = -4;  /* no connection tried yet */
+    word = nextWord(&line);
+    pc->rangeStart = sqlLongLong(word);
+    word = nextWord(&line);
+    pc->partSize = sqlLongLong(word);
+    word = nextWord(&line);
+    pc->received = sqlLongLong(word);
+    if (pc->received == pc->partSize)
+	pc->sd = -1;  /* part all done already */
+    totalDownloaded += pc->received;
+    slAddHead(&pcList, pc);
+    }
+slReverse(&pcList);
+
+lineFileClose(&lf);
+
+if (slCount(pcList) < 1)
+    {
+    unlink(outTemp);
+    unlink(outStat);
+    return FALSE;
+    }
+
+*pPcList = pcList;
+*pUrl = url;
+*pFileSize = fileSize;
+*pDateString = dateString;
+*pTotalDownloaded = totalDownloaded;
+
+return TRUE;
+
+}
+
+
+boolean parallelFetch(char *url, char *outPath, int numConnections, int numRetries, boolean newer, boolean progress)
+/* Open multiple parallel connections to URL to speed downloading */
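+/* Illustrative call (argument values are arbitrary, not a recommendation):
+ *   if (!parallelFetch("http://example.com/big.dat", "big.dat", 10, 3, FALSE, TRUE))
+ *       warn("download of big.dat failed");
+ */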
+{
+char *origPath = outPath;
+char outTemp[1024];
+safef(outTemp, sizeof(outTemp), "%s.paraFetch", outPath);
+outPath = outTemp;
+/* get the size of the file to be downloaded */
+off_t fileSize = 0;
+off_t totalDownloaded = 0;
+ssize_t sinceLastStatus = 0;
+char *dateString = "";
+int star = 1;       /* index of the next progress star to print (1-based) */
+int starMax = 20;   /* total number of progress stars */
+int starStep = 1;   /* bytes of download represented by one star */
+// TODO handle case-sensitivity of protocols input
+if (startsWith("http://",url) || startsWith("https://",url))
+    {
+    struct hash *hash = newHash(0);
+    int status = netUrlHead(url, hash);
+    if (status != 200) // && status != 302 && status != 301)
+	{
+	warn("Error code: %d, expected 200 for %s, can't proceed, sorry", status, url);
+	return FALSE;
+	}
+    char *sizeString = hashFindValUpperCase(hash, "Content-Length:");
+    if (sizeString)
+	{
+	fileSize = atoll(sizeString);
+	}
+    else
+	{
+	warn("No Content-Length: returned in header for %s, must limit to a single connection, will not know if data is complete", url);
+	numConnections = 1;
+	fileSize = -1;
+	}
+    char *ds = hashFindValUpperCase(hash, "Last-Modified:");
+    if (ds)
+	dateString = cloneString(ds);
+    hashFree(&hash);
+    }
+else if (startsWith("ftp://",url))
+    {
+    long long size = 0;
+    time_t t;
+    boolean ok = netGetFtpInfo(url, &size, &t);
+    if (!ok)
+	{
+	warn("Unable to get size info from FTP for %s, can't proceed, sorry", url);
+	return FALSE;
+	}
+    fileSize = size;
+
+    struct tm  *ts;
+    char ftpTime[80];
+ 
+    /* Format the time "Tue, 15 Jun 2010 06:45:08 GMT" */
+    ts = localtime(&t);
+    strftime(ftpTime, sizeof(ftpTime), "%a, %d %b %Y %H:%M:%S %Z", ts);
+    dateString = cloneString(ftpTime);
+
+    }
+else
+    {
+    warn("unrecognized protocol: %s", url);
+    return FALSE;
+    }
+
+
+verbose(2,"fileSize=%lld\n", (long long) fileSize);
+
+if (fileSize < 65536)    /* special case small file */
+    numConnections = 1;
+
+if (numConnections > 50)    /* ignore high values for numConnections */
+    {
+    warn("Currently maximum number of connections is 50. You requested %d. Will proceed with 50 on %s", numConnections, url);
+    numConnections = 50;
+    }
+
+verbose(2,"numConnections=%d\n", numConnections); //debug
+
+if (numConnections < 1)
+    {
+    warn("number of connections must be greater than 0 for %s, can't proceed, sorry", url);
+    return FALSE;
+    }
+
+if (numRetries < 0)
+    numRetries = 0;
+
+/* what is the size of each part */
+off_t partSize = (fileSize + numConnections -1) / numConnections;
+if (fileSize == -1) 
+    partSize = -1;
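+/* Note: partSize is a rounded-up (ceiling) division, so the final part is
+ * trimmed to the true end of the file when the connection list is built below. */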
+off_t base = 0;
+int c;
+
+verbose(2,"partSize=%lld\n", (long long) partSize); //debug
+
+
+/* n is the highest-numbered descriptor */
+int n = 0;
+int connOpen = 0;   /* number of connections currently open */
+int reOpen = 0;     /* number of connection slots freed up and eligible for re-opening */
+
+
+struct parallelConn *restartPcList = NULL;
+char *restartUrl = NULL;
+off_t restartFileSize = 0;
+char *restartDateString = "";
+off_t restartTotalDownloaded = 0;
+boolean restartable = readParaFetchStatus(origPath, &restartPcList, &restartUrl, &restartFileSize, &restartDateString, &restartTotalDownloaded);
+if (fileSize == -1)
+    restartable = FALSE;
+
+struct parallelConn *pcList = NULL, *pc;
+
+if (restartable 
+ && sameString(url, restartUrl)
+ && fileSize == restartFileSize
+ && sameString(dateString, restartDateString))
+    {
+    pcList = restartPcList;
+    totalDownloaded = restartTotalDownloaded;
+    }
+else
+    {
+
+    if (newer) // only download it if it is newer than what we already have
+	{
+	/* datestamp mtime from last-modified header */
+	struct tm tm;
+	// Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
+	// These strings are always GMT
+	if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL)
+	    {
+	    warn("unable to parse last-modified string [%s]", dateString);
+	    }
+	else
+	    {
+	    time_t t;
+	    // convert to UTC (GMT) time
+	    t = mktimeFromUtc(&tm);
+	    if (t == -1)
+		{
+		warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString);
+		}
+	    else
+		{
+		// get the file mtime
+		struct stat mystat;
+		ZeroVar(&mystat);
+		if (stat(origPath,&mystat)==0)
+		    {
+		    if (t <= mystat.st_mtime)
+			{
+			verbose(2,"Since nothing newer was found, skipping %s\n", origPath);
+			verbose(3,"t from last-modified = %ld; st_mtime = %ld\n", (long) t, (long)mystat.st_mtime);
+			return TRUE;
+			}
+		    }
+		}
+	    }
+	}
+
+    /* make a list of connections */
+    for (c = 0; c < numConnections; ++c)
+	{
+	AllocVar(pc);
+	pc->next = NULL;
+	pc->rangeStart = base;
+	base += partSize;
+	pc->partSize = partSize;
+	if (fileSize != -1 && pc->rangeStart+pc->partSize >= fileSize)
+	    pc->partSize = fileSize - pc->rangeStart;
+	pc->received = 0;
+	pc->sd = -4;  /* no connection tried yet */
+	slAddHead(&pcList, pc);
+	}
+    slReverse(&pcList);
+    }
+
+if (progress)
+    {
+    char nicenumber[1024]="";
+    sprintWithGreekByte(nicenumber, sizeof(nicenumber), fileSize);
+    printf("downloading %s ", nicenumber); fflush(stdout);
+    starStep = fileSize/starMax;
+    if (starStep < 1)
+	starStep = 1;
+    }
+
+int out = open(outPath, O_CREAT|O_WRONLY, 0664);
+if (out < 0)
+    {
+    warn("Unable to open %s for write while downloading %s, can't proceed, sorry", url, outPath);
+    return FALSE;
+    }
+
+
+/* descriptors for select() */
+fd_set rfds;
+struct timeval tv;
+int retval;
+
+ssize_t readCount = 0;
+#define BUFSIZE (65536 * 4)
+char buf[BUFSIZE];
+
+/* create paraFetchStatus right away for monitoring programs */
+writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
+sinceLastStatus = 0;
+
+int retryCount = 0;
+
+time_t startTime = time(NULL);
+
+#define SELTIMEOUT 5         /* seconds select() waits before the transfer is considered stalled */
+#define RETRYSLEEPTIME 30    /* seconds to sleep before retrying after a stall */
+while (TRUE)
+    {
+
+    verbose(2,"Top of big loop\n");
+
+    if (progress)
+	{
+	while (totalDownloaded >= star * starStep)
+	    {
+	    printf("*");fflush(stdout);
+	    ++star;
+	    }
+	}
+
+    /* are we done? */
+    if (connOpen == 0)
+	{
+	boolean done = TRUE;
+	for(pc = pcList; pc; pc = pc->next)
+	    if (pc->sd != -1)
+		done = FALSE;
+	if (done) break;
+	}
+
+    /* See if we need to open any connections, either new or retries */
+    for(pc = pcList; pc; pc = pc->next)
+	{
+	if ((pc->sd == -4)  /* never even tried to open yet */
+	 || ((reOpen>0)      /* some connections have been released */
+	    && (pc->sd == -2  /* started but not finished */
+	    ||  pc->sd == -3)))  /* not even started */
+	    {
+	    char urlExt[1024];
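+	    /* Ask for just this connection's remaining byte range.  The kent net
+	     * library appears to accept a ";byterange=start-end" suffix on the url
+	     * and turn it into the appropriate range request (assumption; see
+	     * netUrlOpen). */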
+	    safef(urlExt, sizeof(urlExt), "%s;byterange=%llu-%llu"
+	    , url
+	    , (unsigned long long) (pc->rangeStart + pc->received)
+	    , (unsigned long long) (pc->rangeStart + pc->partSize - 1) );
+
+
+	    int oldSd = pc->sd;  /* in case we need to remember where we were */
+	    if (oldSd != -4)      /* decrement whether we succeed or not */
+		--reOpen;
+	    if (oldSd == -4) 
+		oldSd = -3;       /* ok this one just changes */
+	    if (fileSize == -1)
+		{
+		verbose(2,"opening url %s\n", url);
+		pc->sd = netUrlOpen(url);
+		}
+	    else
+		{
+		verbose(2,"opening url %s\n", urlExt);
+		pc->sd = netUrlOpen(urlExt);
+		}
+	    if (pc->sd < 0)
+		{
+		pc->sd = oldSd;  /* failed to open, can retry later */
+		}
+	    else
+		{
+		char *newUrl = NULL;
+		int newSd = 0;
+		if (startsWith("http://",url) || startsWith("https://",url))
+		    {
+		    if (!netSkipHttpHeaderLinesHandlingRedirect(pc->sd, urlExt, &newSd, &newUrl))
+			{
+			warn("Error processing http response for %s", url);
+			return FALSE;
+			}
+		    if (newUrl) 
+			{
+			/* a redirect was followed; switch to the new socket descriptor */
+			pc->sd = newSd;
+			}
+		    }
+		++connOpen;
+		}
+	    }
+	}
+
+
+    if (connOpen == 0)
+	{
+	warn("Unable to open any connections to download %s, can't proceed, sorry", url);
+	return FALSE;
+	}
+
+
+    FD_ZERO(&rfds);
+    n = 0;
+    for(pc = pcList; pc; pc = pc->next)
+	{
+	if (pc->sd >= 0)
+	    {
+	    FD_SET(pc->sd, &rfds);    /* reset descriptor in readfds for select() */
+	    if (pc->sd > n)
+		n = pc->sd;
+	    }
+	}
+
+
+    /* Wait up to SELTIMEOUT seconds. */
+    tv.tv_sec = SELTIMEOUT;
+    tv.tv_usec = 0;
+    retval = select(n+1, &rfds, NULL, NULL, &tv);
+    /* Don't rely on the value of tv now! */
+
+    if (retval < 0)
+	{
+	perror("select retval < 0");
+	return FALSE;
+	}
+    else if (retval)
+	{
+
+	verbose(2,"returned from select, retval=%d\n", retval);
+
+	for(pc = pcList; pc; pc = pc->next)
+	    {
+	    if ((pc->sd >= 0) && FD_ISSET(pc->sd, &rfds))
+		{
+
+		verbose(2,"found a descriptor with data: %d\n", pc->sd);
+
+		readCount = read(pc->sd, buf, BUFSIZE);
+
+		verbose(2,"readCount = %lld\n", (long long) readCount);
+
+		if (readCount == 0)
+		    {
+		    close(pc->sd);
+
+		    verbose(2,"closing descriptor: %d\n", pc->sd);
+		    pc->sd = -1;
+
+		    if (fileSize != -1 && pc->received != pc->partSize)	
+			{
+			pc->sd = -2;  /* conn was closed before all data was sent */
+			return FALSE; /* give up now; a later run can resume from the status file */
+			}
+		    --connOpen;
+		    ++reOpen;
+		    writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
+		    sinceLastStatus = 0;
+		    continue; 
+		    }
+		if (readCount < 0)
+		    {
+		    warn("error reading from socket for url %s", url);
+		    return FALSE;
+		    }
+
+		verbose(2,"rangeStart %llu  received %llu\n"
+			, (unsigned long long) pc->rangeStart
+			, (unsigned long long) pc->received );
+
+		verbose(2,"seeking to %llu\n", (unsigned long long) (pc->rangeStart + pc->received));
+
+		if (lseek(out, pc->rangeStart + pc->received, SEEK_SET) == -1)
+		    {
+		    perror("error seeking output file");
+		    warn("error seeking output file %s: rangeStart %llu  received %llu for url %s"
+			, outPath
+			, (unsigned long long) pc->rangeStart
+			, (unsigned long long) pc->received
+			, url);
+		    return FALSE;
+		    }
+		int writeCount = write(out, buf, readCount);
+		if (writeCount < 0)
+		    {
+		    warn("error writing output file %s", outPath);
+		    return FALSE;
+		    }
+		pc->received += readCount;
+		totalDownloaded += readCount;
+		sinceLastStatus += readCount;
+		if (sinceLastStatus >= 100*1024*1024)
+		    {
+		    writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
+		    sinceLastStatus = 0;
+		    }
+		}
+	    }
+	}
+    else
+	{
+	warn("No data within %d seconds for %s", SELTIMEOUT, url);
+	/* Retry ? */
+	if (retryCount >= numRetries)
+	    {
+    	    return FALSE;
+	    }
+	else
+	    {
+	    ++retryCount;
+	    /* close any open connections */
+	    for(pc = pcList; pc; pc = pc->next)
+		{
+		if (pc->sd >= 0) 
+		    {
+		    close(pc->sd);
+		    verbose(2,"closing descriptor: %d\n", pc->sd);
+		    }
+		if (pc->sd != -1) 
+		    pc->sd = -4;
+		}
+	    connOpen = 0;
+	    reOpen = 0;
+	    /* sleep for a while, maybe the server will recover */
+	    sleep(RETRYSLEEPTIME);
+	    }
+	}
+
+    }
+
+close(out);
+
+/* delete the status file - by passing TRUE */
+writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, TRUE); 
+
+/* restore original file datestamp mtime from last-modified header */
+struct tm tm;
+// Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
+// These strings are always GMT
+if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL)
+    {
+    warn("unable to parse last-modified string [%s]", dateString);
+    }
+else
+    {
+    time_t t;
+    // convert to UTC (GMT) time
+    t = mktimeFromUtc(&tm);
+    if (t == -1)
+	{
+	warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString);
+	}
+    else
+	{
+	// update the file mtime
+	struct utimbuf ut;
+	struct stat mystat;
+	ZeroVar(&mystat);
+	if (stat(outTemp,&mystat)==0)
+	    {
+	    ut.actime = mystat.st_atime;
+	    ut.modtime = t;
+	    if (utime(outTemp, &ut)==-1)
+		{
+		char errMsg[256];
+                safef(errMsg, sizeof(errMsg), "paraFetch: error setting modification time of %s to %s\n", outTemp, dateString);
+		perror(errMsg);
+		}
+	    }
+	}
+    }
+
+/* rename the successful download to the original name */
+rename(outTemp, origPath);
+
+
+
+if (progress)
+    {
+    while (star <= starMax)
+	{
+	printf("*");fflush(stdout);
+	++star;
+	}
+    long timeDiff = (long)(time(NULL) - startTime);
+    if (timeDiff > 0)
+	{
+	printf(" %ld seconds", timeDiff);
+	float mbpersec = ((totalDownloaded - restartTotalDownloaded)/1000000.0) / timeDiff;
+	printf(" %0.1f MB/sec", mbpersec);
+	}
+    printf("\n");fflush(stdout);
+    }
+
+if (fileSize != -1 && totalDownloaded != fileSize)
+    {
+    warn("Unexpected result: Total downloaded bytes %lld is not equal to fileSize %lld"
+	, (long long) totalDownloaded
+	, (long long) fileSize);
+    return FALSE;
+    }
+return TRUE;
+}
+
+