src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c 1.1
1.1 2009/09/01 05:18:02 sbenz
Added beginnings of EM pathway analysis
Index: src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
===================================================================
RCS file: src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
diff -N src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c 1 Sep 2009 05:18:02 -0000 1.1
@@ -0,0 +1,391 @@
+/* bioPathwayLevelUsingEM.c -- code to run pathway pipeline */
+#include "common.h"
+#include "linefile.h"
+#include "hash.h"
+#include "options.h"
+#include "jksql.h"
+#include "hPrint.h"
+#include "hdb.h"
+#include "dystring.h"
+#include "bioIntDb.h"
+#include "bioIntDriver.h"
+#include "cprob.h"
+#include "hgStatsLib.h"
+#include "bioController.h"
+#include "bioPathway.h"
+
+struct emParams {
+ double mRNA[3][3];
+ double genome[3][3];
+ int count;
+};
+
+// many of these functions are in bioPathwayLevel.c
+
+void storeEMValsInDb(struct sqlConnection *biConn, char *tableName,
+ int pathway_id, int genome_dataset_id, int mrna_dataset_id, struct emParams *emp)
+{
+if (!sqlTableExists(biConn, tableName))
+ createPathwayEMValsTable(biConn, tableName);
+
+int i,j;
+//struct pathwayVals *pv;
+for(i = 0; i < 3; i++ )
+ {
+ for(j = 0; j < 3; j++)
+ {
+ struct dyString *update = newDyString(128);
+ dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%f)",
+ tableName, pathway_id, genome_dataset_id, i, j, emp->genome[i][j]);
+ sqlUpdate(biConn, update->string);
+ freeDyString(&update);
+
+ update = newDyString(128);
+ dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%f)",
+ tableName, pathway_id, mrna_dataset_id, i, j, emp->mRNA[i][j]);
+ sqlUpdate(biConn, update->string);
+ freeDyString(&update);
+ }
+ }
+}
+
+boolean writeEMConfigFile(char *filename, char *evidPrefix)
+{
+
+FILE *f = mustOpen(filename, "w");
+if (!f)
+ return FALSE;
+
+//fprintf(f, "inference [pathway_match=_pid_66_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
+//fprintf(f, "inference [pathway_match=_pid_48_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
+fprintf(f, "inference [method=JTREE,updates=HUGIN,verbose=1]\n");
+fprintf(f, "evidence [suffix=_genome.tab,node=genome,disc=-0.451727;0.467580,epsilon=0.01,epsilon0=0.2]\n");
+fprintf(f, "evidence [suffix=_mRNA.tab,node=mRNA,disc=-0.1817733;0.1824913,epsilon=0.01,epsilon0=0.2]\n");
+fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
+fprintf(f, "em [max_iters=30,log_z_tol=0.001]\n");
+
+return carefulCloseWarn(&f);
+}
+
+// eventually this will be more dynamic, but for now...
+boolean writeConfigFileUsingEM(char *filename, char *evidPrefix)
+{
+
+FILE *f = mustOpen(filename, "w");
+if (!f)
+ return FALSE;
+
+fprintf(f, "inference [pathway_match=_pid_66_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
+fprintf(f, "inference [pathway_match=_pid_48_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
+fprintf(f, "inference [method=JTREE,updates=HUGIN,verbose=1]\n");
+// pull in these params from the EM step
+//fprintf(f, "evidence [suffix=_genome.tab,node=genome,disc=-0.451727;0.467580,factorParams=0.680434527777778;0.188006489814815;0.131558978703704;0.241894870370370;0.55878737962963;0.199317706481481;0.127232687037037;0.246933030555556;0.625834314814815]\n");
+//fprintf(f, "evidence [suffix=_mRNA.tab,node=mRNA,disc=-0.1817733;0.1824913,factorParams=0.6211706;0.250951192592593;0.127878243518519;0.259141574074074;0.488297342592593;0.252561083333333;0.137161049074074;0.213898711111111;0.648940222222222]\n");
+fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
+fprintf(f, "em [max_iters=0,log_z_tol=0.001]\n");
+
+return carefulCloseWarn(&f);
+}
+
+struct emParams *readEMValsFromFile(char *emOutput)
+{
+struct lineFile *lf = lineFileOpen(emOutput, TRUE);
+if (!lf)
+ errAbort("File does not exist.");
+
+int i,j;
+struct emParams *currEM;
+AllocVar(currEM);
+lineFileSkip(lf,2); // skip first two
+for(i = 0; i < 3; i++ )
+{
+ for(j = 0; j < 3; j++)
+ {
+ char *words[3];
+ lineFileRowTab(lf, words);
+ currEM->genome[i][j] = atof(words[2]);
+ }
+}
+lineFileSkip(lf,1); // skip one
+for(i = 0; i < 3; i++ )
+{
+ for(j = 0; j < 3; j++)
+ {
+ char *words[3];
+ lineFileRowTab(lf, words);
+ currEM->mRNA[i][j] = atof(words[2]);
+ }
+}
+
+lineFileClose(&lf);
+
+return currEM;
+}
+
+struct emParams *factorGraphRunEM(struct biAnalysis *ba, void *data,
+ int sample_id, int feature_id)
+{
+if (!data)
+ return NULL;
+
+struct pathwayData *pd = data;
+
+/* make temporary file containing pathway info */
+char tmpPathway[512];
+safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
+
+/* make temporary file containing evidence */
+char tmpEvidence[512];
+//safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid.tab", tmpDir, ba->tableName);
+safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid", tmpDir, ba->tableName);
+
+if (!writeEvidenceFile(tmpEvidence, pd))
+ errAbort("Problem writing evidence file %s\n", tmpEvidence);
+
+/* make a new tmp file containing the config */
+char tmpConfigEm[512];
+safef(tmpConfigEm, sizeof(tmpConfigEm), "%s/tmp_%s_config_em.txt", tmpDir, ba->tableName);
+
+if (!writeEMConfigFile(tmpConfigEm, tmpEvidence))
+ errAbort("Problem writing config file %s\n", tmpConfigEm);
+
+/* build command */
+char tmpOutput[128];
+safef(tmpOutput, sizeof(tmpOutput), "%s/tmp.out", tmpDir);
+char emOutput[256];
+safef(emOutput, sizeof(emOutput), "%s/emOutput_pid_%d.out", tmpDir, pd->id);
+
+fprintf(stderr, "Running inference...");
+fflush(stderr);
+char command[512];
+safef(command, sizeof(command),
+ "%s/hgFactorGraph -p %s -c %s -b %s -e %s > %s",
+ fgDir, tmpPathway, tmpConfigEm, tmpEvidence, emOutput, tmpOutput);
+
+/* Run command (cross fingers!) */
+int ret = system(command);
+if (ret)
+ {
+ fprintf(stderr, "Something went wrong!!\n");
+ return NULL;
+ }
+fprintf(stderr, "Done!\n");
+
+/* read in data from output */
+struct emParams *currEM = readEMValsFromFile(emOutput);
+return currEM;
+}
+
+struct analysisVals *factorGraphUsingEM(struct biAnalysis *ba, void *data,
+ int sample_id, int feature_id)
+{
+if (!data)
+ return NULL;
+
+struct pathwayData *pd = data;
+
+/* make temporary file containing pathway info */
+char tmpPathway[512];
+safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
+
+/* make temporary file containing evidence */
+char tmpEvidence[512];
+//safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid.tab", tmpDir, ba->tableName);
+safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid", tmpDir, ba->tableName);
+
+if (!writeEvidenceFile(tmpEvidence, pd))
+ errAbort("Problem writing evidence file %s\n", tmpEvidence);
+
+/* make a new tmp file containing the config */
+char tmpConfig[512];
+safef(tmpConfig, sizeof(tmpConfig), "%s/tmp_%s_config.txt", tmpDir, ba->tableName);
+
+if (!writeConfigFileUsingEM(tmpConfig, tmpEvidence))
+ errAbort("Problem writing config file %s\n", tmpConfig);
+
+/* build command */
+char tmpOutput[128];
+safef(tmpOutput, sizeof(tmpOutput), "%s/tmp.out", tmpDir);
+
+fprintf(stderr, "Running inference...");
+fflush(stderr);
+char command[512];
+safef(command, sizeof(command),
+ "%s/hgFactorGraph -p %s -c %s -b %s > %s",
+ fgDir, tmpPathway, tmpConfig, tmpEvidence, tmpOutput);
+
+/* Run command (cross fingers!) */
+int ret = system(command);
+if (ret)
+ {
+ fprintf(stderr, "Something went wrong!!\n");
+ return NULL;
+ }
+fprintf(stderr, "Done!\n");
+
+/* read in data from output */
+struct analysisVals *avList = readAnalysisValsFromFile(tmpOutput, pd->entities, -1);
+return avList;
+}
+
+/* Pipeline Stuff */
+struct pathwayVals *pathwayLevelAnalysisUsingEM(struct sqlConnection *biConn, struct biAnalysis *ba,
+ struct slPair *spData, struct slPair *spPathways)
+{
+if (!ba->analyze)
+ return NULL;
+
+int i,j;
+
+fprintf(stdout, "starting em-based pathway analysis.\n");
+
+struct hash *featureHash = createIdHash(biConn, AF_TABLE, "feature_name");
+
+struct slPair *pa, *sp;
+
+int count = 0, numPathways = slCount(spPathways);
+
+// init the em struct
+struct emParams *currEMSettingsTotal;
+AllocVar(currEMSettingsTotal);
+for(i = 0; i < 3; i++ )
+ {
+ for(j = 0; j < 3; j++)
+ {
+ currEMSettingsTotal->mRNA[i][j] = 0;
+ currEMSettingsTotal->genome[i][j] = 0;
+ }
+ }
+currEMSettingsTotal->count = 0;
+
+// run EM step
+struct dyString *emTableName = dyStringNew(32);
+dyStringPrintf(emTableName, "emP_%s", ba->tableName);
+int genome_ds_id, mrna_ds_id;
+struct slName *datasetTables;
+// get genome and mrna dataset ids
+for(datasetTables = ba->inputTables; datasetTables; datasetTables = datasetTables->next)
+{
+ struct dyString *emTableQuery = dyStringNew(256);
+ dyStringPrintf(emTableQuery, "SELECT * FROM %s WHERE data_table = '%s'", DA_TABLE, datasetTables->name);
+
+ struct datasets *currDS = datasetsLoadByQuery(biConn, emTableQuery->string);
+ freeDyString(&emTableQuery);
+
+ if(currDS->type_id == 0)
+ genome_ds_id = currDS->id;
+ else if(currDS->type_id == 1)
+ mrna_ds_id = currDS->id;
+ datasetsFreeList(currDS);
+}
+
+for (pa = spPathways; pa; pa = pa->next)
+ {
+ struct pathwayData *pd = pa->val;
+ int feature_id = pd->id;
+
+ // skip these pathways cause EM is expensive on them
+ if(feature_id == 66 || feature_id == 48 || feature_id == 3 || feature_id == 9)
+ continue;
+
+ fprintf(stderr, "prepping pathway %s:\n", pa->name);
+ if (!prepFactorGraph(ba, pd))
+ {
+ fprintf(stderr, "problem with prep, skipping pathway.\n");
+ continue;
+ }
+
+ // redesigned to handle all data
+ pd->data = spData;
+ pd->featureIds = featureHash;
+ struct emParams *currEMSettings = factorGraphRunEM(ba, pd, -1, feature_id);
+
+ count++;
+ fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
+ fflush(stderr);
+
+ char tmpPathway[512];
+ safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
+ if( remove( tmpPathway ) != 0 )
+ fprintf(stderr, "problem with cleanup.\n");
+
+ if(currEMSettings)
+ {
+ storeEMValsInDb(biConn,emTableName->string,feature_id,genome_ds_id,mrna_ds_id,currEMSettings);
+ }
+ freez(&currEMSettings);
+ }
+
+/*for(i = 0; i < 3; i++ )
+ {
+ for(j = 0; j < 3; j++)
+ {
+ currEMSettingsTotal->mRNA[i][j] /= currEMSettingsTotal->count;
+ currEMSettingsTotal->genome[i][j] /= currEMSettingsTotal->count;
+ fprintf(stderr, "Current EM: (%d,%d) %f %f",i,j,currEMSettingsTotal->mRNA[i][j],currEMSettingsTotal->genome[i][j]);
+ }
+ }
+*/
+fprintf(stderr, "EM Steps finished, re-running with tuned parameters\n");
+errAbort("Stop for now");
+count = 0;
+struct pathwayVals *pvList = NULL;
+for (pa = spPathways; pa; pa = pa->next)
+ {
+ struct pathwayData *pd = pa->val;
+ int feature_id = pd->id;
+
+ fprintf(stderr, "prepping pathway %s:\n", pa->name);
+ if (!prepFactorGraph(ba, pd))
+ {
+ fprintf(stderr, "problem with prep, skipping pathway.\n");
+ continue;
+ }
+
+ // redesigned to handle all data
+ pd->data = spData;
+ pd->featureIds = featureHash;
+ struct analysisVals *avList = ba->analyze(ba, pd, -1, feature_id);
+
+ struct pathwayVals *newPvList = convertToPathwayVals(avList, feature_id);
+ pvList = slCat(pvList, newPvList);
+ count++;
+ fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
+ fflush(stderr);
+
+ char tmpPathway[512];
+ safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
+ if( remove( tmpPathway ) != 0 )
+ fprintf(stderr, "problem with cleanup.\n");
+
+ analysisValsFreeList(&avList);
+ }
+
+fprintf(stdout, "\n");
+
+return pvList;
+}
+
+void pathwayLevelPipelineUsingEM(struct biAnalysis *ba)
+{
+uglyTime(NULL);
+struct sqlConnection *biConn = hAllocConnProfile("localDb", ba->db);
+struct slPair *spData = analysisValsSamplesHashesList(biConn, ba->inputTables);
+uglyTime("got sample hashes");
+
+struct slPair *spPathways = getPathways(biConn);
+uglyTime("got pathways");
+
+struct pathwayVals *pvList = pathwayLevelAnalysisUsingEM(biConn, ba, spData, spPathways);
+uglyTime("analyzed all pathways");
+
+/* store analysis vals in database table */
+fprintf(stdout, "storing pathway results...\n");
+storePathwayValsInDb(biConn, ba->tableName, pvList);
+uglyTime("stored all pathways\n");
+hFreeConn(&biConn);
+
+slPairHashesFreeList(&spData);
+slPairPathwayFreeList(&spPathways);
+hFreeConn(&biConn);
+}