src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c 1.1

1.1 2009/09/01 05:18:02 sbenz
Added beginnings of EM pathway analysis
Index: src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
===================================================================
RCS file: src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
diff -N src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c	1 Sep 2009 05:18:02 -0000	1.1
@@ -0,0 +1,391 @@
+/* bioPathwayLevelUsingEM.c -- code to run pathway pipeline */
+#include "common.h"
+#include "linefile.h"
+#include "hash.h"
+#include "options.h"
+#include "jksql.h"
+#include "hPrint.h"
+#include "hdb.h"  
+#include "dystring.h"
+#include "bioIntDb.h"
+#include "bioIntDriver.h"
+#include "cprob.h"
+#include "hgStatsLib.h"
+#include "bioController.h"
+#include "bioPathway.h"
+
+struct emParams {
+	double mRNA[3][3];
+	double genome[3][3];
+	int count;
+};
+
+// many of these functions are in bioPathwayLevel.c
+
+void storeEMValsInDb(struct sqlConnection *biConn, char *tableName,
+			  int pathway_id, int genome_dataset_id, int mrna_dataset_id, struct emParams *emp)
+{
+if (!sqlTableExists(biConn, tableName))
+    createPathwayEMValsTable(biConn, tableName);
+
+int i,j;
+//struct pathwayVals *pv;
+for(i = 0; i < 3; i++ )
+	{
+		for(j = 0; j < 3; j++)
+		{
+		struct dyString *update = newDyString(128);
+		dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%f)", 
+		tableName,  pathway_id, genome_dataset_id,  i,  j, emp->genome[i][j]);
+		sqlUpdate(biConn, update->string);
+		freeDyString(&update);
+		
+		update = newDyString(128);
+		dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%f)", 
+		tableName,  pathway_id, mrna_dataset_id,  i,  j, emp->mRNA[i][j]);
+		sqlUpdate(biConn, update->string);
+		freeDyString(&update);
+		}
+	}
+}
+
+boolean writeEMConfigFile(char *filename, char *evidPrefix)
+{
+
+FILE *f = mustOpen(filename, "w");
+if (!f)
+    return FALSE;
+
+//fprintf(f, "inference [pathway_match=_pid_66_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
+//fprintf(f, "inference [pathway_match=_pid_48_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
+fprintf(f, "inference [method=JTREE,updates=HUGIN,verbose=1]\n");
+fprintf(f, "evidence [suffix=_genome.tab,node=genome,disc=-0.451727;0.467580,epsilon=0.01,epsilon0=0.2]\n");
+fprintf(f, "evidence [suffix=_mRNA.tab,node=mRNA,disc=-0.1817733;0.1824913,epsilon=0.01,epsilon0=0.2]\n");
+fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
+fprintf(f, "em [max_iters=30,log_z_tol=0.001]\n");
+
+return carefulCloseWarn(&f);
+}
+
+// eventually this will be more dynamic, but for now...
+boolean writeConfigFileUsingEM(char *filename, char *evidPrefix)
+{
+
+FILE *f = mustOpen(filename, "w");
+if (!f)
+    return FALSE;
+
+fprintf(f, "inference [pathway_match=_pid_66_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
+fprintf(f, "inference [pathway_match=_pid_48_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
+fprintf(f, "inference [method=JTREE,updates=HUGIN,verbose=1]\n");
+// pull in these params from the EM step
+//fprintf(f, "evidence [suffix=_genome.tab,node=genome,disc=-0.451727;0.467580,factorParams=0.680434527777778;0.188006489814815;0.131558978703704;0.241894870370370;0.55878737962963;0.199317706481481;0.127232687037037;0.246933030555556;0.625834314814815]\n");
+//fprintf(f, "evidence [suffix=_mRNA.tab,node=mRNA,disc=-0.1817733;0.1824913,factorParams=0.6211706;0.250951192592593;0.127878243518519;0.259141574074074;0.488297342592593;0.252561083333333;0.137161049074074;0.213898711111111;0.648940222222222]\n");
+fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
+fprintf(f, "em [max_iters=0,log_z_tol=0.001]\n");
+
+return carefulCloseWarn(&f);
+}
+
+struct emParams *readEMValsFromFile(char *emOutput)
+{
+struct lineFile *lf = lineFileOpen(emOutput, TRUE);
+if (!lf)
+    errAbort("File does not exist.");
+
+int i,j;
+struct emParams *currEM;
+AllocVar(currEM);
+lineFileSkip(lf,2); // skip first two
+for(i = 0; i < 3; i++ )
+{
+	for(j = 0; j < 3; j++)
+	{
+		char *words[3];
+		lineFileRowTab(lf, words);
+		currEM->genome[i][j] = atof(words[2]);
+	}
+}
+lineFileSkip(lf,1); // skip one
+for(i = 0; i < 3; i++ )
+{
+	for(j = 0; j < 3; j++)
+	{
+		char *words[3];
+		lineFileRowTab(lf, words);
+		currEM->mRNA[i][j] = atof(words[2]);
+	}
+}
+
+lineFileClose(&lf);
+
+return currEM;	
+}
+
+struct emParams *factorGraphRunEM(struct biAnalysis *ba, void *data,
+				 int sample_id, int feature_id)
+{ 
+if (!data)
+    return NULL;
+
+struct pathwayData *pd = data;
+
+/* make temporary file containing pathway info */
+char tmpPathway[512];
+safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
+
+/* make temporary file containing evidence */
+char tmpEvidence[512];
+//safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid.tab", tmpDir, ba->tableName);
+safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid", tmpDir, ba->tableName);
+
+if (!writeEvidenceFile(tmpEvidence, pd))
+    errAbort("Problem writing evidence file %s\n", tmpEvidence);
+
+/* make a new tmp file containing the config */
+char tmpConfigEm[512];
+safef(tmpConfigEm, sizeof(tmpConfigEm), "%s/tmp_%s_config_em.txt", tmpDir, ba->tableName);
+
+if (!writeEMConfigFile(tmpConfigEm, tmpEvidence))
+    errAbort("Problem writing config file %s\n", tmpConfigEm);
+
+/* build command */
+char tmpOutput[128];
+safef(tmpOutput, sizeof(tmpOutput), "%s/tmp.out", tmpDir);
+char emOutput[256];
+safef(emOutput, sizeof(emOutput), "%s/emOutput_pid_%d.out", tmpDir, pd->id);
+
+fprintf(stderr, "Running inference...");
+fflush(stderr);
+char command[512];
+safef(command, sizeof(command), 
+    "%s/hgFactorGraph -p %s -c %s -b %s -e %s > %s", 
+    fgDir, tmpPathway, tmpConfigEm, tmpEvidence, emOutput, tmpOutput);
+
+/* Run command (cross fingers!) */
+int ret = system(command);
+if (ret)
+    {
+    fprintf(stderr, "Something went wrong!!\n");
+    return NULL;
+    }
+fprintf(stderr, "Done!\n");
+
+/* read in data from output */
+struct emParams *currEM = readEMValsFromFile(emOutput); 
+return currEM;
+}
+
+struct analysisVals *factorGraphUsingEM(struct biAnalysis *ba, void *data,
+				 int sample_id, int feature_id)
+{ 
+if (!data)
+    return NULL;
+
+struct pathwayData *pd = data;
+
+/* make temporary file containing pathway info */
+char tmpPathway[512];
+safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
+
+/* make temporary file containing evidence */
+char tmpEvidence[512];
+//safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid.tab", tmpDir, ba->tableName);
+safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid", tmpDir, ba->tableName);
+
+if (!writeEvidenceFile(tmpEvidence, pd))
+    errAbort("Problem writing evidence file %s\n", tmpEvidence);
+
+/* make a new tmp file containing the config */
+char tmpConfig[512];
+safef(tmpConfig, sizeof(tmpConfig), "%s/tmp_%s_config.txt", tmpDir, ba->tableName);
+
+if (!writeConfigFileUsingEM(tmpConfig, tmpEvidence))
+    errAbort("Problem writing config file %s\n", tmpConfig);
+
+/* build command */
+char tmpOutput[128];
+safef(tmpOutput, sizeof(tmpOutput), "%s/tmp.out", tmpDir);
+
+fprintf(stderr, "Running inference...");
+fflush(stderr);
+char command[512];
+safef(command, sizeof(command), 
+    "%s/hgFactorGraph -p %s -c %s -b %s > %s", 
+    fgDir, tmpPathway, tmpConfig, tmpEvidence, tmpOutput);
+
+/* Run command (cross fingers!) */
+int ret = system(command);
+if (ret)
+    {
+    fprintf(stderr, "Something went wrong!!\n");
+    return NULL;
+    }
+fprintf(stderr, "Done!\n");
+
+/* read in data from output */
+struct analysisVals *avList = readAnalysisValsFromFile(tmpOutput, pd->entities, -1); 
+return avList;
+}
+
+/* Pipeline Stuff */
+struct pathwayVals *pathwayLevelAnalysisUsingEM(struct sqlConnection *biConn, struct biAnalysis *ba, 
+					 struct slPair *spData, struct slPair *spPathways)
+{
+if (!ba->analyze)
+    return NULL; 
+
+int i,j;
+
+fprintf(stdout, "starting em-based pathway analysis.\n");
+
+struct hash *featureHash = createIdHash(biConn, AF_TABLE, "feature_name");
+
+struct slPair *pa, *sp;
+
+int count = 0, numPathways = slCount(spPathways);
+
+// init the em struct
+struct emParams *currEMSettingsTotal;
+AllocVar(currEMSettingsTotal);
+for(i = 0; i < 3; i++ )
+	{
+	for(j = 0; j < 3; j++)
+		{
+		currEMSettingsTotal->mRNA[i][j] = 0;
+		currEMSettingsTotal->genome[i][j] = 0;
+		}
+	}
+currEMSettingsTotal->count = 0;
+
+// run EM step
+struct dyString *emTableName = dyStringNew(32);
+dyStringPrintf(emTableName, "emP_%s", ba->tableName);
+int genome_ds_id, mrna_ds_id;
+struct slName *datasetTables;
+// get genome and mrna dataset ids
+for(datasetTables = ba->inputTables; datasetTables; datasetTables = datasetTables->next)
+{
+	struct dyString *emTableQuery = dyStringNew(256);
+	dyStringPrintf(emTableQuery, "SELECT * FROM %s WHERE data_table = '%s'", DA_TABLE, datasetTables->name);
+	
+	struct datasets *currDS = datasetsLoadByQuery(biConn, emTableQuery->string);
+	freeDyString(&emTableQuery);
+	
+	if(currDS->type_id == 0)
+		genome_ds_id = currDS->id;
+	else if(currDS->type_id == 1)
+		mrna_ds_id = currDS->id;
+	datasetsFreeList(currDS);
+}
+
+for (pa = spPathways; pa; pa = pa->next)
+    {
+    struct pathwayData *pd = pa->val;
+    int feature_id = pd->id;
+
+	// skip these pathways cause EM is expensive on them
+	if(feature_id == 66 || feature_id == 48 || feature_id == 3 || feature_id == 9)
+		continue;
+
+    fprintf(stderr, "prepping pathway %s:\n", pa->name);
+    if (!prepFactorGraph(ba, pd))
+	{
+	fprintf(stderr, "problem with prep, skipping pathway.\n");
+	continue;
+	}
+
+	// redesigned to handle all data
+	pd->data = spData;
+	pd->featureIds = featureHash;
+	struct emParams *currEMSettings = factorGraphRunEM(ba, pd, -1, feature_id);
+	
+    count++;
+    fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
+    fflush(stderr);
+
+	char tmpPathway[512];
+	safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
+	if( remove( tmpPathway ) != 0 )
+		fprintf(stderr, "problem with cleanup.\n");
+ 
+	if(currEMSettings)
+		{
+		storeEMValsInDb(biConn,emTableName->string,feature_id,genome_ds_id,mrna_ds_id,currEMSettings);	
+		}
+	freez(&currEMSettings);
+    }
+
+/*for(i = 0; i < 3; i++ )
+	{
+	for(j = 0; j < 3; j++)
+		{
+		currEMSettingsTotal->mRNA[i][j] /= currEMSettingsTotal->count;
+		currEMSettingsTotal->genome[i][j] /= currEMSettingsTotal->count;
+		fprintf(stderr, "Current EM: (%d,%d) %f %f",i,j,currEMSettingsTotal->mRNA[i][j],currEMSettingsTotal->genome[i][j]);
+		}
+	}
+*/
+fprintf(stderr, "EM Steps finished, re-running with tuned parameters\n");
+errAbort("Stop for now");
+count = 0;
+struct pathwayVals *pvList = NULL;
+for (pa = spPathways; pa; pa = pa->next)
+    {
+    struct pathwayData *pd = pa->val;
+    int feature_id = pd->id;
+
+    fprintf(stderr, "prepping pathway %s:\n", pa->name);
+    if (!prepFactorGraph(ba, pd))
+	{
+	fprintf(stderr, "problem with prep, skipping pathway.\n");
+	continue;
+	}
+
+	// redesigned to handle all data
+	pd->data = spData;
+	pd->featureIds = featureHash;
+	struct analysisVals *avList = ba->analyze(ba, pd, -1, feature_id);
+	
+    struct pathwayVals *newPvList = convertToPathwayVals(avList, feature_id);
+    pvList = slCat(pvList, newPvList);
+    count++;
+    fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
+    fflush(stderr);
+
+	char tmpPathway[512];
+	safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
+	if( remove( tmpPathway ) != 0 )
+		fprintf(stderr, "problem with cleanup.\n");
+ 
+    analysisValsFreeList(&avList);
+    }
+
+fprintf(stdout, "\n");
+
+return pvList; 
+}
+
+void pathwayLevelPipelineUsingEM(struct biAnalysis *ba)
+{
+uglyTime(NULL);
+struct sqlConnection *biConn = hAllocConnProfile("localDb", ba->db);
+struct slPair *spData = analysisValsSamplesHashesList(biConn, ba->inputTables);
+uglyTime("got sample hashes");   
+
+struct slPair *spPathways = getPathways(biConn);
+uglyTime("got pathways");
+
+struct pathwayVals *pvList = pathwayLevelAnalysisUsingEM(biConn, ba, spData, spPathways);
+uglyTime("analyzed all pathways");
+
+/* store analysis vals in database table */
+fprintf(stdout, "storing pathway results...\n");
+storePathwayValsInDb(biConn, ba->tableName, pvList);
+uglyTime("stored all pathways\n");
+hFreeConn(&biConn);   
+
+slPairHashesFreeList(&spData);
+slPairPathwayFreeList(&spPathways);
+hFreeConn(&biConn);
+}