cc56de033eb246075c4fcebfe14b5727492cc647 kent Tue May 7 08:25:59 2013 -0700 Improving commenting and making pointer return values optional. diff --git src/lib/paraFetch.c src/lib/paraFetch.c index bf7dbf9..f98b5de 100644 --- src/lib/paraFetch.c +++ src/lib/paraFetch.c @@ -1,40 +1,31 @@ /* paraFetch - fetch things from remote URLs in parallel. */ #include #include "common.h" #include "internet.h" #include "errabort.h" #include "hash.h" #include "linefile.h" #include "net.h" #include "https.h" #include "sqlNum.h" #include "obscure.h" -struct parallelConn -/* struct to information on a parallel connection */ - { - struct parallelConn *next; /* next connection */ - int sd; /* socket descriptor */ - off_t rangeStart; /* where does the range start */ - off_t partSize; /* range size */ - off_t received; /* bytes received */ - }; - -void writeParaFetchStatus(char *origPath, struct parallelConn *pcList, char *url, off_t fileSize, char *dateString, boolean isFinal) +static void paraFetchWriteStatus(char *origPath, struct parallelConn *pcList, + char *url, off_t fileSize, char *dateString, boolean isFinal) /* Write a status file. * This has two purposes. * First, we can use it to resume a failed transfer. * Second, we can use it to follow progress */ { char outTempX[1024]; char outTemp[1024]; safef(outTempX, sizeof(outTempX), "%s.paraFetchStatusX", origPath); safef(outTemp, sizeof(outTemp), "%s.paraFetchStatus", origPath); struct parallelConn *pc = NULL; FILE *f = mustOpen(outTempX, "w"); int part = 0; fprintf(f, "%s\n", url); fprintf(f, "%lld\n", (long long)fileSize); @@ -46,44 +37,45 @@ , (long long)pc->partSize , (long long)pc->received); ++part; } carefulClose(&f); /* rename the successful status to the original name */ rename(outTempX, outTemp); if (isFinal) /* We are done and just looking to get rid of the file. */ unlink(outTemp); } -boolean readParaFetchStatus(char *origPath, - struct parallelConn **pPcList, char **pUrl, off_t *pFileSize, char **pDateString, off_t *pTotalDownloaded) -/* Write a status file. - * This has two purposes. - * First, we can use it to resume a failed transfer. - * Second, we can use it to follow progress */ +boolean paraFetchReadStatus(char *origPath, + struct parallelConn **pPcList, char **pUrl, off_t *pFileSize, + char **pDateString, off_t *pTotalDownloaded) +/* Read a status file - which is just origPath plus .paraFetchStatus. This is updated during + * transit by parallelFetch. Returns FALSE if status file not there - possibly because + * transfer is finished. Any of the return parameters (pThis and pThat) may be NULL */ { char outTemp[1024]; char outStat[1024]; safef(outStat, sizeof(outStat), "%s.paraFetchStatus", origPath); safef(outTemp, sizeof(outTemp), "%s.paraFetch", origPath); struct parallelConn *pcList = NULL, *pc = NULL; off_t totalDownloaded = 0; +uglyf("paraFetchReadStatus on %s
\n", outStat); if (!fileExists(outStat)) { unlink(outTemp); return FALSE; } if (!fileExists(outTemp)) { unlink(outStat); return FALSE; } char *line, *word; struct lineFile *lf = lineFileOpen(outStat, TRUE); if (!lineFileNext(lf, &line, NULL)) @@ -123,38 +115,42 @@ pc->sd = -1; /* part all done already */ totalDownloaded += pc->received; slAddHead(&pcList, pc); } slReverse(&pcList); lineFileClose(&lf); if (slCount(pcList) < 1) { unlink(outTemp); unlink(outStat); return FALSE; } +if (pPcList != NULL) *pPcList = pcList; +if (pUrl != NULL) *pUrl = url; +if (pFileSize != NULL) *pFileSize = fileSize; +if (pDateString != NULL) *pDateString = dateString; +if (pTotalDownloaded != NULL) *pTotalDownloaded = totalDownloaded; return TRUE; - } boolean parallelFetch(char *url, char *outPath, int numConnections, int numRetries, boolean newer, boolean progress) /* Open multiple parallel connections to URL to speed downloading */ { char *origPath = outPath; char outTemp[1024]; safef(outTemp, sizeof(outTemp), "%s.paraFetch", outPath); outPath = outTemp; /* get the size of the file to be downloaded */ off_t fileSize = 0; off_t totalDownloaded = 0; ssize_t sinceLastStatus = 0; char *dateString = ""; @@ -246,31 +242,31 @@ verbose(2,"partSize=%lld\n", (long long) partSize); //debug /* n is the highest-numbered descriptor */ int n = 0; int connOpen = 0; int reOpen = 0; struct parallelConn *restartPcList = NULL; char *restartUrl = NULL; off_t restartFileSize = 0; char *restartDateString = ""; off_t restartTotalDownloaded = 0; -boolean restartable = readParaFetchStatus(origPath, &restartPcList, &restartUrl, &restartFileSize, &restartDateString, &restartTotalDownloaded); +boolean restartable = paraFetchReadStatus(origPath, &restartPcList, &restartUrl, &restartFileSize, &restartDateString, &restartTotalDownloaded); if (fileSize == -1) restartable = FALSE; struct parallelConn *pcList = NULL, *pc; if (restartable && sameString(url, restartUrl) && fileSize == restartFileSize && sameString(dateString, restartDateString)) { pcList = restartPcList; totalDownloaded = restartTotalDownloaded; } else { @@ -345,31 +341,31 @@ warn("Unable to open %s for write while downloading %s, can't proceed, sorry", url, outPath); return FALSE; } /* descriptors for select() */ fd_set rfds; struct timeval tv; int retval; ssize_t readCount = 0; #define BUFSIZE 65536 * 4 char buf[BUFSIZE]; /* create paraFetchStatus right away for monitoring programs */ -writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE); +paraFetchWriteStatus(origPath, pcList, url, fileSize, dateString, FALSE); sinceLastStatus = 0; int retryCount = 0; time_t startTime = time(NULL); #define SELTIMEOUT 5 #define RETRYSLEEPTIME 30 while (TRUE) { verbose(2,"Top of big loop\n"); if (progress) { @@ -496,31 +492,31 @@ if (readCount == 0) { close(pc->sd); verbose(2,"closing descriptor: %d\n", pc->sd); pc->sd = -1; if (fileSize != -1 && pc->received != pc->partSize) { pc->sd = -2; /* conn was closed before all data was sent, can retry later */ return FALSE; } --connOpen; ++reOpen; - writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE); + paraFetchWriteStatus(origPath, pcList, url, fileSize, dateString, FALSE); sinceLastStatus = 0; continue; } if (readCount < 0) { warn("error reading from socket for url %s", url); return FALSE; } verbose(2,"rangeStart %llu received %llu\n" , (unsigned long long) pc->rangeStart , (unsigned long long) pc->received ); verbose(2,"seeking to %llu\n", (unsigned long long) (pc->rangeStart + pc->received)); @@ -531,33 +527,33 @@ , outPath , (unsigned long long) pc->rangeStart , (unsigned long long) pc->received , url); return FALSE; } int writeCount = write(out, buf, readCount); if (writeCount < 0) { warn("error writing output file %s", outPath); return FALSE; } pc->received += readCount; totalDownloaded += readCount; sinceLastStatus += readCount; - if (sinceLastStatus >= 100*1024*1024) + if (sinceLastStatus >= 10*1024*1024) { - writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, FALSE); + paraFetchWriteStatus(origPath, pcList, url, fileSize, dateString, FALSE); sinceLastStatus = 0; } } } } else { warn("No data within %d seconds for %s", SELTIMEOUT, url); /* Retry ? */ if (retryCount >= numRetries) { return FALSE; } else { @@ -573,31 +569,31 @@ if (pc->sd != -1) pc->sd = -4; } connOpen = 0; reOpen = 0; /* sleep for a while, maybe the server will recover */ sleep(RETRYSLEEPTIME); } } } close(out); /* delete the status file - by passing TRUE */ -writeParaFetchStatus(origPath, pcList, url, fileSize, dateString, TRUE); +paraFetchWriteStatus(origPath, pcList, url, fileSize, dateString, TRUE); /* restore original file datestamp mtime from last-modified header */ struct tm tm; // Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT // These strings are always GMT if (strptime(dateString, "%a, %d %b %Y %H:%M:%S %Z", &tm) == NULL) { warn("unable to parse last-modified string [%s]", dateString); } else { time_t t; // convert to UTC (GMT) time t = mktimeFromUtc(&tm); if (t == -1)