1438fb82056f2d3626472057bef4502438b3f985
galt
  Thu Jun 24 04:06:38 2010 -0700
initial paraFetch work - the fast multiple connection parallel downloader
diff --git src/lib/net.c src/lib/net.c
index d30587e..cff8122 100644
--- src/lib/net.c
+++ src/lib/net.c
@@ -7,6 +7,7 @@
 #include <signal.h>
 #include <errno.h>
 #include <string.h>
+#include <sys/time.h>
 #include "internet.h"
 #include "errabort.h"
 #include "hash.h"
@@ -1261,6 +1262,303 @@
 }
 
 
+struct parallelConn
+/* State of one byte-range connection used by parallelFetch(). */
+    {
+    struct parallelConn *next;  /* next connection */
+    char *url;                  /* full url including byterange; owned, freed with the list */
+    int sd;                     /* socket descriptor, -1 when not open */
+    off_t rangeStart;           /* where does the range start */
+    off_t partSize;             /* range size */
+    off_t received;             /* bytes received */
+    };
+
+static void parallelConnFreeList(struct parallelConn **pList)
+/* Close every still-open socket in *pList, free each connection and its
+ * url, and set *pList to NULL. */
+{
+struct parallelConn *pc, *next;
+for (pc = *pList; pc != NULL; pc = next)
+    {
+    next = pc->next;
+    if (pc->sd >= 0)
+	close(pc->sd);
+    freeMem(pc->url);
+    freeMem(pc);
+    }
+*pList = NULL;
+}
+
+static boolean parallelFetchFileSize(char *url, off_t *retSize)
+/* Put the size of the file at url in *retSize: the Content-Length of an
+ * HTTP(S) HEAD request, otherwise the size netGetFtpInfo() reports.
+ * Warn and return FALSE if the size can't be determined. */
+{
+if (startsWith("http://",url) || startsWith("https://",url))
+    {
+    struct hash *hash = newHash(0);
+    int status = netUrlHead(url, hash);
+    if (status != 200)  /* 301/302 responses are rejected too */
+	{
+	hashFree(&hash);
+	warn("Error code: %d, expected 200 for %s, can't proceed, sorry", status, url);
+	return FALSE;
+	}
+    char *sizeString = hashFindValUpperCase(hash, "Content-Length:");
+    if (sizeString == NULL)
+	{
+	hashFree(&hash);
+	warn("No Content-Length: returned in header for %s, can't proceed, sorry", url);
+	return FALSE;
+	}
+    *retSize = atoll(sizeString);
+    hashFree(&hash);
+    }
+else
+    {
+    long long size = 0;
+    time_t t;
+    if (!netGetFtpInfo(url, &size, &t))
+	{
+	warn("Unable to get size info from FTP for %s, can't proceed, sorry", url);
+	return FALSE;
+	}
+    *retSize = size;
+    }
+if (*retSize < 0)
+    {
+    warn("Invalid size %lld reported for %s, can't proceed, sorry", (long long) *retSize, url);
+    return FALSE;
+    }
+return TRUE;
+}
+
+static boolean parallelFetchWriteAll(int fd, char *buf, size_t size)
+/* Write all size bytes of buf to fd, retrying after short writes and EINTR.
+ * Return FALSE on any other write failure. */
+{
+while (size > 0)
+    {
+    ssize_t written = write(fd, buf, size);
+    if (written < 0 && errno == EINTR)
+	continue;
+    if (written <= 0)
+	return FALSE;
+    buf += written;
+    size -= written;
+    }
+return TRUE;
+}
+
+boolean parallelFetch(char *url, int numConnections, char *outPath)
+/* Open multiple parallel connections to URL to speed downloading.
+ * The file size comes from HTTP(S) Content-Length or FTP info; the file is
+ * then split into at most 50 byte ranges (one range for files under 65536
+ * bytes), each fetched over its own connection and written at its offset in
+ * outPath.  Returns TRUE on success.  On failure warns, closes every
+ * connection and returns FALSE; outPath may then be left partially written. */
+{
+enum { bufSize = 65536 * 4,   /* bytes per read() */
+       selTimeout = 5 };      /* seconds to wait for any data before giving up */
+boolean ok = FALSE;
+struct parallelConn *pcList = NULL, *pc;
+int connOpen = 0;
+int out = -1;
+char *buf = NULL;
+off_t fileSize = 0, partSize, base;
+
+if (!parallelFetchFileSize(url, &fileSize))
+    return FALSE;
+verbose(2,"debug fileSize=%llu\n", (unsigned long long) fileSize);
+
+if (fileSize < 65536)    /* special case small file */
+    numConnections = 1;
+if (numConnections > 50)    /* ignore high values for numConnections */
+    numConnections = 50;
+if (numConnections < 1)     /* also guards the division below */
+    numConnections = 1;
+verbose(2,"debug numConnections=%d\n", numConnections);
+
+/* what is the size of each part; rounded up so the parts cover the file */
+partSize = (fileSize + numConnections - 1) / numConnections;
+verbose(2,"debug partSize=%llu\n", (unsigned long long) partSize);
+
+/* Make a list of connections and open them, one per range.  The loop stops
+ * at end of file, so an empty file opens no connections at all. */
+for (base = 0; base < fileSize; base += partSize)
+    {
+    AllocVar(pc);
+    slAddHead(&pcList, pc);   /* listed at once so cleanup always frees it */
+    pc->sd = -1;
+    pc->rangeStart = base;
+    pc->partSize = partSize;
+    if (pc->rangeStart + pc->partSize > fileSize)
+	pc->partSize = fileSize - pc->rangeStart;
+    pc->received = 0;
+
+    /* heap url so long urls are not limited by a fixed buffer; 64 extra
+     * bytes hold ";byterange=" and two 64-bit numbers */
+    size_t urlSize = strlen(url) + 64;
+    pc->url = needMem(urlSize);
+    safef(pc->url, urlSize, "%s;byterange=%llu-%llu"
+    , url
+    , (unsigned long long) pc->rangeStart
+    , (unsigned long long) (pc->rangeStart + pc->partSize - 1));
+    verbose(2,"debug opening url %s\n", pc->url);
+
+    pc->sd = netUrlOpen(pc->url);
+    if (pc->sd < 0)
+	{
+	warn("Couldn't open %s", url);
+	goto cleanup;
+	}
+    if (startsWith("http://",url) || startsWith("https://",url))
+	{
+	char *newUrl = NULL;
+	int newSd = 0;
+	/* Pass the byterange url, i.e. the request actually sent on this socket. */
+	if (!netSkipHttpHeaderLinesHandlingRedirect(pc->sd, pc->url, &newSd, &newUrl))
+	    {
+	    /* NOTE(review): not closed here in case the helper already closed
+	     * it on failure (avoids a double close) - confirm. */
+	    pc->sd = -1;
+	    warn("Error processing http response for %s", url);
+	    goto cleanup;
+	    }
+	if (newUrl)
+	    {
+	    /* Redirected: read from the new socket.  Redirects with byterange may
+	     * not work; the received-size checks below catch a mismatch. */
+	    pc->sd = newSd;
+	    freeMem(newUrl);
+	    }
+	}
+    if (pc->sd >= FD_SETSIZE)    /* FD_SET() on such a descriptor is undefined */
+	{
+	warn("Socket descriptor %d too large for select() while downloading %s", pc->sd, url);
+	goto cleanup;
+	}
+    ++connOpen;
+    }
+slReverse(&pcList);
+
+/* O_TRUNC so a longer pre-existing file keeps no stale trailing bytes */
+out = open(outPath, O_CREAT|O_WRONLY|O_TRUNC, 0664);
+if (out < 0)
+    {
+    warn("Unable to open %s for write while downloading %s, can't proceed, sorry", outPath, url);
+    goto cleanup;
+    }
+verbose(2,"debug: connOpen = %d\n", connOpen);
+
+/* heap rather than stack: the buffer is 256k */
+buf = needMem(bufSize);
+while (connOpen > 0)
+    {
+    fd_set rfds;        /* descriptors for select() */
+    struct timeval tv;
+    int n = 0;          /* highest-numbered open descriptor */
+    FD_ZERO(&rfds);
+    for (pc = pcList; pc != NULL; pc = pc->next)
+	{
+	if (pc->sd != -1)
+	    {
+	    FD_SET(pc->sd, &rfds);
+	    if (pc->sd > n)
+		n = pc->sd;
+	    }
+	}
+
+    /* select() may modify tv, so reset the timeout on every pass */
+    tv.tv_sec = selTimeout;
+    tv.tv_usec = 0;
+    int retval = select(n+1, &rfds, NULL, NULL, &tv);
+    if (retval < 0)
+	{
+	if (errno == EINTR)
+	    continue;    /* interrupted by a signal, just wait again */
+	warn("select failed while downloading %s: %s", url, strerror(errno));
+	goto cleanup;
+	}
+    if (retval == 0)
+	{
+	warn("No data within %d seconds for %s", selTimeout, url);
+	goto cleanup;
+	}
+    verbose(2,"returned from select, retval=%d\n", retval);
+
+    for (pc = pcList; pc != NULL; pc = pc->next)
+	{
+	if (pc->sd == -1 || !FD_ISSET(pc->sd, &rfds))
+	    continue;
+	verbose(2,"debug found a descriptor with data: %d\n", pc->sd);
+	ssize_t readCount = read(pc->sd, buf, bufSize);
+	verbose(2,"debug readCount = %lld\n", (long long) readCount);
+	if (readCount < 0)
+	    {
+	    if (errno == EINTR)
+		continue;    /* select() will report this socket again */
+	    warn("error reading from socket for url %s", url);
+	    goto cleanup;
+	    }
+	if (readCount == 0)    /* end of this range's stream */
+	    {
+	    verbose(2,"debug closing descriptor: %d\n", pc->sd);
+	    close(pc->sd);
+	    pc->sd = -1;
+	    --connOpen;
+	    if (pc->received != pc->partSize)
+		{
+		warn("error expected to read %llu, actually read %llu bytes for url %s"
+		    , (unsigned long long) pc->partSize
+		    , (unsigned long long) pc->received
+		    , url);
+		goto cleanup;
+		}
+	    continue;
+	    }
+	if (readCount > pc->partSize - pc->received)
+	    {
+	    /* Writing on would overwrite the next range, e.g. when the server
+	     * ignored the byterange and is sending the whole file. */
+	    warn("error received more than the expected %llu bytes for url %s"
+		, (unsigned long long) pc->partSize
+		, url);
+	    goto cleanup;
+	    }
+	verbose(2,"debug seeking to %llu\n", (unsigned long long) (pc->rangeStart + pc->received));
+	if (lseek(out, pc->rangeStart + pc->received, SEEK_SET) == -1)
+	    {
+	    warn("error seeking output file %s: rangeStart %llu  received %llu for url %s: %s"
+		, outPath
+		, (unsigned long long) pc->rangeStart
+		, (unsigned long long) pc->received
+		, url
+		, strerror(errno));
+	    goto cleanup;
+	    }
+	if (!parallelFetchWriteAll(out, buf, readCount))
+	    {
+	    warn("error writing output file %s", outPath);
+	    goto cleanup;
+	    }
+	pc->received += readCount;
+	}
+    }
+ok = TRUE;
+
+cleanup:
+freeMem(buf);
+parallelConnFreeList(&pcList);
+if (out >= 0 && close(out) != 0 && ok)
+    {
+    warn("error closing output file %s: %s", outPath, strerror(errno));
+    ok = FALSE;
+    }
+return ok;
+}
+
+
 struct lineFile *netLineFileOpen(char *url)
 /* Return a lineFile attached to url.  This one
  * will skip any headers.   Free this with