src/hg/instinct/bioInt2/makeClusterFiles.c 1.6

1.6 2010/01/17 19:42:59 sbenz
Added some stuff dealing with pathways with no values, which was causing a seg fault
Index: src/hg/instinct/bioInt2/makeClusterFiles.c
===================================================================
RCS file: /projects/compbio/cvsroot/kent/src/hg/instinct/bioInt2/makeClusterFiles.c,v
retrieving revision 1.5
retrieving revision 1.6
diff -b -B -U 1000000 -r1.5 -r1.6
--- src/hg/instinct/bioInt2/makeClusterFiles.c	22 Dec 2009 14:37:44 -0000	1.5
+++ src/hg/instinct/bioInt2/makeClusterFiles.c	17 Jan 2010 19:42:59 -0000	1.6
@@ -1,1028 +1,1035 @@
 /* makeClusterFiles.c -- code to run pathway pipeline */
 #include "common.h"
 #include "linefile.h"
 #include "hash.h"
 #include "options.h"
 #include "jksql.h"
 #include "hPrint.h"
 #include "hdb.h"  
 #include "dystring.h"
 #include "bioIntDb.h"
 #include "bioIntDriver.h"
 #include "cprob.h"
 #include "hgStatsLib.h"
 
 #define BAD_VAL 999999.9
 #define NUM_IN_BATCH 500
 
 void usage()
 /* Print the command-line synopsis and terminate via errAbort. */
 {
 errAbort(
 	 "makeClusterFiles \n"
 	 "usage:\n"
 	 "  makeClusterFiles db pathway_name directory cohort_id num_iterations\n");
 }
 
 /* Output directories; tmpDir is overwritten with argv[3] in main(). */
 char *fgDir = "factorGraph";
 char *tmpDir = "tmpNull";
 
 /* One sample's values across all data types, in typeHash list order. */
 struct dataTuple {
     struct dataTuple *next;
     double *data;	/* one value per data type; entries equal to BAD_VAL print as NA */
 };
 
 /* Gene-level analysis functions */
 struct links {
     struct links *next;
     char *parent_name;
     char *child_name;
     char *link_type;
 };
 
 /* A node of a pathway (protein, abstract concept, ...). */
 struct entities {
     struct entities *next;
     int id;	/* entity_id from the entities table */
     char *type;	/* entity type string (protein, abstract, ...) */
     char *name;	/* entity name; also the key into pathwayData.featureIds */
 };
 
 /* Everything needed to emit cluster files for a single pathway. */
 struct pathwayData {
     int id;	/* pathway_id */
     struct entities *entities;	/* pathway nodes */
     struct links *links;	/* pathway edges */
     int batchSize;	/* samples/iterations written per batch file */
 	
     /* Either a typeHash list (real-data path) or a hash of dataTuples
      * (null-distribution path), depending on the caller. */
     void *data;
     struct hash *featureIds;	/* feature name -> numeric feature id; shared, not owned */
 };
 
 void dataTupleFree(struct dataTuple **pEl)
 /* Free one dataTuple (including its value array) and zero *pEl. */
 {
 if (pEl == NULL)
     return;
 struct dataTuple *dt = *pEl;
 if (dt == NULL)
     return;
 freez(&dt->data);
 freez(pEl);
 }
 
 void dataTupleFreeList(struct dataTuple **pList)
 /* Free an entire dataTuple list and zero the caller's pointer. */
 {
 struct dataTuple *cur = *pList;
 while (cur != NULL)
     {
     struct dataTuple *following = cur->next;
     dataTupleFree(&cur);
     cur = following;
     }
 *pList = NULL;
 }
 
 /* If we have data in all data types, return the tuple, otherwise NULL */
 struct dataTuple* dataTupleAlloc(struct typeHash* data, struct slName* sl)
 {
 struct dataTuple* dt;
 AllocVar(dt);
 AllocArray(dt->data, slCount(data));
 int i = 0;
 struct typeHash* th;
 for (th = data; th != NULL; th = th->next, ++i)
     {
     struct hashEl *el = hashLookup(th->hash, sl->name);
     if (el == NULL)
 	{
 	/* Missing in at least one data type: discard the partial tuple. */
 	dataTupleFree(&dt);
 	return NULL;
 	}
     struct analysisVals *av = el->val;
     dt->data[i] = av->conf;
     }
 return dt;
 }
 
 struct analysisVals *readAnalysisValsFromFile(char *filename, 
 					      struct entities *enList, int sample_id)
 /* Parse a two-column tab file (name, value) into an analysisVals list,
  * keeping only rows whose name matches an entity in enList.  Unknown
  * names are reported to stderr and skipped. */
 {
 struct lineFile *lf = lineFileOpen(filename, TRUE);
 if (lf == NULL)
     errAbort("File does not exist.");
 
 /* Index entity names so each row lookup is constant time. */
 struct hash *nameToId = hashNew(0);
 struct entities *en;
 for (en = enList; en != NULL; en = en->next)
     hashAddInt(nameToId, en->name, en->id);
 
 struct analysisVals *av, *avList = NULL;
 char *row[2];
 while (lineFileRowTab(lf, row))
     {
     int entity_id = hashIntValDefault(nameToId, row[0], -1);
     if (entity_id == -1)
 	{
 	fprintf(stderr, "entity %s not found\n", row[0]);
 	continue;
 	}
     double val = atof(row[1]);
     AllocVar(av);
     av->sample_id = sample_id;
     av->feature_id = entity_id;
     av->val = val;
     av->conf = val;
     slAddHead(&avList, av);
     }
 lineFileClose(&lf);
 hashFree(&nameToId);
 slReverse(&avList);
 return avList;
 }
 
 struct pathwayVals *convertToPathwayVals(struct analysisVals *avList, int pathway_id)
 /* Copy each analysisVals record into a pathwayVals record tagged with
  * pathway_id, preserving the input list order. */
 {
 struct pathwayVals *pvList = NULL;
 struct analysisVals *av;
 for (av = avList; av != NULL; av = av->next)
     {
     struct pathwayVals *pv;
     AllocVar(pv);
     pv->pathway_id = pathway_id;
     pv->sample_id  = av->sample_id;
     pv->feature_id = av->feature_id;
     pv->val        = av->val;
     pv->conf       = av->conf;
     slAddHead(&pvList, pv);
     }
 slReverse(&pvList);
 return pvList;
 }
 
 boolean writePathwayFile(char *filename, struct pathwayData *pd)
 /* Write the pathway description: one "type\tname" line per entity, then
  * one "parent\tchild\tlinkType" line per link.  Returns TRUE on a clean
  * close, FALSE for missing input. */
 {
 if (pd == NULL || pd->entities == NULL || pd->links == NULL)
     return FALSE;
 
 FILE *f = mustOpen(filename, "w");
 if (f == NULL)
     return FALSE;	/* defensive: mustOpen aborts on failure */
 
 struct entities *en;
 for (en = pd->entities; en != NULL; en = en->next)
     fprintf(f, "%s\t%s\n", en->type, en->name);
 
 struct links *li;
 for (li = pd->links; li != NULL; li = li->next)
     fprintf(f, "%s\t%s\t%s\n", li->parent_name, li->child_name, li->link_type);
 
 return carefulCloseWarn(&f);
 }
 
 boolean prepFactorGraph(char *tableName, void *data)
 {
 if (!data)
     return FALSE;
 
 struct pathwayData *pd = data;
 
 /* make temporary file containing pathway info */
 char tmpPathway[512];
 safef(tmpPathway, sizeof(tmpPathway), "%s/%s_pid_%d_pathway.tab", 
       tmpDir, tableName, pd->id); 
 
 if (!writePathwayFile(tmpPathway, pd))
     errAbort("Problem writing pathway file %s\n", tmpPathway);
 
 return TRUE;
 }
 
 boolean prepEvidenceFiles(char *prefix, void *data)
 /* Append a header row ("id" plus each mapped entity name) to the
  * per-data-type evidence files <prefix>_<type>.tab.  pd->data must be a
  * typeHash list here.  Returns TRUE if every file closed cleanly. */
 {
 if (!data)
     return FALSE;
 
 struct pathwayData *pd = data;
 
 boolean closedAll = 1;
 struct typeHash *th;
 for (th = pd->data; th; th = th->next)
     {
     char fileName[1024];
     safef(fileName, sizeof(fileName), "%s_%s.tab", prefix, th->type);
     FILE *evidF = mustOpen(fileName, "a");	/* append: data rows come later */
     if (!evidF)
 	return FALSE;
 
     fprintf(evidF, "id\t");
     struct entities *en, *enList = pd->entities;
     for (en = enList; en; en = en->next)
 	{
 	/* Entities with no numeric feature id are not columns. */
 	int id = hashIntValDefault(pd->featureIds, en->name, -1);
 	if (id == -1)
 	    continue;
 	
 	fprintf(evidF, "%s", en->name);
 	/* NOTE(review): the separator is keyed to en->next rather than to
 	 * "another column follows", so a skipped trailing entity leaves a
 	 * trailing tab.  writeEvidenceFile shares the same convention, so
 	 * header and data columns still line up. */
 	if (en->next)
 	    {
 	    fprintf(evidF, "\t");
 	    }
 	}
     fprintf(evidF, "\n");
     
     closedAll = closedAll &&  carefulCloseWarn(&evidF);
     }
     
 return closedAll;
 }
 
 boolean writeEvidenceFile(char *prefix, char *sampleName, struct pathwayData *pd)
 /* Append one data row per data type: the sample name followed by each
  * mapped entity's confidence value (NA when the sample lacks that
  * feature).  pd->data must be a typeHash list whose hashes are keyed by
  * decimal feature-id strings.  Returns TRUE if every file closed cleanly. */
 {
 if (!pd)
     return FALSE;
 if (!pd->entities || !pd->links)
     return FALSE;
  
 boolean closedAll = 1;
 
 struct typeHash *th;
 for (th = pd->data; th; th = th->next)
     {
     struct entities *en, *enList = pd->entities;
     char idStr[128];
     char fileName[1024];
     safef(fileName, sizeof(fileName), "%s_%s.tab", prefix, th->type);
     FILE *evidF = mustOpen(fileName, "a");
     if (!evidF)
 	return FALSE;
     fprintf(evidF, "%s\t", sampleName);
 
     for (en = enList; en; en = en->next)
 	{
 	/* Skip logic must match prepEvidenceFiles so columns stay aligned. */
 	int id = hashIntValDefault(pd->featureIds, en->name, -1);
 	if (id == -1)
 	    continue;
 	
 	safef(idStr, sizeof(idStr), "%d", id);
 	
 	struct hashEl *el = hashLookup(th->hash, idStr);
 	if (!el)
 	    fprintf(evidF, "NA");
 	else 
 	    {
 	    struct analysisVals *av = el->val;
 	    double val = av->conf;
 	    fprintf(evidF, "%f", val);
 	    }
 
 	/* Same trailing-separator convention as prepEvidenceFiles. */
 	if (en->next)
 	    {
 	    fprintf(evidF, "\t");
 	    }
 	}
     fprintf(evidF, "\n");
     closedAll = closedAll && carefulCloseWarn(&evidF);
     }
 return closedAll;
 }
 
 boolean writeNullEvidenceFile(char *prefix, char *sampleName, struct pathwayData *pd, struct typeHash* thList)
 /* Append one null-distribution row per data type: the iteration name
  * followed by, for each mapped entity, the value of a randomly drawn
  * dataTuple (NA when the stored value is BAD_VAL).  pd->data must be a
  * hash of dataTuples keyed by decimal index strings (see hashDataTuples).
  * Returns TRUE if every file closed cleanly. */
 {
 if (!pd)
     return FALSE;
 if (!pd->entities || !pd->links)
     return FALSE;
 if (!thList)
     return FALSE;
 if (!pd->data)	/* pathways with no data tuples would seg fault below */
     return FALSE;
 
 /* Open one evidence file per data type and begin this row. */
 int numTypes = slCount(thList);
 FILE** evidFiles;
 AllocArray(evidFiles, numTypes);
 int i = 0;
 struct typeHash* th = thList;
 for (i = 0; i < numTypes; ++i)
     {
     char fileName[1024];
     safef(fileName, sizeof(fileName), "%s_%s.tab", prefix, th->type);
     evidFiles[i] = mustOpen(fileName, "a");
 
     fprintf(evidFiles[i], "%s\t", sampleName); 
     th = th->next;
     }
 
 struct hash *dtHash = pd->data;
 int numTuples = hashNumEntries(dtHash);
 
 struct entities *en, *enList = pd->entities;
 
 for (en = enList; en; en = en->next)
     {
     int id = hashIntValDefault(pd->featureIds, en->name, -1);
     if (id == -1)
 	continue;	/* not a column; see prepEvidenceFiles */
     
     /* Draw one random tuple to fill this entity's column in every file. */
     int randIndex = rand() % numTuples;
     char randStr[128];
     safef(randStr, sizeof(randStr), "%d", randIndex);
     
     struct hashEl *el = hashLookup(dtHash, randStr);
     if (!el)
 	{
 	fprintf(stderr, "uh-oh!");
 	continue;
 	}
     struct dataTuple* dt = el->val; 
     
     for (i = 0; i < numTypes; ++i) 
 	{
 	if (dt->data[i] != BAD_VAL)
 	    {
 	    fprintf(evidFiles[i], "%f", dt->data[i]);
 	    }
 	else
 	    {
 	    fprintf(evidFiles[i], "NA");
 	    }
 	
 	/* Same trailing-separator convention as the other writers. */
 	if (en->next)
 	    {
 	    fprintf(evidFiles[i], "\t");
 	    }
 	}
     }
 
 boolean closedAll = 1;
 for (i = 0; i < numTypes; ++i)
     {
     fprintf(evidFiles[i], "\n");
     /* Logical && (was bitwise &) to match the other evidence writers. */
     closedAll = closedAll && carefulCloseWarn(&evidFiles[i]);
     }
 
 /* freez wants the address of the pointer to clear.  Passing the array
  * itself would free evidFiles[0] -- a FILE* that carefulCloseWarn has
  * already closed -- causing a double free and leaking the array. */
 freez(&evidFiles);
 
 return closedAll;
 }
 
 void factorGraph(char *tableName, void *data, int sample_id, int feature_id, int batchNum)
 { 
 if (!data)
     return;
 
 struct pathwayData *pd = data;
 
 char prefix[512];
 /*safef(prefix, sizeof(prefix), "%s/%s_pid_%d", 
       tmpDir, tableName, pd->id); 
 */
 safef(prefix, sizeof(prefix),
 	  "%s/%s_pid_%d_batch_%d",
 	  tmpDir, tableName, pd->id, batchNum);
 
 char sampleName[512];
 safef(sampleName, sizeof(sampleName), "sample_%d", sample_id);
 
 if (!writeEvidenceFile(prefix, sampleName, pd))
     errAbort("Problem writing evidence file %s\n", prefix);
 }
 
 void factorGraphNull(char *tableName, void *data, struct typeHash* thList,
 		     int sample_id, int batchNum, 
 		     char *nullType)
 /* Append one null-distribution iteration row to the current batch's
  * files.  The file prefix is "na_" for the allData null, "nw_" for the
  * within-pathway null. */
 { 
 struct pathwayData *pd = data;
 
 /* make temporary file containing evidence */
 char prefix[512];
 if (sameString(nullType, "allData"))
     safef(prefix, sizeof(prefix), 
 	  "%s/na_%s_pid_%d_batch_%d", 
 	  tmpDir, tableName, pd->id, batchNum); 
 else
     safef(prefix, sizeof(prefix), 
 	  "%s/nw_%s_pid_%d_batch_%d", 
 	  tmpDir, tableName, pd->id, batchNum); 
 
 char sampleName[512];
 safef(sampleName, sizeof(sampleName), "iter_%d", sample_id);
 
 if (!writeNullEvidenceFile(prefix, sampleName, pd, thList))
     errAbort("Problem writing evidence file %s\n", prefix);
 }
 
 /* Pipeline Stuff */
 struct pathwayVals *pathwayLevelAnalysis(struct sqlConnection *biConn, char *tableName, 
 					 struct slPair *spData, struct slPair *spPathways,
 					 int numDataTypes)
 /* For each pathway, write its description file and batched per-sample
  * evidence files into tmpDir.  Always returns NULL; the output is the
  * set of files written as a side effect. */
 {
 fprintf(stdout, "starting geneset analysis.\n");
 
 struct hash *featureHash = createIdHash(biConn, AF_TABLE, "feature_name");
 
 struct slPair *pa, *sp;
 
 int count = 0;
 
 for (pa = spPathways; pa; pa = pa->next)
     {
     struct pathwayData *pd = pa->val;
     pd->featureIds = featureHash; 
 
     fprintf(stderr, "prepping pathway %s:\n", pa->name);
     if (!prepFactorGraph(tableName, pd))
 	{
 	fprintf(stderr, "problem with prep, skipping pathway.\n");
 	continue;
 	}
 	
     int numInBatch = pd->batchSize;
     int i=0;
     int batchNum = 0;
 	
     for (sp = spData; sp; sp = sp->next)
 	{
 	/* Start a new batch file every numInBatch written samples. */
 	if ((i % numInBatch) == 0)
 	    {
 	    char prefix[512];
 	    batchNum++;
 		safef(prefix, sizeof(prefix),
 		      "%s/%s_pid_%d_batch_%d",
 		      tmpDir, tableName, pd->id, batchNum);
 		
 	    
 	    /* Headers only need the data-type list, so the first sample's
 	     * typeHash list stands in for all samples here. */
 	    pd->data = spData->val;
 	    if (!prepEvidenceFiles(prefix, pd))
 		{
 		/* NOTE(review): this continue skips only the current
 		 * sample, not the whole pathway as the message says. */
 		fprintf(stderr, "problem with evidence prep, skipping pathway.\n");
 		continue;
 		}
 	    }
 	pd->data = sp->val;
 	if (slCount(pd->data) < numDataTypes) 
 	    continue;  // currently only consider samples with more than one type of evidence.
 
 	int sample_id = atoi(sp->name);
 	factorGraph(tableName, pd, sample_id, -1, batchNum);
 	fprintf(stderr, ".");
 	fflush(stderr);
 	i++;
 	}
 
  /*   char prefix[512];
     safef(prefix, sizeof(prefix), "%s/%s_pid_%d", 
 	  tmpDir, tableName, pd->id);
 
     pd->data = spData->val;
     if (!prepEvidenceFiles(prefix, pd))
 	{
 	fprintf(stderr, "problem with evidence prep, skipping pathway.\n");
 	continue;
 	}
     for (sp = spData; sp; sp = sp->next)
 	{
 	pd->data = sp->val;
 	if (slCount(pd->data) < numDataTypes) 
 	    continue;  // currently only consider samples with more than one type of evidence.
 
 	int sample_id = atoi(sp->name);
 	factorGraph(tableName, pd, sample_id, -1);
 	fprintf(stderr, ".");
 	fflush(stderr);
 	}
  */
     fprintf(stderr, "\n");
     
     count++;
     }
 
 fprintf(stderr, "\n");
 
 return NULL; 
 }            
 
 struct slName *getAnalysisFeatureNames(struct sqlConnection *biConn, char *tableName, 
 				       char *type)
 /* Return every feature_name of the given type from tableName. */
 {
 char query[256];
 safef(query, sizeof(query), 
       "select feature_name from %s where type = \"%s\"",
       tableName, type);
 return sqlQuickList(biConn, query);
 }
 
 
 struct dataTuple *getAllDataTuples(struct sqlConnection *biConn, struct hash *featureHash, 
 				   struct slPair *spData, int numDataTypes)
 /* Build dataTuples for every gene feature across every sample carrying
  * at least numDataTypes data types. */
 {
 struct slName *sl, *featureNames = getAnalysisFeatureNames(biConn, AF_TABLE, "gene");
 
 /* Convert feature names to id strings, dropping names with no id. */
 struct slName *featureIds = NULL;
 for (sl = featureNames; sl; sl = sl->next)
     {
     int id = hashIntValDefault(featureHash, sl->name, -1);
     if (id != -1)
 	{
 	char idStr[256];
 	safef(idStr, sizeof(idStr), "%d", id);
 	slNameAddHead(&featureIds, idStr);
 	}
     }
 slNameFreeList(&featureNames);
 
 struct dataTuple *dtList = NULL;
 struct slPair *sp;
 for (sp = spData; sp; sp = sp->next)
     {
     struct typeHash *thList = sp->val;
     if (slCount(thList) < numDataTypes) 
 	continue;  // currently only consider samples with more than one type of evidence.
     
     for (sl = featureIds; sl; sl = sl->next)
 	{
 	struct dataTuple *dt = dataTupleAlloc(thList, sl);
 	if (dt)
 	    slAddHead(&dtList, dt);
 	}
     }
 
 return dtList;
 }
 
 struct dataTuple *getPathwayDataTuples(struct sqlConnection *biConn, struct hash *featureHash, 
 				       struct slPair *spData, struct slPair *spPathway, int numDataTypes)
 /* Build dataTuples restricted to the entities of one pathway, over
  * every sample carrying exactly numDataTypes data types. */
 {
 struct pathwayData *pd = spPathway->val;
 
 /* Convert the pathway's entity names to id strings. */
 struct slName *featureIds = NULL;
 struct entities *en;
 for (en = pd->entities; en; en = en->next)
     {
     int id = hashIntValDefault(featureHash, en->name, -1);
     if (id != -1)
 	{
 	char idStr[128];
 	safef(idStr, sizeof(idStr), "%d", id);
 	slNameAddHead(&featureIds, idStr);
 	}
     }
 
 struct dataTuple *dtList = NULL;
 struct slPair *sp;
 for (sp = spData; sp; sp = sp->next)
     {
     struct typeHash *thList = sp->val;
     if (slCount(thList) != numDataTypes) 
 	continue;  // currently only consider samples with more than one type of evidence.
     
     struct slName *sl;
     for (sl = featureIds; sl; sl = sl->next)
 	{
 	struct dataTuple *dt = dataTupleAlloc(thList, sl);
 	if (dt)
 	    slAddHead(&dtList, dt);
 	}
     }
 
 return dtList;
 }
 
 struct hash *hashDataTuples(struct dataTuple *dtList)
 /* Hash a dataTuple list under sequential decimal keys ("0", "1", ...)
  * so writeNullEvidenceFile can draw tuples uniformly with rand().
  * Returns NULL for an empty list. */
 {
 if (dtList == NULL)
     return NULL;
 
 struct hash *hash = hashNew(0);
 int index = 0;
 struct dataTuple *dt;
 for (dt = dtList; dt != NULL; dt = dt->next, ++index)
     {
     char indexStr[128];
     safef(indexStr, sizeof(indexStr), "%d", index);
     hashAdd(hash, indexStr, dt);
     }
 return hash;
 }
 
 /* Pipeline Stuff */
 struct pathwayVals *pathwayNullAnalysis(struct sqlConnection *biConn, char *tableName, 
 					struct slPair *spData, struct slPair *spPathways,
 					int numIters, int numDataTypes, char *nullType)
 /* Write batched null-distribution evidence files for each pathway by
  * sampling random dataTuples.  nullType is "allData" (one tuple pool
  * shared by all pathways) or "withinPathway" (pool rebuilt per pathway).
  * Always returns NULL; output is the files in tmpDir. */
 {
 if (!spData)
     return NULL;
 
 fprintf(stderr, "starting null analysis analysis.\n");
 
 struct hash *featureHash = createIdHash(biConn, AF_TABLE, "feature_name");
 struct hash *dtHash = NULL;
 struct dataTuple *dtList = NULL;
 if (sameString(nullType, "allData"))
     {
     dtList = getAllDataTuples(biConn, featureHash, spData, numDataTypes);
-    fprintf(stderr, "hashing data tuples...\n");
+    fprintf(stderr, "hashing all data tuples...\n");
     dtHash = hashDataTuples(dtList);
     }
 else if (sameString(nullType, "withinPathway"))
     {/* set dtList on each invofaction below*/}
 else
     errAbort("%s not supported", nullType);
 
 
 struct slPair *pa;
 for (pa = spPathways; pa; pa = pa->next)
     {
     struct pathwayData *pd = pa->val;
     pd->featureIds = featureHash;
 
     if (sameString(nullType, "withinPathway"))
 	{
 	dtList = getPathwayDataTuples(biConn, featureHash, spData, pa,
 				      numDataTypes);
-	fprintf(stderr, "hashing data tuples...\n");
+	if(slCount(dtList) == 0)
+		{
+		/* rev 1.6: empty pathways used to seg fault downstream */
+		fprintf(stderr, "no data tuples found, skipping...\n");
+		continue;
+		}
+	fprintf(stderr, "hashing pathway data tuples...\n");
 	dtHash = hashDataTuples(dtList);
 	}
 
     fprintf(stderr, "prepping pathway %s:\n", pa->name);
     if (!prepFactorGraph(tableName, pd))
 	{
 	fprintf(stderr, "problem with prep, skipping pathway.\n");
 	continue;
 	}
 
     pd->data = dtHash;
     
     // Tune batch size to get around ~20 minutes per batch.
     // .. Pathways listed below take forever.
     int numInBatch = pd->batchSize;
     /*if (sameString(pa->name, "LPA receptor mediated events"))
 	numInBatch = 100;
     else if (sameString(pa->name, "PDGFR-beta signaling pathway"))
 	numInBatch = 250;
     else if (sameString(pa->name, "TCGA08_rtk_signaling"))
 	numInBatch = 250;
     else if (sameString(pa->name, "p38 MAPK signaling pathway"))
 	numInBatch = 250;
     else if (sameString(pa->name, "Canonical Wnt signaling pathway"))
 	numInBatch = 250;
     else if (sameString(pa->name, "RXR and RAR heterodimerization with other nuclear receptor"))
 	numInBatch = 250;
     else
 	numInBatch = NUM_IN_BATCH;  // default batch size*/
     
     int i;
     int batchNum = 0;
     for (i = 0; i < numIters; i++)
 	{
 	/* Start a new batch file every numInBatch iterations. */
 	if ((i % numInBatch) == 0)
 	    {
 	    char prefix[512];
 	    batchNum++;
 	    if (sameString(nullType, "allData"))
 		{
 		safef(prefix, sizeof(prefix),
 		      "%s/na_%s_pid_%d_batch_%d",
 		      tmpDir, tableName, pd->id, batchNum);
 		}
 	    else
 		{
 		safef(prefix, sizeof(prefix),
 		      "%s/nw_%s_pid_%d_batch_%d",
 		      tmpDir, tableName, pd->id, batchNum);
 		} 
 	    
 	    /* Headers only need the data-type list; borrow the first
 	     * sample's typeHash list to enumerate the types. */
 	    pd->data = spData->val;
 	    if (!prepEvidenceFiles(prefix, pd))
 		{
 		/* NOTE(review): skips only this iteration, not the
 		 * pathway, despite the message. */
 		fprintf(stderr, "problem with evidence prep, skipping pathway.\n");
 		continue;
 		}
 	    }
 	pd->data = dtHash;
 	factorGraphNull(tableName, pd, spData->val, i, batchNum, nullType);
 	fprintf(stderr, ".");
 	fflush(stderr);
 	}
     fprintf(stderr, "\n");
     }
 
 fprintf(stderr, "\n");
 
 return NULL; 
 }            
 
 void entitiesFree(struct entities **pEl)
 /* Free one entities record, including its strings, and zero *pEl. */
 {
 struct entities *en = *pEl;
 if (en == NULL)
     return;
 freeMem(en->type);
 freeMem(en->name);
 freez(pEl);
 }
 
 void entitiesFreeList(struct entities **pList)
 /* Free a list of dynamically allocated entities records. */
 {
 struct entities *en = *pList;
 while (en != NULL)
     {
     struct entities *following = en->next;
     entitiesFree(&en);
     en = following;
     }
 *pList = NULL;
 }    
 
 
 void linksFree(struct links **pEl)
 /* Free one links record, including its strings, and zero *pEl. */
 {
 struct links *li = *pEl;
 if (li == NULL)
     return;
 freeMem(li->parent_name);
 freeMem(li->child_name);
 freeMem(li->link_type);
 freez(pEl);
 }
 
 void linksFreeList(struct links **pList)
 /* Free a list of dynamically allocated link's */
 {
 struct links *li = *pList;
 while (li != NULL)
     {
     struct links *following = li->next;
     linksFree(&li);
     li = following;
     }
 *pList = NULL;
 }    
 
 
 void pathwayDataFree(struct pathwayData **pEl)
 /* Free a pathwayData record together with its links and entities.
  * data and featureIds are owned elsewhere and are left alone. */
 {
 struct pathwayData *pd = *pEl;
 if (pd == NULL)
     return;
 linksFreeList(&pd->links);
 entitiesFreeList(&pd->entities);
 freez(pEl);
 }
 
 void slPairPathwayFree(struct slPair **pEl)
 /* Free one slPair whose val is a pathwayData record. */
 {
 struct slPair *sp = *pEl;
 if (sp == NULL)
     return;
 freeMem(sp->name);
 struct pathwayData *pd = sp->val;
 pathwayDataFree(&pd);
 freez(pEl);
 }
 
 void slPairPathwayFreeList(struct slPair **pList)
 /* Free a list of slPairs holding pathwayData records. */
 {
 struct slPair *sp = *pList;
 while (sp != NULL)
     {
     struct slPair *following = sp->next;
     slPairPathwayFree(&sp);
     sp = following;
     }
 *pList = NULL;
 } 
     
 struct slPair *getPathwaysByName(struct sqlConnection *biConn, char *pathwayName)
 /* Load pathways (all of them when pathwayName is NULL) as a list of
  * slPairs: name = pathway name, val = pathwayData with its entities and
  * links populated.  Two queries: entities first, then links joined onto
  * the pathways already seen. */
 {
 char query[2048];
 
 if (!pathwayName)
     safef(query, sizeof(query), 
 	  "select %s.pathway_id,%s.pathway_name,entity_id,entity_name,entity_type,batch_size from %s "
 	  "join %s on %s.pathway_id=%s.pathway_id", // a join across a few tables
 	  EN_TABLE, EP_TABLE, EN_TABLE, 
 	  EP_TABLE, EN_TABLE, EP_TABLE);
 else
     safef(query, sizeof(query), 
 	  "select %s.pathway_id,%s.pathway_name,entity_id,entity_name,entity_type,batch_size from %s "
 	  "join %s on %s.pathway_id=%s.pathway_id where %s.pathway_name = \"%s\"", 
 	  EN_TABLE, EP_TABLE, EN_TABLE, 
 	  EP_TABLE, EN_TABLE, EP_TABLE, 
 	  EP_TABLE, pathwayName);
 
 struct sqlResult *sr = sqlGetResult(biConn, query);
 char **row = NULL;
 
 /* pathway name -> slPair, so rows for the same pathway share one record. */
 struct hash *hash = hashNew(0);
 
 struct pathwayData *pd;
 struct slPair *sp, *spList = NULL;
 while ((row = sqlNextRow(sr)) != NULL)
     {
     int pathway_id     = atoi(row[0]);
     char *pathway_name = row[1];   // pathway name
     int entity_id      = atoi(row[2]);
     char *entity_name  = row[3];   // entity name
     char *entity_type  = row[4];   // type (protein, abstract, whatever...)
 	int batchSize = atoi(row[5]);  // optimal batch size for sending to the cluster
 
     struct hashEl *el = hashLookup(hash, pathway_name);
     if (!el)
 	{
 	/* First row for this pathway: create its pair and record. */
 	AllocVar(sp);
 	sp->name = cloneString(pathway_name);
 	AllocVar(pd);
 	pd->id = pathway_id;
 	pd->entities = NULL;
 	pd->links = NULL;
 	pd->data = NULL;
 	pd->featureIds = NULL;
 	if(batchSize == 0)
 		pd->batchSize = NUM_IN_BATCH;
 	else
 		pd->batchSize = batchSize;
 	sp->val = pd;
 	slAddHead(&spList, sp);
 	hashAdd(hash, pathway_name, sp);
 	}
     else
 	sp = el->val;
 
     pd = sp->val;
     if (!pd)
 	continue;
 
     struct entities *en;
     AllocVar(en);
     en->id   = entity_id;
     en->name = cloneString(entity_name);
     en->type = cloneString(entity_type);
     slAddHead(&pd->entities, en);
     }
 sqlFreeResult(&sr);
 
 /* Second pass: attach links; pathways not in the hash are ignored, so
  * the pathwayName filter from the first query carries over. */
 safef(query, sizeof(query), 
       "select pathway_name,t1.entity_name,t2.entity_name,link_name from %s "
       "join %s on %s.pathway=%s.pathway_id "
       "join %s on link_type=%s.id "
       "join %s as t1 on %s.parent_entity=t1.entity_id "
       "join %s as t2 on %s.child_entity=t2.entity_id ", // a join across a few tables
       EL_TABLE, EP_TABLE, EL_TABLE, EP_TABLE,
       ELT_TABLE, ELT_TABLE, 
       EN_TABLE, EL_TABLE, 
       EN_TABLE, EL_TABLE);
 
 sr = sqlGetResult(biConn, query);
 while ((row = sqlNextRow(sr)) != NULL)
     {
     char *pathway_name  = row[0];   // name
     char *parent_entity = row[1];   // parent name
     char *child_entity  = row[2];   // child name
     char *link_type     = row[3];   // link type, e.g. '->'
     
     struct hashEl *el = hashLookup(hash, pathway_name);
     if (!el)
 	continue;  
 
     sp = el->val;
     pd = sp->val;
 
     if (!pd)
 	continue;
 
     struct links *li;
     AllocVar(li);
     li->parent_name = cloneString(parent_entity);
     li->child_name  = cloneString(child_entity);
     li->link_type   = cloneString(link_type);
     slAddHead(&pd->links, li);
     }
 
 sqlFreeResult(&sr);
 
 hashFree(&hash);
 
 slReverse(&spList);
 return spList;
 }
 
 char *getCohortName(struct sqlConnection *biConn, int cohort_id)
 /* Look up a cohort's name by id; caller owns the returned string. */
 {
 char query[256];
 safef(query, sizeof(query), 
       "select name from %s where id = %d", 
       CO_TABLE, cohort_id);
 return sqlQuickString(biConn, query);
 }
 
 struct slName *getCohortTables(struct sqlConnection *biConn, int cohort_id)
 /* Return the data table names of every dataset in the cohort. */
 {
 char query[256];
 safef(query, sizeof(query), 
       "select data_table from %s join %s on id=dataset_id where cohort_id = %d",
       DC_TABLE, DA_TABLE, cohort_id);
 return sqlQuickList(biConn, query);
 }
 
 struct slPair * filterAnalysisValsSampleHashesFullData(struct slPair* spData,
 						       int dataCount)
 /* Remove (and free) samples that do not carry exactly dataCount data
  * types, unlinking in place.  Returns the possibly-new list head. */
 {
 struct slPair *sp, **prevSp = &spData;
 for (sp = spData; sp;)
     {
     if (slCount(sp->val) != dataCount) 
 	{
 	/* Splice out and free this node; prevSp already points at the
 	 * link that now holds sp->next, so it stays put. */
 	*prevSp = sp->next;
 	slPairHashesFree(&sp);
 	sp = *prevSp;
 	} 
     else
 	{
 	prevSp = &(sp->next);
 	sp = sp->next;
 	}
     }
 return spData;
 }
 
 void pathwayLevelPipeline(char *db, char *pathway, int cohort_id, int numIters)
 /* Top-level driver: load the requested pathway(s) and the cohort's
  * sample data, then write real-data evidence files plus two null
  * distributions (within-pathway sampling and all-data sampling). */
 {
 struct sqlConnection *biConn = hAllocConnProfile("localDb", db);
 
 char *cohort_name = getCohortName(biConn, cohort_id);
 struct slName *tableNames = getCohortTables(biConn, cohort_id);
 
 uglyTime(NULL);
 struct slPair *spPathways;
 if(sameWord(pathway,"all"))
 	spPathways = getPathwaysByName(biConn, NULL);
 else
 	spPathways = getPathwaysByName(biConn, pathway);
 uglyTime("got pathways");
 
 if (slCount(spPathways) == 0)
     errAbort("No pathways by name of %s", pathway);
 
 struct slPair *spData = analysisValsSamplesHashesList(biConn, tableNames);
 uglyTime("got sample hashes");   
 
 /* Only consider samples with full data.  The following output */
 /* functions require this. */
 spData = filterAnalysisValsSampleHashesFullData(spData, slCount(tableNames));
 
 pathwayLevelAnalysis(biConn, cohort_name, spData, spPathways, 
 		     slCount(tableNames));
 uglyTime("analyzed all genesets");
 
 pathwayNullAnalysis(biConn, cohort_name, spData, spPathways, numIters, 
 		    slCount(tableNames), "withinPathway");
 uglyTime("produced null distribution");
 
 pathwayNullAnalysis(biConn, cohort_name, spData, spPathways, numIters, 
 		    slCount(tableNames), "allData");
 uglyTime("produced null distribution");
 
 slPairHashesFreeList(&spData);
 slPairPathwayFreeList(&spPathways);
 hFreeConn(&biConn);
 } 
 
 int main(int argc, char *argv[])
 /* Process command line and run the pipeline. */
 {
 if (argc != 6)
     usage();
 
 /* Seed the RNG used to draw null-distribution tuples. */
 srand(time(NULL));
 
 char *db = argv[1];
 char *pathway = argv[2];
 tmpDir = cloneString(argv[3]);	/* all output files land here */
 int cohort_id = atoi(argv[4]);
 int numIters = atoi(argv[5]);
 
 pathwayLevelPipeline(db, pathway, cohort_id, numIters);
 return 0;
 }