/* Commit c344bba4 (kent, Tue May 7 08:09:07 2013 -0700):
 * Moving parallelFetch to a new module (src/lib/paraFetch.c). */

/* paraFetch - fetch things from remote URLs in parallel. */

#include <utime.h>
#include "common.h"
#include "internet.h"
#include "errabort.h"
#include "hash.h"
#include "linefile.h"
#include "net.h"
#include "https.h"
#include "sqlNum.h"
#include "obscure.h"


struct parallelConn
/* State for one parallel download connection, covering one byte-range (part)
 * of the target file. */
    {
    struct parallelConn *next;	/* next connection */
    int sd;	/* socket descriptor; non-negative when open, otherwise a
		 * state code: -1 part done, -2 started but not finished,
		 * -3 not even started, -4 no connection tried yet */
    off_t rangeStart;	/* where does the range start */
    off_t partSize;	/* range size */
    off_t received;	/* bytes received */
    };

void writeParaFetchStatus(char *origPath, struct parallelConn *pcList, char *url, off_t fileSize, char *dateString, boolean isFinal)
/* Write a status file recording the url, expected size, remote date, and
 * per-part progress of a download of origPath.
 * This has two purposes.
 * First, we can use it to resume a failed transfer.
 * Second, we can use it to follow progress.
 * If isFinal is set the status file is removed instead (download complete). */
{
char outTempX[1024];
char outTemp[1024];
safef(outTempX, sizeof(outTempX), "%s.paraFetchStatusX", origPath);
safef(outTemp, sizeof(outTemp), "%s.paraFetchStatus", origPath);
struct parallelConn *pc = NULL;

/* Write to a scratch name (.paraFetchStatusX) first, then rename into place
 * below, so monitoring readers never see a half-written status file. */
FILE *f = mustOpen(outTempX, "w");
int part = 0;
fprintf(f, "%s\n", url);
fprintf(f, "%lld\n", (long long)fileSize);
fprintf(f, "%s\n", dateString);
for(pc = pcList; pc; pc = pc->next)
    {
    /* one line per part: label, start offset, part size, bytes received */
    fprintf(f, "part%d %lld %lld %lld\n", part
	, (long long)pc->rangeStart
	, (long long)pc->partSize
	, (long long)pc->received);
    ++part;
    }

carefulClose(&f);

/* rename the successful status to the original name */
rename(outTempX, outTemp);

if (isFinal)  /* We are done and just looking to get rid of the file. */
    unlink(outTemp);
}


boolean readParaFetchStatus(char *origPath,
    struct parallelConn **pPcList, char **pUrl, off_t *pFileSize, char **pDateString, off_t *pTotalDownloaded)
/* Read the status file left by a previous transfer of origPath, if one exists
 * and is consistent, returning its contents through the output parameters so
 * the transfer can be resumed.  Returns FALSE (and cleans up any stale
 * status/temp files) when there is nothing usable to resume.
 * On TRUE, *pPcList holds one parallelConn per part with sd set to -4
 * (not tried) or -1 (part already complete). */
{
char outTemp[1024];
char outStat[1024];
safef(outStat, sizeof(outStat), "%s.paraFetchStatus", origPath);
safef(outTemp, sizeof(outTemp), "%s.paraFetch", origPath);
struct parallelConn *pcList = NULL, *pc = NULL;
off_t totalDownloaded = 0;

/* Both the status file and the partial download must exist; if either is
 * missing the other is stale, so remove it and start over. */
if (!fileExists(outStat))
    {
    unlink(outTemp);
    return FALSE;
    }

if (!fileExists(outTemp))
    {
    unlink(outStat);
    return FALSE;
    }

char *line, *word;
struct lineFile *lf = lineFileOpen(outStat, TRUE);
/* NOTE(review): the early-return paths below do not lineFileClose(lf) and
 * leak the cloned url/dateString strings -- confirm whether worth fixing. */
if (!lineFileNext(lf, &line, NULL))
    {
    unlink(outTemp);
    unlink(outStat);
    return FALSE;
    }
char *url = cloneString(line);
if (!lineFileNext(lf, &line, NULL))
    {
    unlink(outTemp);
    unlink(outStat);
    return FALSE;
    }
off_t fileSize = sqlLongLong(line);
if (!lineFileNext(lf, &line, NULL))
    {
    unlink(outTemp);
    unlink(outStat);
    return FALSE;
    }
char *dateString = cloneString(line);
while (lineFileNext(lf, &line, NULL))
    {
    word = nextWord(&line);  /* skip the "partN" label */
    AllocVar(pc);
    pc->next = NULL;
    pc->sd = -4;  /* no connection tried yet */
    word = nextWord(&line);
    pc->rangeStart = sqlLongLong(word);
    word = nextWord(&line);
    pc->partSize = sqlLongLong(word);
    word = nextWord(&line);
    pc->received = sqlLongLong(word);
    if (pc->received == pc->partSize)
	pc->sd = -1;  /* part all done already */
    totalDownloaded += pc->received;
    slAddHead(&pcList, pc);
    }
slReverse(&pcList);

lineFileClose(&lf);

if (slCount(pcList) < 1)
    {
    unlink(outTemp);
    unlink(outStat);
    return FALSE;
    }

*pPcList = pcList;
*pUrl = url;
*pFileSize = fileSize;
*pDateString = dateString;
*pTotalDownloaded = totalDownloaded;

return TRUE;

}


