c344bba4bb978e02da57ad7be117699ee5d61255
kent
  Tue May 7 08:09:07 2013 -0700
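Move parallelFetch() and its status-file helpers (struct parallelConn, readParaFetchStatus, writeParaFetchStatus) out of net.c into a new module; drop the now-unused <utime.h> include.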
diff --git src/lib/net.c src/lib/net.c
index 7b25511..2804ac4 100644
--- src/lib/net.c
+++ src/lib/net.c
@@ -1,26 +1,25 @@
 /* net.c some stuff to wrap around net communications. 
  *
  * This file is copyright 2002 Jim Kent, but license is hereby
  * granted for all use - public, private or commercial. */
 
 #include "common.h"
 #include <signal.h>
 #include <errno.h>
 #include <string.h>
 #include <sys/time.h>
-#include <utime.h>
 #include <pthread.h>
 #include "internet.h"
 #include "errabort.h"
 #include "hash.h"
 #include "net.h"
 #include "linefile.h"
 #include "base64.h"
 #include "cheapcgi.h"
 #include "https.h"
 #include "sqlNum.h"
 #include "obscure.h"
 
 /* Brought errno in to get more useful error messages */
 extern int errno;
 
@@ -1665,676 +1664,30 @@
 }
 
 char *netReadTextFileIfExists(char *url)
 /* Read entire URL and return it as a string.  URL should be text (embedded zeros will be
  * interpreted as end of string).  If the url doesn't exist or has other problems,
  * returns NULL. */
 {
 struct lineFile *lf = netLineFileSilentOpen(url);
 if (lf == NULL)
     return NULL;
 char *text = lineFileReadAll(lf);
 lineFileClose(&lf);
 return text;
 }
 
-
-struct parallelConn
-/* struct to information on a parallel connection */
-    {
-    struct parallelConn *next;  /* next connection */
-    int sd;                     /* socket descriptor */
-    off_t rangeStart;           /* where does the range start */
-    off_t partSize;             /* range size */
-    off_t received;             /* bytes received */
-    };
-
-void writeParaFetchStatus(char *origPath, struct parallelConn *pcList, char *url, off_t fileSize, char *dateString, boolean isFinal)
-/* Write a status file.
- * This has two purposes.
- * First, we can use it to resume a failed transfer.
- * Second, we can use it to follow progress */
-{
-char outTempX[1024];
-char outTemp[1024];
-safef(outTempX, sizeof(outTempX), "%s.paraFetchStatusX", origPath);
-safef(outTemp, sizeof(outTemp), "%s.paraFetchStatus", origPath);
-struct parallelConn *pc = NULL;
-
-FILE *f = mustOpen(outTempX, "w");
-int part = 0;
-fprintf(f, "%s\n", url);
-fprintf(f, "%lld\n", (long long)fileSize);
-fprintf(f, "%s\n", dateString);
-for(pc = pcList; pc; pc = pc->next)
-    {
-    fprintf(f, "part%d %lld %lld %lld\n", part
-	, (long long)pc->rangeStart
-	, (long long)pc->partSize
-	, (long long)pc->received);
-    ++part;
-    }
-
-carefulClose(&f);
-
-/* rename the successful status to the original name */
-rename(outTempX, outTemp);
-
-if (isFinal)  /* We are done and just looking to get rid of the file. */
-    unlink(outTemp);
-}
-
-
-boolean readParaFetchStatus(char *origPath, 
-    struct parallelConn **pPcList, char **pUrl, off_t *pFileSize, char **pDateString, off_t *pTotalDownloaded)
-/* Write a status file.
- * This has two purposes.
- * First, we can use it to resume a failed transfer.
- * Second, we can use it to follow progress */
-{
-char outTemp[1024];
-char outStat[1024];
-safef(outStat, sizeof(outStat), "%s.paraFetchStatus", origPath);
-safef(outTemp, sizeof(outTemp), "%s.paraFetch", origPath);
-struct parallelConn *pcList = NULL, *pc = NULL;
-off_t totalDownloaded = 0;
-
-if (!fileExists(outStat))
-    {
-    unlink(outTemp);
-    return FALSE;
-    }
-
-if (!fileExists(outTemp))
-    {
-    unlink(outStat);
-    return FALSE;
-    }
-
-char *line, *word;
-struct lineFile *lf = lineFileOpen(outStat, TRUE);
-if (!lineFileNext(lf, &line, NULL))
-    {
-    unlink(outTemp);
-    unlink(outStat);
-    return FALSE;
-    }
-char *url = cloneString(line);
-if (!lineFileNext(lf, &line, NULL))
-    {
-    unlink(outTemp);
-    unlink(outStat);
-    return FALSE;
-    }
-off_t fileSize = sqlLongLong(line);
-if (!lineFileNext(lf, &line, NULL))
-    {
-    unlink(outTemp);
-    unlink(outStat);
-    return FALSE;
-    }
-char *dateString = cloneString(line);
-while (lineFileNext(lf, &line, NULL))
-    {
-    word = nextWord(&line);
-    AllocVar(pc);
-    pc->next = NULL;
-    pc->sd = -4;  /* no connection tried yet */
-    word = nextWord(&line);
-    pc->rangeStart = sqlLongLong(word);
-    word = nextWord(&line);
-    pc->partSize = sqlLongLong(word);
-    word = nextWord(&line);
-    pc->received = sqlLongLong(word);
-    if (pc->received == pc->partSize)
-	pc->sd = -1;  /* part all done already */
-    totalDownloaded += pc->received;
-    slAddHead(&pcList, pc);
-    }
-slReverse(&pcList);
-
-lineFileClose(&lf);
-
-if (slCount(pcList) < 1)
-    {
-    unlink(outTemp);
-    unlink(outStat);
-    return FALSE;
-    }
-
-*pPcList = pcList;
-*pUrl = url;
-*pFileSize = fileSize;
-*pDateString = dateString;
-*pTotalDownloaded = totalDownloaded;
-
-return TRUE;
-
-}
-
-
-boolean parallelFetch(char *url, char *outPath, int numConnections, int numRetries, boolean newer, boolean progress)
-/* Open multiple parallel connections to URL to speed downloading */
-{
-char *origPath = outPath;
-char outTemp[1024];
-safef(outTemp, sizeof(outTemp), "%s.paraFetch", outPath);
-outPath = outTemp;
-/* get the size of the file to be downloaded */
-off_t fileSize = 0;
-off_t totalDownloaded = 0;
-ssize_t sinceLastStatus = 0;
-char *dateString = "";
-int star = 1;  
-int starMax = 20;  
-int starStep = 1;
-// TODO handle case-sensitivity of protocols input
-if (startsWith("http://",url) || startsWith("https://",url))
-    {
-    struct hash *hash = newHash(0);
-    int status = netUrlHead(url, hash);
-    if (status != 200) // && status != 302 && status != 301)
-	{
-	warn("Error code: %d, expected 200 for %s, can't proceed, sorry", status, url);
-	return FALSE;
-	}
-    char *sizeString = hashFindValUpperCase(hash, "Content-Length:");
-    if (sizeString)
-	{
-	fileSize = atoll(sizeString);
-	}
-    else
-	{
-	warn("No Content-Length: returned in header for %s, must limit to a single connection, will not know if data is complete", url);
-	numConnections = 1;
-	fileSize = -1;
-	}
-    char *ds = hashFindValUpperCase(hash, "Last-Modified:");
-    if (ds)
-	dateString = cloneString(ds);
-    hashFree(&hash);
-    }
-else if (startsWith("ftp://",url))
-    {
-    long long size = 0;
-    time_t t;
-    boolean ok = netGetFtpInfo(url, &size, &t);
-    if (!ok)
-	{
-	warn("Unable to get size info from FTP for %s, can't proceed, sorry", url);
-	return FALSE;
-	}
-    fileSize = size;
-
-    struct tm  *ts;
-    char ftpTime[80];
- 
-    /* Format the time "Tue, 15 Jun 2010 06:45:08 GMT" */
-    ts = localtime(&t);
-    strftime(ftpTime, sizeof(ftpTime), "%a, %d %b %Y %H:%M:%S %Z", ts);
-    dateString = cloneString(ftpTime);
-
-    }
-else
-    {
-    warn("unrecognized protocol: %s", url);
-    return FALSE;
-    }
-
-
-verbose(2,"fileSize=%lld\n", (long long) fileSize);
-
-if (fileSize < 65536)    /* special case small file */
-    numConnections = 1;
-
-if (numConnections > 50)    /* ignore high values for numConnections */
-    {
-    warn("Currently maximum number of connections is 50. You requested %d. Will proceed with 50 on %s", numConnections, url);
-    numConnections = 50;
-    }
-
-verbose(2,"numConnections=%d\n", numConnections); //debug
-
-if (numConnections < 1)
-    {
-    warn("number of connections must be greater than 0 for %s, can't proceed, sorry", url);
-    return FALSE;
-    }
-
-if (numRetries < 0)
-    numRetries = 0;
-
-/* what is the size of each part */
-off_t partSize = (fileSize + numConnections -1) / numConnections;
-if (fileSize == -1) 
-    partSize = -1;
-off_t base = 0;
-int c;
-
-verbose(2,"partSize=%lld\n", (long long) partSize); //debug
-
-
-/* n is the highest-numbered descriptor */
-int n = 0;
-int connOpen = 0;
-int reOpen = 0;
-
-
-struct parallelConn *restartPcList = NULL;
-char *restartUrl = NULL;
-off_t restartFileSize = 0;
-char *restartDateString = "";
-off_t restartTotalDownloaded = 0;
-boolean restartable = readParaFetchStatus(origPath, &restartPcList, &restartUrl, &restartFileSize, &restartDateString, &restartTotalDownloaded);
-if (fileSize == -1)
-    restartable = FALSE;
-
-struct parallelConn *pcList = NULL, *pc;
-
-if (restartable 
- && sameString(url, restartUrl)
- && fileSize == restartFileSize
- && sameString(dateString, restartDateString))
-    {
-    pcList = restartPcList;
-    totalDownloaded = restartTotalDownloaded;
-    }
-else
-    {
-
-    if (newer) // only download it if it is newer than what we already have
-	{
-	/* datestamp mtime from last-modified header */
-	struct tm tm;
-	// Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
-	// These strings are always GMT
-	if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL)
-	    {
-	    warn("unable to parse last-modified string [%s]", dateString);
-	    }
-	else
-	    {
-	    time_t t;
-	    // convert to UTC (GMT) time
-	    t = mktimeFromUtc(&tm);
-	    if (t == -1)
-		{
-		warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString);
-		}
-	    else
-		{
-		// get the file mtime
-		struct stat mystat;
-		ZeroVar(&mystat);
-		if (stat(origPath,&mystat)==0)
-		    {
-		    if (t <= mystat.st_mtime)
-			{
-			verbose(2,"Since nothing newer was found, skipping %s\n", origPath);
-			verbose(3,"t from last-modified = %ld; st_mtime = %ld\n", (long) t, (long)mystat.st_mtime);
-			return TRUE;
-			}
-		    }
-		}
-	    }
-	}
-
-    /* make a list of connections */
-    for (c = 0; c < numConnections; ++c)
-	{
-	AllocVar(pc);
-	pc->next = NULL;
-	pc->rangeStart = base;
-	base += partSize;
-	pc->partSize = partSize;
-	if (fileSize != -1 && pc->rangeStart+pc->partSize >= fileSize)
-	    pc->partSize = fileSize - pc->rangeStart;
-	pc->received = 0;
-	pc->sd = -4;  /* no connection tried yet */
-	slAddHead(&pcList, pc);
-	}
-    slReverse(&pcList);
-    }
-
-if (progress)
-    {
-    char nicenumber[1024]="";
-    sprintWithGreekByte(nicenumber, sizeof(nicenumber), fileSize);
-    printf("downloading %s ", nicenumber); fflush(stdout);
-    starStep = fileSize/starMax;
-    if (starStep < 1)
-	starStep = 1;
-    }
-
-int out = open(outPath, O_CREAT|O_WRONLY, 0664);
-if (out < 0)
-    {
-    warn("Unable to open %s for write while downloading %s, can't proceed, sorry", url, outPath);
-    return FALSE;
-    }
-
-
-/* descriptors for select() */
-fd_set rfds;
-struct timeval tv;
-int retval;
-
-ssize_t readCount = 0;
-#define BUFSIZE 65536 * 4
-char buf[BUFSIZE];
-
-/* create paraFetchStatus right away for monitoring programs */
-writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
-sinceLastStatus = 0;
-
-int retryCount = 0;
-
-time_t startTime = time(NULL);
-
-#define SELTIMEOUT 5
-#define RETRYSLEEPTIME 30    
-while (TRUE)
-    {
-
-    verbose(2,"Top of big loop\n");
-
-    if (progress)
-	{
-	while (totalDownloaded >= star * starStep)
-	    {
-	    printf("*");fflush(stdout);
-	    ++star;
-	    }
-	}
-
-    /* are we done? */
-    if (connOpen == 0)
-	{
-	boolean done = TRUE;
-	for(pc = pcList; pc; pc = pc->next)
-	    if (pc->sd != -1)
-		done = FALSE;
-	if (done) break;
-	}
-
-    /* See if we need to open any connections, either new or retries */
-    for(pc = pcList; pc; pc = pc->next)
-	{
-	if ((pc->sd == -4)  /* never even tried to open yet */
-	 || ((reOpen>0)      /* some connections have been released */
-	    && (pc->sd == -2  /* started but not finished */
-	    ||  pc->sd == -3)))  /* not even started */
-	    {
-	    char urlExt[1024];
-	    safef(urlExt, sizeof(urlExt), "%s;byterange=%llu-%llu"
-	    , url
-	    , (unsigned long long) (pc->rangeStart + pc->received)
-	    , (unsigned long long) (pc->rangeStart + pc->partSize - 1) );
-
-
-	    int oldSd = pc->sd;  /* in case we need to remember where we were */
-	    if (oldSd != -4)      /* decrement whether we succeed or not */
-		--reOpen;
-	    if (oldSd == -4) 
-		oldSd = -3;       /* ok this one just changes */
-	    if (fileSize == -1)
-		{
-		verbose(2,"opening url %s\n", url);
-		pc->sd = netUrlOpen(url);
-		}
-	    else
-		{
-		verbose(2,"opening url %s\n", urlExt);
-		pc->sd = netUrlOpen(urlExt);
-		}
-	    if (pc->sd < 0)
-		{
-		pc->sd = oldSd;  /* failed to open, can retry later */
-		}
-	    else
-		{
-		char *newUrl = NULL;
-		int newSd = 0;
-		if (startsWith("http://",url) || startsWith("https://",url))
-		    {
-		    if (!netSkipHttpHeaderLinesHandlingRedirect(pc->sd, urlExt, &newSd, &newUrl))
-			{
-			warn("Error processing http response for %s", url);
-			return FALSE;
-			}
-		    if (newUrl) 
-			{
-			/*  Update sd with newSd, replace it with newUrl, etc. */
-			pc->sd = newSd;
-			}
-		    }
-		++connOpen;
-		}
-	    }
-	}
-
-
-    if (connOpen == 0)
-	{
-	warn("Unable to open any connections to download %s, can't proceed, sorry", url);
-	return FALSE;
-	}
-
-
-    FD_ZERO(&rfds);
-    n = 0;
-    for(pc = pcList; pc; pc = pc->next)
-	{
-	if (pc->sd >= 0)
-	    {
-	    FD_SET(pc->sd, &rfds);    /* reset descriptor in readfds for select() */
-	    if (pc->sd > n)
-		n = pc->sd;
-	    }
-	}
-
-
-    /* Wait up to SELTIMEOUT seconds. */
-    tv.tv_sec = SELTIMEOUT;
-    tv.tv_usec = 0;
-    retval = select(n+1, &rfds, NULL, NULL, &tv);
-    /* Don't rely on the value of tv now! */
-
-    if (retval < 0)
-	{
-	perror("select retval < 0");
-	return FALSE;
-	}
-    else if (retval)
-	{
-
-	verbose(2,"returned from select, retval=%d\n", retval);
-
-	for(pc = pcList; pc; pc = pc->next)
-	    {
-	    if ((pc->sd >= 0) && FD_ISSET(pc->sd, &rfds))
-		{
-
-		verbose(2,"found a descriptor with data: %d\n", pc->sd);
-
-		readCount = read(pc->sd, buf, BUFSIZE);
-
-		verbose(2,"readCount = %lld\n", (long long) readCount);
-
-		if (readCount == 0)
-		    {
-		    close(pc->sd);
-
-		    verbose(2,"closing descriptor: %d\n", pc->sd);
-		    pc->sd = -1;
-
-		    if (fileSize != -1 && pc->received != pc->partSize)	
-			{
-			pc->sd = -2;  /* conn was closed before all data was sent, can retry later */
-			return FALSE;
-			}
-		    --connOpen;
-		    ++reOpen;
-		    writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
-		    sinceLastStatus = 0;
-		    continue; 
-		    }
-		if (readCount < 0)
-		    {
-		    warn("error reading from socket for url %s", url);
-		    return FALSE;
-		    }
-
-		verbose(2,"rangeStart %llu  received %llu\n"
-			, (unsigned long long) pc->rangeStart
-			, (unsigned long long) pc->received );
-
-		verbose(2,"seeking to %llu\n", (unsigned long long) (pc->rangeStart + pc->received));
-
-		if (lseek(out, pc->rangeStart + pc->received, SEEK_SET) == -1)
-		    {
-		    perror("error seeking output file");
-		    warn("error seeking output file %s: rangeStart %llu  received %llu for url %s"
-			, outPath
-			, (unsigned long long) pc->rangeStart
-			, (unsigned long long) pc->received
-			, url);
-		    return FALSE;
-		    }
-		int writeCount = write(out, buf, readCount);
-		if (writeCount < 0)
-		    {
-		    warn("error writing output file %s", outPath);
-		    return FALSE;
-		    }
-		pc->received += readCount;
-		totalDownloaded += readCount;
-		sinceLastStatus += readCount;
-		if (sinceLastStatus >= 100*1024*1024)
-		    {
-		    writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
-		    sinceLastStatus = 0;
-		    }
-		}
-	    }
-	}
-    else
-	{
-	warn("No data within %d seconds for %s", SELTIMEOUT, url);
-	/* Retry ? */
-	if (retryCount >= numRetries)
-	    {
-    	    return FALSE;
-	    }
-	else
-	    {
-	    ++retryCount;
-	    /* close any open connections */
-	    for(pc = pcList; pc; pc = pc->next)
-		{
-		if (pc->sd >= 0) 
-		    {
-		    close(pc->sd);
-		    verbose(2,"closing descriptor: %d\n", pc->sd);
-		    }
-		if (pc->sd != -1) 
-		    pc->sd = -4;
-		}
-	    connOpen = 0;
-	    reOpen = 0;
-	    /* sleep for a while, maybe the server will recover */
-	    sleep(RETRYSLEEPTIME);
-	    }
-	}
-
-    }
-
-close(out);
-
-/* delete the status file - by passing TRUE */
-writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, TRUE); 
-
-/* restore original file datestamp mtime from last-modified header */
-struct tm tm;
-// Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
-// These strings are always GMT
-if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL)
-    {
-    warn("unable to parse last-modified string [%s]", dateString);
-    }
-else
-    {
-    time_t t;
-    // convert to UTC (GMT) time
-    t = mktimeFromUtc(&tm);
-    if (t == -1)
-	{
-	warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString);
-	}
-    else
-	{
-	// update the file mtime
-	struct utimbuf ut;
-	struct stat mystat;
-	ZeroVar(&mystat);
-	if (stat(outTemp,&mystat)==0)
-	    {
-	    ut.actime = mystat.st_atime;
-	    ut.modtime = t;
-	    if (utime(outTemp, &ut)==-1)
-		{
-		char errMsg[256];
-                safef(errMsg, sizeof(errMsg), "paraFetch: error setting modification time of %s to %s\n", outTemp, dateString);
-		perror(errMsg);
-		}
-	    }
-	}
-    }
-
-/* rename the successful download to the original name */
-rename(outTemp, origPath);
-
-
-
-if (progress)
-    {
-    while (star <= starMax)
-	{
-	printf("*");fflush(stdout);
-	++star;
-	}
-    long timeDiff = (long)(time(NULL) - startTime);
-    if (timeDiff > 0)
-	{
-	printf(" %ld seconds", timeDiff);
-	float mbpersec =  ((totalDownloaded - restartTotalDownloaded)/1000000) / timeDiff;
-	printf(" %0.1f MB/sec", mbpersec);
-	}
-    printf("\n");fflush(stdout);
-    }
-
-if (fileSize != -1 && totalDownloaded != fileSize)
-    {
-    warn("Unexpected result: Total downloaded bytes %lld is not equal to fileSize %lld"
-	, (long long) totalDownloaded
-	, (long long) fileSize);
-    return FALSE;
-    }
-return TRUE;
-}
-
-
 struct lineFile *netLineFileOpen(char *url)
 /* Return a lineFile attached to url.  This one
  * will skip any headers.   Free this with
  * lineFileClose(). */
 {
 struct lineFile *lf = netLineFileMayOpen(url);
 if (lf == NULL)
     noWarnAbort();
 return lf;
 }
 
 boolean netSendString(int sd, char *s)
 /* Send a string down a socket - length byte first. */
 {
 int length = strlen(s);