1438fb82056f2d3626472057bef4502438b3f985 galt Thu Jun 24 04:06:38 2010 -0700 initial paraFetch work - the fast multiple connection parallel downloader diff --git src/lib/net.c src/lib/net.c index d30587e..cff8122 100644 --- src/lib/net.c +++ src/lib/net.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "internet.h" #include "errabort.h" #include "hash.h" @@ -1261,6 +1262,254 @@ } +struct parallelConn +/* struct to information on a parallel connection */ + { + struct parallelConn *next; /* next connection */ + char *url; /* full url including byterange */ + int sd; /* socket descriptor */ + off_t rangeStart; /* where does the range start */ + off_t partSize; /* range size */ + off_t received; /* bytes received */ + }; + +boolean parallelFetch(char *url, int numConnections, char *outPath) +/* Open multiple parallel connections to URL to speed downloading */ +{ +/* get the size of the file to be downloaded */ +off_t fileSize = 0; +if (startsWith("http://",url) || startsWith("https://",url)) + { + struct hash *hash = newHash(0); + int status = netUrlHead(url, hash); + if (status != 200) // && status != 302 && status != 301) + { + warn("Error code: %d, expected 200 for %s, can't proceed, sorry", status, url); + return FALSE; + } + char *sizeString = hashFindValUpperCase(hash, "Content-Length:"); + if (sizeString == NULL) + { + hashFree(&hash); + warn("No Content-Length: returned in header for %s, can't proceed, sorry", url); + return FALSE; + } + fileSize = atoll(sizeString); + hashFree(&hash); + } +else + { + long long size = 0; + time_t t; + boolean ok = netGetFtpInfo(url, &size, &t); + if (!ok) + { + warn("Unable to get size info from FTP for %s, can't proceed, sorry", url); + return FALSE; + } + fileSize = size; + } + +verbose(2,"debug fileSize=%llu\n", (unsigned long long) fileSize); //debug + +if (fileSize < 65536) /* special case small file */ + numConnections = 1; + +if (numConnections > 50) /* ignore high values for numConnections */ + numConnections = 50; + +verbose(2,"debug numConnections=%d\n", numConnections); //debug + +/* what is the size of each part */ +off_t partSize = (fileSize + numConnections -1) / numConnections; +off_t base = 0; +int c; + +verbose(2,"debug partSize=%llu\n", (unsigned long long) partSize); //debug + + + +/* n is the highest-numbered descriptor */ +int n = 0; +int connOpen = 0; + +/* make a list of connections and open them */ +struct parallelConn *pcList = NULL, *pc; +for (c = 0; c < numConnections; ++c) + { + AllocVar(pc); + pc->next = NULL; + pc->rangeStart = base; + base += partSize; + pc->partSize = partSize; + if (pc->rangeStart+pc->partSize >= fileSize) + pc->partSize = fileSize - pc->rangeStart; + pc->received = 0; + char urlExt[1024]; + safef(urlExt, sizeof(urlExt), "%s;byterange=%llu-%llu" + , url + , (unsigned long long) pc->rangeStart + , (unsigned long long) pc->rangeStart + pc->partSize - 1 ); + + verbose(2,"debug opening url %s\n", urlExt); //debug + + pc->sd = netUrlOpen(urlExt); + if (pc->sd < 0) + { + warn("Couldn't open %s", url); + return FALSE; + } + char *newUrl = NULL; + int newSd = 0; + if (startsWith("http://",url) || startsWith("https://",url)) + { + if (!netSkipHttpHeaderLinesHandlingRedirect(pc->sd, url, &newSd, &newUrl)) + { + warn("Error processing http response for %s", url); + return FALSE; + } + if (newUrl) + { + /* Update sd with newSd, replace it with newUrl, etc. */ + pc->sd = newSd; + freeMem(newUrl); /* redirects with byterange do not work anyway */ + } + } + if (pc->sd > n) + n = pc->sd; + ++connOpen; + slAddHead(&pcList, pc); + } +slReverse(&pcList); + +int out = open(outPath, O_CREAT|O_WRONLY, 0664); +if (out < 0) + { + warn("Unable to open %s for write while downloading %s, can't proceed, sorry", url, outPath); + return FALSE; + } + + +/* descriptors for select() */ +fd_set rfds; +struct timeval tv; +int retval; + +verbose(2,"debug: connOpen = %d\n", connOpen); //debug +verbose(2,"debug: n+1 = %d (select)\n", n+1); //debug + +ssize_t readCount = 0; +#define BUFSIZE 65536 * 4 +char buf[BUFSIZE]; + +#define SELTIMEOUT 5 +while (connOpen > 0) + { + + FD_ZERO(&rfds); + n = 0; + for(pc = pcList; pc; pc = pc->next) + { + if (pc->sd != -1) + { + FD_SET(pc->sd, &rfds); /* reset descriptor in readfds for select() */ + if (pc->sd > n) + n = pc->sd; + } + } + + + /* Wait up to SELTIMEOUT seconds. */ + tv.tv_sec = SELTIMEOUT; + tv.tv_usec = 0; + retval = select(n+1, &rfds, NULL, NULL, &tv); + /* Don't rely on the value of tv now! */ + + if (retval < 0) + { + perror("select retval < 0"); + return FALSE; + } + else if (retval) + { + + verbose(2,"returned from select, retval=%d\n", retval); //debug + + for(pc = pcList; pc; pc = pc->next) + { + if ((pc->sd != -1) && FD_ISSET(pc->sd, &rfds)) + { + + verbose(2,"debug found a descriptor with data: %d\n", pc->sd); //debug + + readCount = read(pc->sd, buf, BUFSIZE); + + verbose(2,"debug readCount = %lld\n", (long long) readCount); //debug + + if (readCount == 0) + { + close(pc->sd); + + verbose(2,"debug closing descriptor: %d\n", pc->sd); //debug + pc->sd = -1; + + if (pc->received != pc->partSize) + { + warn("error expected to read %llu, actually read %llu bytes for url %s" + , (unsigned long long) pc->partSize + , (unsigned long long) pc->received + , url); + return FALSE; + } + --connOpen; + continue; // DEBUG DOES THIS HELP? + } + if (readCount < 0) + { + warn("error reading from socket for url %s", url); + return FALSE; + } + + verbose(2,"rangeStart %llu received %llu\n" + , (unsigned long long) pc->rangeStart + , (unsigned long long) pc->received ); + + verbose(2,"debug seeking to %llu\n", (unsigned long long) pc->rangeStart + pc->received); //debug + + if (lseek(out, pc->rangeStart + pc->received, SEEK_SET) == -1) + { + perror("error seeking output file"); + warn("error seeking output file %s: rangeStart %llu received %llu for url %s" + , outPath + , (unsigned long long) pc->rangeStart + , (unsigned long long) pc->received + , url); + return FALSE; + } + int writeCount = write(out, buf, readCount); + if (writeCount < 0) + { + warn("error writing output file %s", outPath); + return FALSE; + } + pc->received += readCount; + } + } + } + else + { + warn("No data within %d seconds for %s", SELTIMEOUT, url); + return FALSE; + } + + } + +close(out); + +return TRUE; +} + + struct lineFile *netLineFileOpen(char *url) /* Return a lineFile attached to url. This one * will skip any headers. Free this with