boolean
parallelFetch(char *url, char *outPath, int numConnections, int numRetries, boolean newer, boolean progress)
/* Open multiple parallel connections to URL to speed downloading.
 * Supports http://, https:// and ftp:// urls.  Data is written to
 * outPath.paraFetch, with an outPath.paraFetchStatus file maintained so an
 * interrupted transfer can be resumed; on success the temp file is renamed
 * to outPath and its mtime set from the remote Last-Modified date.
 * If newer is set, the download is skipped (returning TRUE) when the local
 * file is at least as new as the remote date.
 * Up to numRetries extra attempts are made after a select() timeout.
 * Returns TRUE on success, FALSE on any failure. */
{
char *origPath = outPath;
char outTemp[1024];
safef(outTemp, sizeof(outTemp), "%s.paraFetch", outPath);
outPath = outTemp;  /* all writing goes to the temp file until complete */
/* get the size of the file to be downloaded */
off_t fileSize = 0;
off_t totalDownloaded = 0;
ssize_t sinceLastStatus = 0;
char *dateString = "";
int star = 1;
int starMax = 20;
int starStep = 1;
// TODO handle case-sensitivity of protocols input
if (startsWith("http://",url) || startsWith("https://",url))
    {
    /* HEAD request to learn the size and modification date up front */
    struct hash *hash = newHash(0);
    int status = netUrlHead(url, hash);
    if (status != 200) // && status != 302 && status != 301)
	{
	warn("Error code: %d, expected 200 for %s, can't proceed, sorry", status, url);
	return FALSE;
	}
    char *sizeString = hashFindValUpperCase(hash, "Content-Length:");
    if (sizeString)
	{
	fileSize = atoll(sizeString);
	}
    else
	{
	/* Without a Content-Length we cannot split the file into ranges;
	 * fileSize == -1 marks "unknown size" throughout this function. */
	warn("No Content-Length: returned in header for %s, must limit to a single connection, will not know if data is complete", url);
	numConnections = 1;
	fileSize = -1;
	}
    char *ds = hashFindValUpperCase(hash, "Last-Modified:");
    if (ds)
	dateString = cloneString(ds);
    hashFree(&hash);
    }
else if (startsWith("ftp://",url))
    {
    long long size = 0;
    time_t t;
    boolean ok = netGetFtpInfo(url, &size, &t);
    if (!ok)
	{
	warn("Unable to get size info from FTP for %s, can't proceed, sorry", url);
	return FALSE;
	}
    fileSize = size;

    struct tm *ts;
    char ftpTime[80];

    /* Format the time "Tue, 15 Jun 2010 06:45:08 GMT" */
    ts = localtime(&t);
    strftime(ftpTime, sizeof(ftpTime), "%a, %d %b %Y %H:%M:%S %Z", ts);
    dateString = cloneString(ftpTime);

    }
else
    {
    warn("unrecognized protocol: %s", url);
    return FALSE;
    }


verbose(2,"fileSize=%lld\n", (long long) fileSize);

if (fileSize < 65536)  /* special case small file */
    numConnections = 1;

if (numConnections > 50)  /* ignore high values for numConnections */
    {
    warn("Currently maximum number of connections is 50. You requested %d. Will proceed with 50 on %s", numConnections, url);
    numConnections = 50;
    }

verbose(2,"numConnections=%d\n", numConnections); //debug

if (numConnections < 1)
    {
    warn("number of connections must be greater than 0 for %s, can't proceed, sorry", url);
    return FALSE;
    }

if (numRetries < 0)
    numRetries = 0;

/* what is the size of each part */
off_t partSize = (fileSize + numConnections -1) / numConnections;
if (fileSize == -1)
    partSize = -1;
off_t base = 0;
int c;

verbose(2,"partSize=%lld\n", (long long) partSize); //debug


/* n is the highest-numbered descriptor */
int n = 0;
int connOpen = 0;
int reOpen = 0;


/* If a previous run left a status file behind, try to resume from it. */
struct parallelConn *restartPcList = NULL;
char *restartUrl = NULL;
off_t restartFileSize = 0;
char *restartDateString = "";
off_t restartTotalDownloaded = 0;
boolean restartable = readParaFetchStatus(origPath, &restartPcList, &restartUrl, &restartFileSize, &restartDateString, &restartTotalDownloaded);
if (fileSize == -1)
    restartable = FALSE;  /* cannot verify a resume without a known size */

struct parallelConn *pcList = NULL, *pc;

if (restartable
 && sameString(url, restartUrl)
 && fileSize == restartFileSize
 && sameString(dateString, restartDateString))
    {
    /* Same url, size and remote date: reuse the saved per-part progress. */
    pcList = restartPcList;
    totalDownloaded = restartTotalDownloaded;
    }
else
    {

    if (newer) // only download it if it is newer than what we already have
	{
	/* datestamp mtime from last-modified header */
	struct tm tm;
	// Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
	// These strings are always GMT
	if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL)
	    {
	    warn("unable to parse last-modified string [%s]", dateString);
	    }
	else
	    {
	    time_t t;
	    // convert to UTC (GMT) time
	    t = mktimeFromUtc(&tm);
	    if (t == -1)
		{
		warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString);
		}
	    else
		{
		// get the file mtime
		struct stat mystat;
		ZeroVar(&mystat);
		if (stat(origPath,&mystat)==0)
		    {
		    if (t <= mystat.st_mtime)
			{
			verbose(2,"Since nothing newer was found, skipping %s\n", origPath);
			verbose(3,"t from last-modified = %ld; st_mtime = %ld\n", (long) t, (long)mystat.st_mtime);
			return TRUE;
			}
		    }
		}
	    }
	}

    /* make a list of connections, one per part of the file */
    for (c = 0; c < numConnections; ++c)
	{
	AllocVar(pc);
	pc->next = NULL;
	pc->rangeStart = base;
	base += partSize;
	pc->partSize = partSize;
	if (fileSize != -1 && pc->rangeStart+pc->partSize >= fileSize)
	    pc->partSize = fileSize - pc->rangeStart;  /* last part may be short */
	pc->received = 0;
	pc->sd = -4;  /* no connection tried yet */
	slAddHead(&pcList, pc);
	}
    slReverse(&pcList);
    }

