e72cf0100e06d6fadb7282d4e7eb2c592f320951 galt Mon Jul 4 13:08:35 2011 -0700 Adding parallel-fetch loading of remote bigDataUrl tracks using pthreads diff --git src/lib/net.c src/lib/net.c index fb01a99..2134cea 100644 --- src/lib/net.c +++ src/lib/net.c @@ -1,35 +1,36 @@ /* net.c some stuff to wrap around net communications. * * This file is copyright 2002 Jim Kent, but license is hereby * granted for all use - public, private or commercial. */ #include "common.h" #include <signal.h> #include <errno.h> #include <string.h> #include <sys/time.h> +#include <utime.h> +#include <pthread.h> #include "internet.h" #include "errabort.h" #include "hash.h" #include "net.h" #include "linefile.h" #include "base64.h" #include "cheapcgi.h" #include "https.h" #include "sqlNum.h" -#include <utime.h> #include "obscure.h" static char const rcsid[] = "$Id: net.c,v 1.80 2010/04/14 07:42:06 galt Exp $"; /* Brought errno in to get more useful error messages */ extern int errno; static int netStreamSocket() /* Create a TCP/IP streaming socket. Complain and return something * negative if can't */ { int sd = socket(AF_INET, SOCK_STREAM, 0); if (sd < 0) warn("Couldn't make AF_INET socket."); @@ -801,73 +802,80 @@ dyStringFree(&rs); safef(cmd,sizeof(cmd),"MDTM %s\r\n", npu.file); if (!sendFtpCommand(sd, cmd, &rs, NULL)) { close(sd); return FALSE; } *retTime = parseFtpMDTM(rs->string); /* 200 YYYYMMDDhhmmss */ dyStringFree(&rs); close(sd); return TRUE; } -static void sendFtpDataToPipe(int pipefd[2], int sd, int sdata, struct netParsedUrl npu) +struct netConnectFtpParams +/* params to pass to thread */ +{ +pthread_t thread; +int pipefd[2]; +int sd; +int sdata; +struct netParsedUrl npu; +}; + +static void *sendFtpDataToPipeThread(void *threadParams) /* This is to be executed by the child process after the fork in netGetOpenFtpSockets. 
* It keeps the ftp control socket alive while reading from the ftp data socket * and writing to the pipe to the parent process, which closes the ftp sockets * and reads from the pipe. */ { -fclose(stdin); -fclose(stdout); -close(pipefd[0]); /* close unused half of pipe */ -/* close other file descriptors */ -int fd=0; -for (fd = STDERR_FILENO+1; fd < 64; fd++) - if (fd != pipefd[1] && fd != sdata && fd != sd) - close(fd); + +struct netConnectFtpParams *params = threadParams; + +pthread_detach(params->thread); // this thread will never join back with its progenitor char buf[32768]; int rd = 0; long long dataPos = 0; -if (npu.byteRangeStart != -1) - dataPos = npu.byteRangeStart; -while((rd = read(sdata, buf, 32768)) > 0) +if (params->npu.byteRangeStart != -1) + dataPos = params->npu.byteRangeStart; +while((rd = read(params->sdata, buf, 32768)) > 0) { - if (npu.byteRangeEnd != -1 && (dataPos + rd) > npu.byteRangeEnd) - rd = npu.byteRangeEnd - dataPos + 1; - int wt = write(pipefd[1], buf, rd); - if (wt == -1 && npu.byteRangeEnd != -1) + if (params->npu.byteRangeEnd != -1 && (dataPos + rd) > params->npu.byteRangeEnd) + rd = params->npu.byteRangeEnd - dataPos + 1; + int wt = write(params->pipefd[1], buf, rd); + if (wt == -1 && params->npu.byteRangeEnd != -1) { // errAbort in child process is messy; let reader complain if // trouble. If byterange was open-ended, we will hit this point // when the parent stops reading and closes the pipe. errnoWarn("error writing ftp data to pipe"); break; } dataPos += rd; - if (npu.byteRangeEnd != -1 && dataPos >= npu.byteRangeEnd) + if (params->npu.byteRangeEnd != -1 && dataPos >= params->npu.byteRangeEnd) break; } if (rd == -1) // Again, avoid abort in child process. 
errnoWarn("error reading ftp socket"); -close(pipefd[1]); /* we are done with it */ -close(sd); -close(sdata); +close(params->pipefd[1]); /* we are done with it */ +close(params->sd); +close(params->sdata); +return NULL; } static int netGetOpenFtpSockets(char *url, int *retCtrlSd) /* Return a socket descriptor for url data (url can end in ";byterange:start-end", * or -1 if error. * If retCtrlSd is non-null, keep the control socket alive and set *retCtrlSd. * Otherwise, create a pipe and fork to keep control socket alive in the child * process until we are done fetching data. */ { char cmd[256]; /* Parse the URL and connect. */ struct netParsedUrl npu; netParseUrl(url, &npu); if (!sameString(npu.protocol, "ftp")) @@ -932,51 +940,47 @@ } if (retCtrlSd != NULL) { *retCtrlSd = sd; return sdata; } else { /* Because some FTP servers will kill the data connection * as soon as the control connection closes, * we have to develop a workaround using a partner process. */ fflush(stdin); fflush(stdout); fflush(stderr); + + struct netConnectFtpParams *params; + AllocVar(params); + params->sd = sd; + params->sdata = sdata; + params->npu = npu; /* make a pipe (fds go in pipefd[0] and pipefd[1]) */ - int pipefd[2]; - if (pipe(pipefd) != 0) + if (pipe(params->pipefd) != 0) errAbort("netGetOpenFtpSockets: failed to create pipe: %s", strerror(errno)); - int pid = fork(); - if (pid < 0) - errnoAbort("can't fork in netGetOpenFtpSockets"); - if (pid == 0) + int rc; + rc = pthread_create(&params->thread, NULL, sendFtpDataToPipeThread, (void *)params); + if (rc) { - /* child */ - sendFtpDataToPipe(pipefd, sd, sdata, npu); - exit(0); - /* child will never get to here */ + errAbort("Unexpected error %d from pthread_create(): %s",rc,strerror(rc)); } - /* parent */ - close(pipefd[1]); /* close unused unput half of pipe */ - /* although the parent closes these, the child has them open still */ - close(sd); - close(sdata); - return pipefd[0]; + return params->pipefd[0]; } } int connectNpu(struct 
netParsedUrl npu, char *url) /* Connect using NetParsedUrl. */ { int sd = -1; if (sameString(npu.protocol, "http")) { sd = netConnect(npu.host, atoi(npu.port)); } else if (sameString(npu.protocol, "https")) { sd = netConnectHttps(npu.host, atoi(npu.port));