e72cf0100e06d6fadb7282d4e7eb2c592f320951
galt
  Mon Jul 4 13:08:35 2011 -0700
Adding parallel-fetch loading of remote bigDataUrl tracks using pthreads
diff --git src/lib/net.c src/lib/net.c
index fb01a99..2134cea 100644
--- src/lib/net.c
+++ src/lib/net.c
@@ -1,35 +1,36 @@
 /* net.c some stuff to wrap around net communications. 
  *
  * This file is copyright 2002 Jim Kent, but license is hereby
  * granted for all use - public, private or commercial. */
 
 #include "common.h"
 #include <signal.h>
 #include <errno.h>
 #include <string.h>
 #include <sys/time.h>
+#include <utime.h>
+#include <pthread.h>
 #include "internet.h"
 #include "errabort.h"
 #include "hash.h"
 #include "net.h"
 #include "linefile.h"
 #include "base64.h"
 #include "cheapcgi.h"
 #include "https.h"
 #include "sqlNum.h"
-#include <utime.h>
 #include "obscure.h"
 
 static char const rcsid[] = "$Id: net.c,v 1.80 2010/04/14 07:42:06 galt Exp $";
 
 /* Brought errno in to get more useful error messages */
 
 extern int errno;
 
 static int netStreamSocket()
 /* Create a TCP/IP streaming socket.  Complain and return something
  * negative if can't */
 {
 int sd = socket(AF_INET, SOCK_STREAM, 0);
 if (sd < 0)
     warn("Couldn't make AF_INET socket.");
@@ -801,73 +802,94 @@
 dyStringFree(&rs);
 
 safef(cmd,sizeof(cmd),"MDTM %s\r\n", npu.file);
 if (!sendFtpCommand(sd, cmd, &rs, NULL))
     {
     close(sd);
     return FALSE;
     }
 *retTime = parseFtpMDTM(rs->string);
 /* 200 YYYYMMDDhhmmss */
 dyStringFree(&rs);
 close(sd);   
 return TRUE;
 }
 