if (progress)
    {
    char nicenumber[1024]="";
    sprintWithGreekByte(nicenumber, sizeof(nicenumber), fileSize);
    printf("downloading %s ", nicenumber); fflush(stdout);
    starStep = fileSize/starMax;
    if (starStep < 1)
	starStep = 1;
    }

/* Open without O_TRUNC: when resuming, the partial temp file is reused
 * and each part seeks to its own offset before writing. */
int out = open(outPath, O_CREAT|O_WRONLY, 0664);
if (out < 0)
    {
    /* fixed: arguments were swapped (url/outPath) relative to the format */
    warn("Unable to open %s for write while downloading %s, can't proceed, sorry", outPath, url);
    return FALSE;
    }


/* descriptors for select() */
fd_set rfds;
struct timeval tv;
int retval;

ssize_t readCount = 0;
#define BUFSIZE 65536 * 4
char buf[BUFSIZE];

/* create paraFetchStatus right away for monitoring programs */
writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
sinceLastStatus = 0;

int retryCount = 0;

time_t startTime = time(NULL);

#define SELTIMEOUT 5
#define RETRYSLEEPTIME 30
while (TRUE)
    {

    verbose(2,"Top of big loop\n");

    if (progress)
	{
	while (totalDownloaded >= star * starStep)
	    {
	    printf("*");fflush(stdout);
	    ++star;
	    }
	}

    /* are we done? (done == every part's sd settled at -1) */
    if (connOpen == 0)
	{
	boolean done = TRUE;
	for(pc = pcList; pc; pc = pc->next)
	    if (pc->sd != -1)
		done = FALSE;
	if (done) break;
	}

    /* See if we need to open any connections, either new or retries */
    for(pc = pcList; pc; pc = pc->next)
	{
	if ((pc->sd == -4)  /* never even tried to open yet */
	 || ((reOpen>0)  /* some connections have been released */
	    && (pc->sd == -2  /* started but not finished */
	     || pc->sd == -3)))  /* not even started */
	    {
	    char urlExt[1024];
	    safef(urlExt, sizeof(urlExt), "%s;byterange=%llu-%llu"
		, url
		, (unsigned long long) (pc->rangeStart + pc->received)
		, (unsigned long long) (pc->rangeStart + pc->partSize - 1) );


	    int oldSd = pc->sd;  /* in case we need to remember where we were */
	    if (oldSd != -4)  /* decrement whether we succeed or not */
		--reOpen;
	    if (oldSd == -4)
		oldSd = -3;  /* ok this one just changes */
	    if (fileSize == -1)
		{
		/* unknown size: no byterange, fetch the whole thing */
		verbose(2,"opening url %s\n", url);
		pc->sd = netUrlOpen(url);
		}
	    else
		{
		verbose(2,"opening url %s\n", urlExt);
		pc->sd = netUrlOpen(urlExt);
		}
	    if (pc->sd < 0)
		{
		pc->sd = oldSd;  /* failed to open, can retry later */
		}
	    else
		{
		char *newUrl = NULL;
		int newSd = 0;
		if (startsWith("http://",url) || startsWith("https://",url))
		    {
		    if (!netSkipHttpHeaderLinesHandlingRedirect(pc->sd, urlExt, &newSd, &newUrl))
			{
			warn("Error processing http response for %s", url);
			close(out);  /* fixed: fd leaked on this error path */
			return FALSE;
			}
		    if (newUrl)
			{
			/* Update sd with newSd, replace it with newUrl, etc. */
			pc->sd = newSd;
			}
		    }
		++connOpen;
		}
	    }
	}


    if (connOpen == 0)
	{
	warn("Unable to open any connections to download %s, can't proceed, sorry", url);
	close(out);  /* fixed: fd leaked on this error path */
	return FALSE;
	}


    FD_ZERO(&rfds);
    n = 0;
    for(pc = pcList; pc; pc = pc->next)
	{
	if (pc->sd >= 0)
	    {
	    FD_SET(pc->sd, &rfds);  /* reset descriptor in readfds for select() */
	    if (pc->sd > n)
		n = pc->sd;
	    }
	}


    /* Wait up to SELTIMEOUT seconds. */
    tv.tv_sec = SELTIMEOUT;
    tv.tv_usec = 0;
    retval = select(n+1, &rfds, NULL, NULL, &tv);
    /* Don't rely on the value of tv now! */

    if (retval < 0)
	{
	perror("select retval < 0");
	close(out);  /* fixed: fd leaked on this error path */
	return FALSE;
	}
    else if (retval)
	{

	verbose(2,"returned from select, retval=%d\n", retval);

	for(pc = pcList; pc; pc = pc->next)
	    {
	    if ((pc->sd >= 0) && FD_ISSET(pc->sd, &rfds))
		{

		verbose(2,"found a descriptor with data: %d\n", pc->sd);

		readCount = read(pc->sd, buf, BUFSIZE);

		verbose(2,"readCount = %lld\n", (long long) readCount);

		if (readCount == 0)
		    {
		    close(pc->sd);

		    verbose(2,"closing descriptor: %d\n", pc->sd);
		    pc->sd = -1;

		    if (fileSize != -1 && pc->received != pc->partSize)
			{
			pc->sd = -2;  /* conn was closed before all data was sent, can retry later */
			/* NOTE(review): despite that comment, this path gives up
			 * immediately instead of retrying -- confirm intent. */
			close(out);  /* fixed: fd leaked on this error path */
			return FALSE;
			}
		    --connOpen;
		    ++reOpen;
		    writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
		    sinceLastStatus = 0;
		    continue;
		    }
		if (readCount < 0)
		    {
		    warn("error reading from socket for url %s", url);
		    close(out);  /* fixed: fd leaked on this error path */
		    return FALSE;
		    }

		verbose(2,"rangeStart %llu received %llu\n"
		    , (unsigned long long) pc->rangeStart
		    , (unsigned long long) pc->received );

		verbose(2,"seeking to %llu\n", (unsigned long long) (pc->rangeStart + pc->received));

		if (lseek(out, pc->rangeStart + pc->received, SEEK_SET) == -1)
		    {
		    perror("error seeking output file");
		    warn("error seeking output file %s: rangeStart %llu received %llu for url %s"
			, outPath
			, (unsigned long long) pc->rangeStart
			, (unsigned long long) pc->received
			, url);
		    close(out);  /* fixed: fd leaked on this error path */
		    return FALSE;
		    }
		ssize_t writeCount = write(out, buf, readCount);
		if (writeCount < readCount)
		    {
		    /* fixed: a short write (0 <= writeCount < readCount) was
		     * previously counted as fully written, corrupting the
		     * resume bookkeeping; treat it as an error too. */
		    warn("error writing output file %s", outPath);
		    close(out);
		    return FALSE;
		    }
		pc->received += readCount;
		totalDownloaded += readCount;
		sinceLastStatus += readCount;
		if (sinceLastStatus >= 100*1024*1024)  /* checkpoint every 100MB */
		    {
		    writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE);
		    sinceLastStatus = 0;
		    }
		}
	    }
	}
    else
	{
	/* select() timed out with no data on any connection */
	warn("No data within %d seconds for %s", SELTIMEOUT, url);
	/* Retry ? */
	if (retryCount >= numRetries)
	    {
	    close(out);  /* fixed: fd leaked on this error path */
	    return FALSE;
	    }
	else
	    {
	    ++retryCount;
	    /* close any open connections */
	    for(pc = pcList; pc; pc = pc->next)
		{
		if (pc->sd >= 0)
		    {
		    close(pc->sd);
		    verbose(2,"closing descriptor: %d\n", pc->sd);
		    }
		if (pc->sd != -1)
		    pc->sd = -4;  /* mark for a fresh attempt, unless done */
		}
	    connOpen = 0;
	    reOpen = 0;
	    /* sleep for a while, maybe the server will recover */
	    sleep(RETRYSLEEPTIME);
	    }
	}

    }

