src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c 1.4

1.4 2009/09/30 22:29:40 sbenz
Fixed compile warnings
Index: src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
===================================================================
RCS file: /projects/compbio/cvsroot/kent/src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -b -B -U 1000000 -r1.3 -r1.4
--- src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c	21 Sep 2009 16:57:04 -0000	1.3
+++ src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c	30 Sep 2009 22:29:40 -0000	1.4
@@ -1,420 +1,425 @@
 /* bioPathwayLevelUsingEM.c -- code to run pathway pipeline */
 #include "common.h"
 #include "linefile.h"
 #include "hash.h"
 #include "options.h"
 #include "jksql.h"
 #include "hPrint.h"
 #include "hdb.h"  
 #include "dystring.h"
 #include "bioIntDb.h"
 #include "bioIntDriver.h"
 #include "cprob.h"
 #include "hgStatsLib.h"
 #include "bioController.h"
 #include "bioPathway.h"
 
+// defined in createTables.c
+
+
 // many of these functions are in bioPathwayLevel.c
 
 void storeEMValsInDb(struct sqlConnection *biConn, char *tableName,
 			  int pathway_id, int cohort_id, int genome_dataset_id, int mrna_dataset_id, struct emParams *emp)
 {
 if (!sqlTableExists(biConn, tableName))
     createPathwayEMValsTable(biConn, tableName);
 
 int i,j;
 //struct pathwayVals *pv;
 for(i = 0; i < 3; i++ )
 	{
 		for(j = 0; j < 3; j++)
 		{
 		struct dyString *update = newDyString(128);
 		dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%d,%f)", 
 		tableName,  pathway_id, cohort_id, genome_dataset_id,  i,  j, emp->genome[i][j]);
 		sqlUpdate(biConn, update->string);
 		freeDyString(&update);
 		
 		update = newDyString(128);
 		dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%d,%f)", 
 		tableName,  pathway_id, cohort_id, mrna_dataset_id,  i,  j, emp->mRNA[i][j]);
 		sqlUpdate(biConn, update->string);
 		freeDyString(&update);
 		}
 	}
 }
 
 struct emParams *getEMParameters(struct sqlConnection *biConn, char *tableName, int cohort_id, int genome_ds_id, int mrna_ds_id)
 {
 struct emParams *params;
 struct sqlResult *sr;
 char **row;
 
 AllocVar(params);
 
 struct dyString *emQuery = dyStringNew(256);
 dyStringPrintf(emQuery, "SELECT AVG(val),dataset_id,parent_state,child_state FROM %s WHERE cohort_id = %d GROUP BY dataset_id,parent_state,child_state", tableName, cohort_id);
 sr = sqlGetResult(biConn, emQuery->string);
 while ((row = sqlNextRow(sr)) != NULL)
     {
 		if(sqlUnsigned(row[1]) == genome_ds_id)
 			params->genome[sqlUnsigned(row[2])][sqlUnsigned(row[3])] = sqlFloat(row[0]);
 		else
 			params->mRNA[sqlUnsigned(row[2])][sqlUnsigned(row[3])] = sqlFloat(row[0]);
     }
 
 sqlFreeResult(&sr);
 freeDyString(&emQuery);
 
 return params;
 }
 
 boolean writeEMConfigFile(char *filename, char *evidPrefix)
 {
 
 FILE *f = mustOpen(filename, "w");
 if (!f)
     return FALSE;
 
 //fprintf(f, "inference [pathway_match=_pid_66_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
 //fprintf(f, "inference [pathway_match=_pid_48_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
 fprintf(f, "inference [method=JTREE,updates=HUGIN,verbose=1]\n");
 fprintf(f, "evidence [suffix=_genome.tab,node=genome,disc=-1.3;1.3,epsilon=0.01,epsilon0=0.2]\n");
 fprintf(f, "evidence [suffix=_mRNA.tab,node=mRNA,disc=-1.3;1.3,epsilon=0.01,epsilon0=0.2]\n");
 fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
 fprintf(f, "em [max_iters=30,log_z_tol=0.001]\n");
 
 return carefulCloseWarn(&f);
 }
 
 // eventually this will be more dynamic, but for now...
 /*
 boolean writeConfigFileUsingEM(char *filename, char *evidPrefix)
 {
 
 FILE *f = mustOpen(filename, "w");
 if (!f)
     return FALSE;
 
 fprintf(f, "inference [pathway_match=_pid_66_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
 fprintf(f, "inference [pathway_match=_pid_48_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
 fprintf(f, "inference [method=JTREE,updates=HUGIN,verbose=1]\n");
 // pull in these params from the EM step
 
 //fprintf(f, "evidence [suffix=_genome.tab,node=genome,disc=-0.451727;0.467580,factorParams=0.680434527777778;0.188006489814815;0.131558978703704;0.241894870370370;0.55878737962963;0.199317706481481;0.127232687037037;0.246933030555556;0.625834314814815]\n");
 //fprintf(f, "evidence [suffix=_mRNA.tab,node=mRNA,disc=-0.1817733;0.1824913,factorParams=0.6211706;0.250951192592593;0.127878243518519;0.259141574074074;0.488297342592593;0.252561083333333;0.137161049074074;0.213898711111111;0.648940222222222]\n");
 fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
 fprintf(f, "em [max_iters=0,log_z_tol=0.001]\n");
 
 return carefulCloseWarn(&f);
 }*/
 
 struct emParams *readEMValsFromFile(char *emOutput)
 {
 struct lineFile *lf = lineFileOpen(emOutput, TRUE);
 if (!lf)
     errAbort("File does not exist.");
 
 int i,j;
 struct emParams *currEM;
 AllocVar(currEM);
 lineFileSkip(lf,2); // skip first two
 for(i = 0; i < 3; i++ )
 {
 	for(j = 0; j < 3; j++)
 	{
 		char *words[3];
 		lineFileRowTab(lf, words);
 		currEM->genome[i][j] = atof(words[2]);
 	}
 }
 lineFileSkip(lf,1); // skip one
 for(i = 0; i < 3; i++ )
 {
 	for(j = 0; j < 3; j++)
 	{
 		char *words[3];
 		lineFileRowTab(lf, words);
 		currEM->mRNA[i][j] = atof(words[2]);
 	}
 }
 
 lineFileClose(&lf);
 
 return currEM;	
 }
 
 struct emParams *factorGraphRunEM(struct biAnalysis *ba, void *data,
 				 int sample_id, int feature_id)
 { 
 if (!data)
     return NULL;
 
 struct pathwayData *pd = data;
 
 /* make temporary file containing pathway info */
 char tmpPathway[512];
 safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
 
 /* make temporary file containing evidence */
 char tmpEvidence[512];
 //safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid.tab", tmpDir, ba->tableName);
 safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid", tmpDir, ba->tableName);
 
 if (!writeEvidenceFile(tmpEvidence, pd))
     errAbort("Problem writing evidence file %s\n", tmpEvidence);
 
 /* make a new tmp file containing the config */
 char tmpConfigEm[512];
 safef(tmpConfigEm, sizeof(tmpConfigEm), "%s/tmp_%s_config_em.txt", tmpDir, ba->tableName);
 
 if (!writeEMConfigFile(tmpConfigEm, tmpEvidence))
     errAbort("Problem writing config file %s\n", tmpConfigEm);
 
 /* build command */
 char tmpOutput[128];
 safef(tmpOutput, sizeof(tmpOutput), "%s/tmp.out", tmpDir);
 char emOutput[256];
 safef(emOutput, sizeof(emOutput), "%s/emOutput_pid_%d.out", tmpDir, pd->id);
 
 fprintf(stderr, "Running inference...");
 fflush(stderr);
 char command[512];
 safef(command, sizeof(command), 
     "%s/hgFactorGraph -p %s -c %s -b %s -e %s > %s", 
     fgDir, tmpPathway, tmpConfigEm, tmpEvidence, emOutput, tmpOutput);
 
 /* Run command (cross fingers!) */
 int ret = system(command);
 if (ret)
     {
     fprintf(stderr, "Something went wrong!!\n");
     return NULL;
     }
 fprintf(stderr, "Done!\n");
 
 /* read in data from output */
 struct emParams *currEM = readEMValsFromFile(emOutput);
 if( remove( emOutput ) != 0 )
 	fprintf(stderr, "problem with cleanup of em tmp file.\n");
 
 return currEM;
 }
 
 struct analysisVals *factorGraphUsingEM(struct biAnalysis *ba, void *data,
 				 int sample_id, int feature_id)
 { 
 if (!data)
     return NULL;
 
 struct pathwayData *pd = data;
 
 /* make temporary file containing pathway info */
 char tmpPathway[512];
 safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
 
 /* make temporary file containing evidence */
 char tmpEvidence[512];
 //safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid.tab", tmpDir, ba->tableName);
 safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid", tmpDir, ba->tableName);
 
 if (!writeEvidenceFile(tmpEvidence, pd))
     errAbort("Problem writing evidence file %s\n", tmpEvidence);
 
 /* make a new tmp file containing the config */
 char tmpConfig[512];
 safef(tmpConfig, sizeof(tmpConfig), "%s/tmp_%s_config.txt", tmpDir, ba->tableName);
 
 //if (!writeConfigFileUsingEM(tmpConfig, tmpEvidence,pd->emParameters))
 if (!writeConfigFile(tmpConfig, tmpEvidence,pd->emParameters))
     errAbort("Problem writing config file %s\n", tmpConfig);
 
 /* build command */
 char tmpOutput[128];
 safef(tmpOutput, sizeof(tmpOutput), "%s/tmp.out", tmpDir);
 
 fprintf(stderr, "Running inference...");
 fflush(stderr);
 char command[512];
 safef(command, sizeof(command), 
     "%s/hgFactorGraph -p %s -c %s -b %s > %s", 
     fgDir, tmpPathway, tmpConfig, tmpEvidence, tmpOutput);
 
 /* Run command (cross fingers!) */
 int ret = system(command);
 if (ret)
     {
     fprintf(stderr, "Something went wrong!!\n");
     return NULL;
     }
 fprintf(stderr, "Done!\n");
 
 /* read in data from output */
 struct analysisVals *avList = readAnalysisValsFromFile(tmpOutput, pd->entities, -1); 
 return avList;
 }
 
 /* Pipeline Stuff */
 struct pathwayVals *pathwayLevelAnalysisUsingEM(struct sqlConnection *biConn, struct biAnalysis *ba, 
 					 struct slPair *spData, struct slPair *spPathways)
 {
 if (!ba->analyze)
     return NULL; 
 
-int i,j;
+//int i,j;
 
 fprintf(stdout, "starting em-based pathway analysis.\n");
 
 struct hash *featureHash = createIdHash(biConn, AF_TABLE, "feature_name");
 
-struct slPair *pa, *sp;
+//struct slPair *pa, *sp;
+struct slPair *pa;
 
 int count = 0, numPathways = slCount(spPathways);
 
 // need the dataset ids to pull the em parameters into the right places
-int genome_ds_id, mrna_ds_id;
+int genome_ds_id = -1, mrna_ds_id = -1;
 struct slName *datasetTables;
 
 // get genome and mrna dataset ids
 for(datasetTables = ba->inputTables; datasetTables; datasetTables = datasetTables->next)
 {
 	struct dyString *emTableQuery = dyStringNew(256);
 	dyStringPrintf(emTableQuery, "SELECT * FROM %s WHERE data_table = '%s'", DA_TABLE, datasetTables->name);
 	
 	struct datasets *currDS = datasetsLoadByQuery(biConn, emTableQuery->string);
 	freeDyString(&emTableQuery);
 	
 	if(currDS->type_id == 0)
 		genome_ds_id = currDS->id;
 	else if(currDS->type_id == 1)
 		mrna_ds_id = currDS->id;
 	datasetsFreeList(&currDS);
 }
 
 // run EM step
 if (!sqlTableExists(biConn, EM_TABLE))
     createPathwayEMValsTable(biConn, EM_TABLE);
 
 for (pa = spPathways; pa; pa = pa->next)
     {
     struct pathwayData *pd = pa->val;
     int feature_id = pd->id;
     count++;
 
 	// skip these pathways cause EM is expensive on them
-	if(feature_id == 8 || feature_id == 20 || feature_id == 27 || feature_id == 28 || feature_id == 29 || feature_id == 30 || feature_id == 32 || feature_id == 40 || feature_id == 42 || feature_id == 55  || feature_id == 56 || feature_id == 58 || feature_id == 59 || feature_id == 72 || feature_id == 74 || feature_id == 92 || feature_id == 99 || feature_id == 104 || feature_id == 106 || feature_id == 112 || feature_id == 121 || feature_id == 132 || feature_id == 135
+	/*if(feature_id == 8 || feature_id == 20 || feature_id == 27 || feature_id == 28 || feature_id == 29 || feature_id == 30 || feature_id == 32 || feature_id == 40 || feature_id == 42 || feature_id == 55  || feature_id == 56 || feature_id == 58 || feature_id == 59 || feature_id == 72 || feature_id == 74 || feature_id == 92 || feature_id == 99 || feature_id == 104 || feature_id == 106 || feature_id == 112 || feature_id == 121 || feature_id == 132 || feature_id == 135
 	   )
 		{
 		fprintf(stderr, "Skipping pathway %d because EM is too expensive.\n",count);
 		continue;
 		}
+	*/
 		
 	// next check to see if we've already run EM on this pathway
 	struct dyString *emResultQuery = dyStringNew(256);
 	dyStringPrintf(emResultQuery, "SELECT * FROM %s WHERE pathway_id = %d AND cohort_id = %d", EM_TABLE, feature_id, ba->cohort_id);
 	
 	struct pathwayEMParams *emParams = pathwayEMParamsLoadByQuery(biConn, emResultQuery->string);
 	freeDyString(&emResultQuery);
 	
 	int resultCount = slCount(emParams);
 	pathwayEMParamsFreeList(&emParams);
 	if(resultCount > 0)
 	{
 		if(resultCount != 18)
 		    fprintf(stderr, "Pathway id %d has some em parameters, but less than expected (%d/18).\n", feature_id,resultCount);
 		fprintf(stderr, "Skipping pathway %d (id: %d); previous results found.\n",count,feature_id);
 		continue;
 	}
 
     fprintf(stderr, "prepping pathway %s:\n", pa->name);
     if (!prepFactorGraph(ba, pd))
 		{
 		fprintf(stderr, "problem with prep, skipping pathway %d.\n",count);
 		continue;
 		}
 
 	// redesigned to handle all data
 	pd->data = spData;
 	pd->featureIds = featureHash;
 	struct emParams *currEMSettings = factorGraphRunEM(ba, pd, -1, feature_id);
 	
     fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
     fflush(stderr);
 
 	char tmpPathway[512];
 	safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
 	if( remove( tmpPathway ) != 0 )
 		fprintf(stderr, "problem with cleanup.\n");
  
 	if(currEMSettings)
 		{
 		storeEMValsInDb(biConn, EM_TABLE, feature_id, ba->cohort_id, genome_ds_id, mrna_ds_id, currEMSettings);	
 		}
 	freez(&currEMSettings);
     }
 
 fprintf(stderr, "EM Steps finished, re-running with tuned parameters\n");
 struct emParams *emSettings = getEMParameters(biConn, EM_TABLE, ba->cohort_id, genome_ds_id, mrna_ds_id);	
 //errAbort("Stop for now");
 count = 0;
 struct pathwayVals *pvList = NULL;
 for (pa = spPathways; pa; pa = pa->next)
     {
     struct pathwayData *pd = pa->val;
     int feature_id = pd->id;
 
     fprintf(stderr, "prepping pathway %s:\n", pa->name);
     if (!prepFactorGraph(ba, pd))
 	{
 	fprintf(stderr, "problem with prep, skipping pathway.\n");
 	continue;
 	}
 
 	// redesigned to handle all data
 	pd->data = spData;
 	pd->featureIds = featureHash;
 	pd->emParameters = emSettings;
 	struct analysisVals *avList = ba->analyze(ba, pd, -1, feature_id);
 	
     struct pathwayVals *newPvList = convertToPathwayVals(avList, feature_id);
     pvList = slCat(pvList, newPvList);
     count++;
     fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
     fflush(stderr);
 
 	char tmpPathway[512];
 	safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
 	if( remove( tmpPathway ) != 0 )
 		fprintf(stderr, "problem with cleanup.\n");
  
     analysisValsFreeList(&avList);
     }
 
 fprintf(stdout, "\n");
 freez(&emSettings);
 
 return pvList; 
 }
 
 void pathwayLevelPipelineUsingEM(struct biAnalysis *ba)
 {
 uglyTime(NULL);
 struct sqlConnection *biConn = hAllocConnProfile("localDb", ba->db);
 struct slPair *spData = analysisValsSamplesHashesList(biConn, ba->inputTables);
 uglyTime("got sample hashes");   
 
 struct slPair *spPathways = getPathways(biConn);
 uglyTime("got pathways");
 
 struct pathwayVals *pvList = pathwayLevelAnalysisUsingEM(biConn, ba, spData, spPathways);
 uglyTime("analyzed all pathways");
 
 /* store analysis vals in database table */
 fprintf(stdout, "storing pathway results...\n");
 storePathwayValsInDb(biConn, ba->tableName, pvList);
 uglyTime("stored all pathways\n");
 hFreeConn(&biConn);   
 
 slPairHashesFreeList(&spData);
 slPairPathwayFreeList(&spPathways);
 hFreeConn(&biConn);
 }