-static void sendFtpDataToPipe(int pipefd[2], int sd, int sdata, struct netParsedUrl npu)
-/* This is to be executed by the child process after the fork in netGetOpenFtpSockets.
- * It keeps the ftp control socket alive while reading from the ftp data socket
- * and writing to the pipe to the parent process, which closes the ftp sockets
- * and reads from the pipe. */
+struct netConnectFtpParams
+/* Parameters handed to sendFtpDataToPipeThread.  The thread takes ownership:
+ * it closes pipefd[1], sd and sdata and frees this struct when done. */
+{
+int pipefd[2];            /* [0] is returned to the caller, [1] is written by the thread */
+int sd;                   /* ftp control socket, held open until the transfer ends */
+int sdata;                /* ftp data socket */
+struct netParsedUrl npu;  /* parsed url, including any byterange */
+};
+
+static void *sendFtpDataToPipeThread(void *threadParams)
+/* Thread body started by netGetOpenFtpSockets.  It keeps the ftp control
+ * socket alive while reading from the ftp data socket and writing to the
+ * pipe whose read end was returned to the caller.  When done it closes the
+ * pipe write end and both ftp sockets, and frees threadParams. */
 {
-fclose(stdin);
-fclose(stdout);
-close(pipefd[0]);  /* close unused half of pipe */
-/* close other file descriptors */
-int fd=0;
-for (fd = STDERR_FILENO+1; fd < 64; fd++)
-    if (fd != pipefd[1] && fd != sdata && fd != sd)
-  	close(fd);
+
+struct netConnectFtpParams *params = threadParams;
+
+/* Nothing ever joins this thread.  Detach via pthread_self(): the creator's
+ * copy of the thread id is not guaranteed to be stored before we start. */
+pthread_detach(pthread_self());
+
+/* Block SIGPIPE in this thread so that writing to a pipe whose reader has
+ * gone away fails with EPIPE instead of terminating the whole process. */
+sigset_t sigpipeMask;
+sigemptyset(&sigpipeMask);
+sigaddset(&sigpipeMask, SIGPIPE);
+pthread_sigmask(SIG_BLOCK, &sigpipeMask, NULL);
 
 char buf[32768];
 int rd = 0;
 long long dataPos = 0; 
-if (npu.byteRangeStart != -1)
-    dataPos = npu.byteRangeStart;
-while((rd = read(sdata, buf, 32768)) > 0) 
+if (params->npu.byteRangeStart != -1)
+    dataPos = params->npu.byteRangeStart;
+while((rd = read(params->sdata, buf, sizeof(buf))) > 0)
     {
-    if (npu.byteRangeEnd != -1 && (dataPos + rd) > npu.byteRangeEnd)
-	rd = npu.byteRangeEnd - dataPos + 1;
-    int wt = write(pipefd[1], buf, rd);
-    if (wt == -1 && npu.byteRangeEnd != -1)
-	{
-	// errAbort in child process is messy; let reader complain if
-	// trouble.  If byterange was open-ended, we will hit this point
-	// when the parent stops reading and closes the pipe.
-	errnoWarn("error writing ftp data to pipe");
-	break;
-	}
+    if (params->npu.byteRangeEnd != -1 && (dataPos + rd) > params->npu.byteRangeEnd)
+	rd = params->npu.byteRangeEnd - dataPos + 1;
+    int wt = write(params->pipefd[1], buf, rd);
+    if (wt == -1)
+	{
+	// Avoid errAbort in this helper thread; let the reader complain if
+	// there is trouble.  If the byterange was open-ended, we expect to
+	// get here (EPIPE) when the reader stops reading and closes the pipe,
+	// so only warn when the end of the range was specified.
+	if (params->npu.byteRangeEnd != -1)
+	    errnoWarn("error writing ftp data to pipe");
+	break;
+	}
     dataPos += rd;
-    if (npu.byteRangeEnd != -1 && dataPos >= npu.byteRangeEnd)
+    // byteRangeEnd is inclusive, so stop only once that byte has been written.
+    if (params->npu.byteRangeEnd != -1 && dataPos > params->npu.byteRangeEnd)
 	break;	    
     }
 if (rd == -1)
-    // Again, avoid abort in child process.
+    // Again, avoid errAbort in this helper thread.
     errnoWarn("error reading ftp socket");
-close(pipefd[1]);  /* we are done with it */
-close(sd);
-close(sdata);
+close(params->pipefd[1]);  /* we are done with it */
+close(params->sd);
+close(params->sdata);
+freeMem(params);
+return NULL;
 }
 
 static int netGetOpenFtpSockets(char *url, int *retCtrlSd)
 /* Return a socket descriptor for url data (url can end in ";byterange:start-end",
  * or -1 if error.
  * If retCtrlSd is non-null, keep the control socket alive and set *retCtrlSd.
- * Otherwise, create a pipe and fork to keep control socket alive in the child 
- * process until we are done fetching data. */
+ * Otherwise, create a pipe and start a detached thread that keeps the control
+ * socket alive while copying the data into the pipe, and return the read end
+ * of the pipe. */
 {
 char cmd[256];
 
 /* Parse the URL and connect. */
 struct netParsedUrl npu;
 netParseUrl(url, &npu);
 if (!sameString(npu.protocol, "ftp"))
@@ -932,51 +954,53 @@
     }
 
 if (retCtrlSd != NULL)
     {
     *retCtrlSd = sd;
     return sdata;
     }
 else
     {
     /* Because some FTP servers will kill the data connection
      * as soon as the control connection closes,
-     * we have to develop a workaround using a partner process. */
-    fflush(stdin);
-    fflush(stdout);
-    fflush(stderr);
+     * we keep both connections open in a helper thread that copies
+     * the data into a pipe; the caller reads the pipe's other end. */
+    struct netConnectFtpParams *params;
+    AllocVar(params);
+    params->sd = sd;
+    params->sdata = sdata;
+    params->npu = npu;
     /* make a pipe (fds go in pipefd[0] and pipefd[1])  */
-    int pipefd[2];
-    if (pipe(pipefd) != 0)
+    if (pipe(params->pipefd) != 0)
 	errAbort("netGetOpenFtpSockets: failed to create pipe: %s", strerror(errno));
-    int pid = fork();
-    if (pid < 0)
-	errnoAbort("can't fork in netGetOpenFtpSockets");
-    if (pid == 0)
+    /* Copy the read end first: the thread owns and frees params, and may
+     * finish before pthread_create() even returns. */
+    int readFd = params->pipefd[0];
+    pthread_t thread;  /* never joined; the thread detaches itself */
+    int rc = pthread_create(&thread, NULL, sendFtpDataToPipeThread, params);
+    if (rc)
 	{
-	/* child */
-	sendFtpDataToPipe(pipefd, sd, sdata, npu);
-	exit(0);
-	/* child will never get to here */
+	/* the thread never started, so params and the descriptors are still ours */
+	close(params->pipefd[0]);
+	close(params->pipefd[1]);
+	close(sd);
+	close(sdata);
+	freeMem(params);
+	errAbort("Unexpected error %d from pthread_create(): %s",rc,strerror(rc));
 	}
 
-    /* parent */
-    close(pipefd[1]);  /* close unused unput half of pipe */
-    /* although the parent closes these, the child has them open still */
-    close(sd);   
-    close(sdata);
-    return pipefd[0];
+    return readFd;
     }
 }
 
 
 int connectNpu(struct netParsedUrl npu, char *url)
 /* Connect using NetParsedUrl. */
 {
 int sd = -1;
 if (sameString(npu.protocol, "http"))
     {
     sd = netConnect(npu.host, atoi(npu.port));
     }
 else if (sameString(npu.protocol, "https"))
     {
     sd = netConnectHttps(npu.host, atoi(npu.port));