c344bba4bb978e02da57ad7be117699ee5d61255 kent Tue May 7 08:09:07 2013 -0700 Moving parallelFetch to a new module. diff --git src/lib/net.c src/lib/net.c index 7b25511..2804ac4 100644 --- src/lib/net.c +++ src/lib/net.c @@ -1,26 +1,25 @@ /* net.c some stuff to wrap around net communications. * * This file is copyright 2002 Jim Kent, but license is hereby * granted for all use - public, private or commercial. */ #include "common.h" #include <signal.h> #include <errno.h> #include <string.h> #include <sys/time.h> -#include <utime.h> #include <pthread.h> #include "internet.h" #include "errabort.h" #include "hash.h" #include "net.h" #include "linefile.h" #include "base64.h" #include "cheapcgi.h" #include "https.h" #include "sqlNum.h" #include "obscure.h" /* Brought errno in to get more useful error messages */ extern int errno; @@ -1665,676 +1664,30 @@ } char *netReadTextFileIfExists(char *url) /* Read entire URL and return it as a string. URL should be text (embedded zeros will be * interpreted as end of string). If the url doesn't exist or has other problems, * returns NULL. */ { struct lineFile *lf = netLineFileSilentOpen(url); if (lf == NULL) return NULL; char *text = lineFileReadAll(lf); lineFileClose(&lf); return text; } - -struct parallelConn -/* struct to information on a parallel connection */ - { - struct parallelConn *next; /* next connection */ - int sd; /* socket descriptor */ - off_t rangeStart; /* where does the range start */ - off_t partSize; /* range size */ - off_t received; /* bytes received */ - }; - -void writeParaFetchStatus(char *origPath, struct parallelConn *pcList, char *url, off_t fileSize, char *dateString, boolean isFinal) -/* Write a status file. - * This has two purposes. - * First, we can use it to resume a failed transfer. - * Second, we can use it to follow progress */ -{ -char outTempX[1024]; -char outTemp[1024]; -safef(outTempX, sizeof(outTempX), "%s.paraFetchStatusX", origPath); -safef(outTemp, sizeof(outTemp), "%s.paraFetchStatus", origPath); -struct parallelConn *pc = NULL; - -FILE *f = mustOpen(outTempX, "w"); -int part = 0; -fprintf(f, "%s\n", url); -fprintf(f, "%lld\n", (long long)fileSize); -fprintf(f, "%s\n", dateString); -for(pc = pcList; pc; pc = pc->next) - { - fprintf(f, "part%d %lld %lld %lld\n", part - , (long long)pc->rangeStart - , (long long)pc->partSize - , (long long)pc->received); - ++part; - } - -carefulClose(&f); - -/* rename the successful status to the original name */ -rename(outTempX, outTemp); - -if (isFinal) /* We are done and just looking to get rid of the file. */ - unlink(outTemp); -} - - -boolean readParaFetchStatus(char *origPath, - struct parallelConn **pPcList, char **pUrl, off_t *pFileSize, char **pDateString, off_t *pTotalDownloaded) -/* Write a status file. - * This has two purposes. - * First, we can use it to resume a failed transfer. - * Second, we can use it to follow progress */ -{ -char outTemp[1024]; -char outStat[1024]; -safef(outStat, sizeof(outStat), "%s.paraFetchStatus", origPath); -safef(outTemp, sizeof(outTemp), "%s.paraFetch", origPath); -struct parallelConn *pcList = NULL, *pc = NULL; -off_t totalDownloaded = 0; - -if (!fileExists(outStat)) - { - unlink(outTemp); - return FALSE; - } - -if (!fileExists(outTemp)) - { - unlink(outStat); - return FALSE; - } - -char *line, *word; -struct lineFile *lf = lineFileOpen(outStat, TRUE); -if (!lineFileNext(lf, &line, NULL)) - { - unlink(outTemp); - unlink(outStat); - return FALSE; - } -char *url = cloneString(line); -if (!lineFileNext(lf, &line, NULL)) - { - unlink(outTemp); - unlink(outStat); - return FALSE; - } -off_t fileSize = sqlLongLong(line); -if (!lineFileNext(lf, &line, NULL)) - { - unlink(outTemp); - unlink(outStat); - return FALSE; - } -char *dateString = cloneString(line); -while (lineFileNext(lf, &line, NULL)) - { - word = nextWord(&line); - AllocVar(pc); - pc->next = NULL; - pc->sd = -4; /* no connection tried yet */ - word = nextWord(&line); - pc->rangeStart = sqlLongLong(word); - word = nextWord(&line); - pc->partSize = sqlLongLong(word); - word = nextWord(&line); - pc->received = sqlLongLong(word); - if (pc->received == pc->partSize) - pc->sd = -1; /* part all done already */ - totalDownloaded += pc->received; - slAddHead(&pcList, pc); - } -slReverse(&pcList); - -lineFileClose(&lf); - -if (slCount(pcList) < 1) - { - unlink(outTemp); - unlink(outStat); - return FALSE; - } - -*pPcList = pcList; -*pUrl = url; -*pFileSize = fileSize; -*pDateString = dateString; -*pTotalDownloaded = totalDownloaded; - -return TRUE; - -} - - -boolean parallelFetch(char *url, char *outPath, int numConnections, int numRetries, boolean newer, boolean progress) -/* Open multiple parallel connections to URL to speed downloading */ -{ -char *origPath = outPath; -char outTemp[1024]; -safef(outTemp, sizeof(outTemp), "%s.paraFetch", outPath); -outPath = outTemp; -/* get the size of the file to be downloaded */ -off_t fileSize = 0; -off_t totalDownloaded = 0; -ssize_t sinceLastStatus = 0; -char *dateString = ""; -int star = 1; -int starMax = 20; -int starStep = 1; -// TODO handle case-sensitivity of protocols input -if (startsWith("http://",url) || startsWith("https://",url)) - { - struct hash *hash = newHash(0); - int status = netUrlHead(url, hash); - if (status != 200) // && status != 302 && status != 301) - { - warn("Error code: %d, expected 200 for %s, can't proceed, sorry", status, url); - return FALSE; - } - char *sizeString = hashFindValUpperCase(hash, "Content-Length:"); - if (sizeString) - { - fileSize = atoll(sizeString); - } - else - { - warn("No Content-Length: returned in header for %s, must limit to a single connection, will not know if data is complete", url); - numConnections = 1; - fileSize = -1; - } - char *ds = hashFindValUpperCase(hash, "Last-Modified:"); - if (ds) - dateString = cloneString(ds); - hashFree(&hash); - } -else if (startsWith("ftp://",url)) - { - long long size = 0; - time_t t; - boolean ok = netGetFtpInfo(url, &size, &t); - if (!ok) - { - warn("Unable to get size info from FTP for %s, can't proceed, sorry", url); - return FALSE; - } - fileSize = size; - - struct tm *ts; - char ftpTime[80]; - - /* Format the time "Tue, 15 Jun 2010 06:45:08 GMT" */ - ts = localtime(&t); - strftime(ftpTime, sizeof(ftpTime), "%a, %d %b %Y %H:%M:%S %Z", ts); - dateString = cloneString(ftpTime); - - } -else - { - warn("unrecognized protocol: %s", url); - return FALSE; - } - - -verbose(2,"fileSize=%lld\n", (long long) fileSize); - -if (fileSize < 65536) /* special case small file */ - numConnections = 1; - -if (numConnections > 50) /* ignore high values for numConnections */ - { - warn("Currently maximum number of connections is 50. You requested %d. Will proceed with 50 on %s", numConnections, url); - numConnections = 50; - } - -verbose(2,"numConnections=%d\n", numConnections); //debug - -if (numConnections < 1) - { - warn("number of connections must be greater than 0 for %s, can't proceed, sorry", url); - return FALSE; - } - -if (numRetries < 0) - numRetries = 0; - -/* what is the size of each part */ -off_t partSize = (fileSize + numConnections -1) / numConnections; -if (fileSize == -1) - partSize = -1; -off_t base = 0; -int c; - -verbose(2,"partSize=%lld\n", (long long) partSize); //debug - - -/* n is the highest-numbered descriptor */ -int n = 0; -int connOpen = 0; -int reOpen = 0; - - -struct parallelConn *restartPcList = NULL; -char *restartUrl = NULL; -off_t restartFileSize = 0; -char *restartDateString = ""; -off_t restartTotalDownloaded = 0; -boolean restartable = readParaFetchStatus(origPath, &restartPcList, &restartUrl, &restartFileSize, &restartDateString, &restartTotalDownloaded); -if (fileSize == -1) - restartable = FALSE; - -struct parallelConn *pcList = NULL, *pc; - -if (restartable - && sameString(url, restartUrl) - && fileSize == restartFileSize - && sameString(dateString, restartDateString)) - { - pcList = restartPcList; - totalDownloaded = restartTotalDownloaded; - } -else - { - - if (newer) // only download it if it is newer than what we already have - { - /* datestamp mtime from last-modified header */ - struct tm tm; - // Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT - // These strings are always GMT - if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL) - { - warn("unable to parse last-modified string [%s]", dateString); - } - else - { - time_t t; - // convert to UTC (GMT) time - t = mktimeFromUtc(&tm); - if (t == -1) - { - warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString); - } - else - { - // get the file mtime - struct stat mystat; - ZeroVar(&mystat); - if (stat(origPath,&mystat)==0) - { - if (t <= mystat.st_mtime) - { - verbose(2,"Since nothing newer was found, skipping %s\n", origPath); - verbose(3,"t from last-modified = %ld; st_mtime = %ld\n", (long) t, (long)mystat.st_mtime); - return TRUE; - } - } - } - } - } - - /* make a list of connections */ - for (c = 0; c < numConnections; ++c) - { - AllocVar(pc); - pc->next = NULL; - pc->rangeStart = base; - base += partSize; - pc->partSize = partSize; - if (fileSize != -1 && pc->rangeStart+pc->partSize >= fileSize) - pc->partSize = fileSize - pc->rangeStart; - pc->received = 0; - pc->sd = -4; /* no connection tried yet */ - slAddHead(&pcList, pc); - } - slReverse(&pcList); - } - -if (progress) - { - char nicenumber[1024]=""; - sprintWithGreekByte(nicenumber, sizeof(nicenumber), fileSize); - printf("downloading %s ", nicenumber); fflush(stdout); - starStep = fileSize/starMax; - if (starStep < 1) - starStep = 1; - } - -int out = open(outPath, O_CREAT|O_WRONLY, 0664); -if (out < 0) - { - warn("Unable to open %s for write while downloading %s, can't proceed, sorry", url, outPath); - return FALSE; - } - - -/* descriptors for select() */ -fd_set rfds; -struct timeval tv; -int retval; - -ssize_t readCount = 0; -#define BUFSIZE 65536 * 4 -char buf[BUFSIZE]; - -/* create paraFetchStatus right away for monitoring programs */ -writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE); -sinceLastStatus = 0; - -int retryCount = 0; - -time_t startTime = time(NULL); - -#define SELTIMEOUT 5 -#define RETRYSLEEPTIME 30 -while (TRUE) - { - - verbose(2,"Top of big loop\n"); - - if (progress) - { - while (totalDownloaded >= star * starStep) - { - printf("*");fflush(stdout); - ++star; - } - } - - /* are we done? */ - if (connOpen == 0) - { - boolean done = TRUE; - for(pc = pcList; pc; pc = pc->next) - if (pc->sd != -1) - done = FALSE; - if (done) break; - } - - /* See if we need to open any connections, either new or retries */ - for(pc = pcList; pc; pc = pc->next) - { - if ((pc->sd == -4) /* never even tried to open yet */ - || ((reOpen>0) /* some connections have been released */ - && (pc->sd == -2 /* started but not finished */ - || pc->sd == -3))) /* not even started */ - { - char urlExt[1024]; - safef(urlExt, sizeof(urlExt), "%s;byterange=%llu-%llu" - , url - , (unsigned long long) (pc->rangeStart + pc->received) - , (unsigned long long) (pc->rangeStart + pc->partSize - 1) ); - - - int oldSd = pc->sd; /* in case we need to remember where we were */ - if (oldSd != -4) /* decrement whether we succeed or not */ - --reOpen; - if (oldSd == -4) - oldSd = -3; /* ok this one just changes */ - if (fileSize == -1) - { - verbose(2,"opening url %s\n", url); - pc->sd = netUrlOpen(url); - } - else - { - verbose(2,"opening url %s\n", urlExt); - pc->sd = netUrlOpen(urlExt); - } - if (pc->sd < 0) - { - pc->sd = oldSd; /* failed to open, can retry later */ - } - else - { - char *newUrl = NULL; - int newSd = 0; - if (startsWith("http://",url) || startsWith("https://",url)) - { - if (!netSkipHttpHeaderLinesHandlingRedirect(pc->sd, urlExt, &newSd, &newUrl)) - { - warn("Error processing http response for %s", url); - return FALSE; - } - if (newUrl) - { - /* Update sd with newSd, replace it with newUrl, etc. */ - pc->sd = newSd; - } - } - ++connOpen; - } - } - } - - - if (connOpen == 0) - { - warn("Unable to open any connections to download %s, can't proceed, sorry", url); - return FALSE; - } - - - FD_ZERO(&rfds); - n = 0; - for(pc = pcList; pc; pc = pc->next) - { - if (pc->sd >= 0) - { - FD_SET(pc->sd, &rfds); /* reset descriptor in readfds for select() */ - if (pc->sd > n) - n = pc->sd; - } - } - - - /* Wait up to SELTIMEOUT seconds. */ - tv.tv_sec = SELTIMEOUT; - tv.tv_usec = 0; - retval = select(n+1, &rfds, NULL, NULL, &tv); - /* Don't rely on the value of tv now! */ - - if (retval < 0) - { - perror("select retval < 0"); - return FALSE; - } - else if (retval) - { - - verbose(2,"returned from select, retval=%d\n", retval); - - for(pc = pcList; pc; pc = pc->next) - { - if ((pc->sd >= 0) && FD_ISSET(pc->sd, &rfds)) - { - - verbose(2,"found a descriptor with data: %d\n", pc->sd); - - readCount = read(pc->sd, buf, BUFSIZE); - - verbose(2,"readCount = %lld\n", (long long) readCount); - - if (readCount == 0) - { - close(pc->sd); - - verbose(2,"closing descriptor: %d\n", pc->sd); - pc->sd = -1; - - if (fileSize != -1 && pc->received != pc->partSize) - { - pc->sd = -2; /* conn was closed before all data was sent, can retry later */ - return FALSE; - } - --connOpen; - ++reOpen; - writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE); - sinceLastStatus = 0; - continue; - } - if (readCount < 0) - { - warn("error reading from socket for url %s", url); - return FALSE; - } - - verbose(2,"rangeStart %llu received %llu\n" - , (unsigned long long) pc->rangeStart - , (unsigned long long) pc->received ); - - verbose(2,"seeking to %llu\n", (unsigned long long) (pc->rangeStart + pc->received)); - - if (lseek(out, pc->rangeStart + pc->received, SEEK_SET) == -1) - { - perror("error seeking output file"); - warn("error seeking output file %s: rangeStart %llu received %llu for url %s" - , outPath - , (unsigned long long) pc->rangeStart - , (unsigned long long) pc->received - , url); - return FALSE; - } - int writeCount = write(out, buf, readCount); - if (writeCount < 0) - { - warn("error writing output file %s", outPath); - return FALSE; - } - pc->received += readCount; - totalDownloaded += readCount; - sinceLastStatus += readCount; - if (sinceLastStatus >= 100*1024*1024) - { - writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE); - sinceLastStatus = 0; - } - } - } - } - else - { - warn("No data within %d seconds for %s", SELTIMEOUT, url); - /* Retry ? */ - if (retryCount >= numRetries) - { - return FALSE; - } - else - { - ++retryCount; - /* close any open connections */ - for(pc = pcList; pc; pc = pc->next) - { - if (pc->sd >= 0) - { - close(pc->sd); - verbose(2,"closing descriptor: %d\n", pc->sd); - } - if (pc->sd != -1) - pc->sd = -4; - } - connOpen = 0; - reOpen = 0; - /* sleep for a while, maybe the server will recover */ - sleep(RETRYSLEEPTIME); - } - } - - } - -close(out); - -/* delete the status file - by passing TRUE */ -writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, TRUE); - -/* restore original file datestamp mtime from last-modified header */ -struct tm tm; -// Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT -// These strings are always GMT -if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL) - { - warn("unable to parse last-modified string [%s]", dateString); - } -else - { - time_t t; - // convert to UTC (GMT) time - t = mktimeFromUtc(&tm); - if (t == -1) - { - warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString); - } - else - { - // update the file mtime - struct utimbuf ut; - struct stat mystat; - ZeroVar(&mystat); - if (stat(outTemp,&mystat)==0) - { - ut.actime = mystat.st_atime; - ut.modtime = t; - if (utime(outTemp, &ut)==-1) - { - char errMsg[256]; - safef(errMsg, sizeof(errMsg), "paraFetch: error setting modification time of %s to %s\n", outTemp, dateString); - perror(errMsg); - } - } - } - } - -/* rename the successful download to the original name */ -rename(outTemp, origPath); - - - -if (progress) - { - while (star <= starMax) - { - printf("*");fflush(stdout); - ++star; - } - long timeDiff = (long)(time(NULL) - startTime); - if (timeDiff > 0) - { - printf(" %ld seconds", timeDiff); - float mbpersec = ((totalDownloaded - restartTotalDownloaded)/1000000) / timeDiff; - printf(" %0.1f MB/sec", mbpersec); - } - printf("\n");fflush(stdout); - } - -if (fileSize != -1 && totalDownloaded != fileSize) - { - warn("Unexpected result: Total downloaded bytes %lld is not equal to fileSize %lld" - , (long long) totalDownloaded - , (long long) fileSize); - return FALSE; - } -return TRUE; -} - - struct lineFile *netLineFileOpen(char *url) /* Return a lineFile attached to url. This one * will skip any headers. Free this with * lineFileClose(). */ { struct lineFile *lf = netLineFileMayOpen(url); if (lf == NULL) noWarnAbort(); return lf; } boolean netSendString(int sd, char *s) /* Send a string down a socket - length byte first. */ { int length = strlen(s);