close(out);

/* delete the status file - by passing TRUE */
writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, TRUE);

/* restore original file datestamp mtime from last-modified header */
struct tm tm;
// Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
// These strings are always GMT
if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL)
    {
    warn("unable to parse last-modified string [%s]", dateString);
    }
else
    {
    time_t t;
    // convert to UTC (GMT) time
    t = mktimeFromUtc(&tm);
    if (t == -1)
	{
	warn("mktimeFromUtc failed while converting last-modified string to UTC [%s]", dateString);
	}
    else
	{
	// update the file mtime
	struct utimbuf ut;
	struct stat mystat;
	ZeroVar(&mystat);
	if (stat(outTemp,&mystat)==0)
	    {
	    ut.actime = mystat.st_atime;
	    ut.modtime = t;
	    if (utime(outTemp, &ut)==-1)
		{
		char errMsg[256];
		safef(errMsg, sizeof(errMsg), "paraFetch: error setting modification time of %s to %s\n", outTemp, dateString);
		perror(errMsg);
		}
	    }
	}
    }

/* rename the successful download to the original name */
rename(outTemp, origPath);



if (progress)
    {
    while (star <= starMax)
	{
	printf("*");fflush(stdout);
	++star;
	}
    long timeDiff = (long)(time(NULL) - startTime);
    if (timeDiff > 0)
	{
	printf(" %ld seconds", timeDiff);
	/* fixed: was integer division by 1000000, truncating to 0.0 MB/sec
	 * for anything under a megabyte per second */
	float mbpersec = ((totalDownloaded - restartTotalDownloaded)/1000000.0) / timeDiff;
	printf(" %0.1f MB/sec", mbpersec);
	}
    printf("\n");fflush(stdout);
    }

if (fileSize != -1 && totalDownloaded != fileSize)
    {
    warn("Unexpected result: Total downloaded bytes %lld is not equal to fileSize %lld"
	, (long long) totalDownloaded
	, (long long) fileSize);
    return FALSE;
    }
return TRUE;
}