src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c 1.2
1.2 2009/09/05 01:12:01 sbenz
Added em to pipeline
Index: src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
===================================================================
RCS file: /projects/compbio/cvsroot/kent/src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -b -B -U 1000000 -r1.1 -r1.2
--- src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c 1 Sep 2009 05:18:02 -0000 1.1
+++ src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c 5 Sep 2009 01:12:01 -0000 1.2
@@ -1,391 +1,416 @@
/* bioPathwayLevelUsingEM.c -- code to run pathway pipeline */
#include "common.h"
#include "linefile.h"
#include "hash.h"
#include "options.h"
#include "jksql.h"
#include "hPrint.h"
#include "hdb.h"
#include "dystring.h"
#include "bioIntDb.h"
#include "bioIntDriver.h"
#include "cprob.h"
#include "hgStatsLib.h"
#include "bioController.h"
#include "bioPathway.h"
-struct emParams {
- double mRNA[3][3];
- double genome[3][3];
- int count;
-};
-
// many of these functions are in bioPathwayLevel.c
void storeEMValsInDb(struct sqlConnection *biConn, char *tableName,
- int pathway_id, int genome_dataset_id, int mrna_dataset_id, struct emParams *emp)
+ int pathway_id, int cohort_id, int genome_dataset_id, int mrna_dataset_id, struct emParams *emp)
{
if (!sqlTableExists(biConn, tableName))
createPathwayEMValsTable(biConn, tableName);
int i,j;
//struct pathwayVals *pv;
for(i = 0; i < 3; i++ )
{
for(j = 0; j < 3; j++)
{
struct dyString *update = newDyString(128);
- dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%f)",
- tableName, pathway_id, genome_dataset_id, i, j, emp->genome[i][j]);
+ dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%d,%f)",
+ tableName, pathway_id, cohort_id, genome_dataset_id, i, j, emp->genome[i][j]);
sqlUpdate(biConn, update->string);
freeDyString(&update);
update = newDyString(128);
- dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%f)",
- tableName, pathway_id, mrna_dataset_id, i, j, emp->mRNA[i][j]);
+ dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%d,%f)",
+ tableName, pathway_id, cohort_id, mrna_dataset_id, i, j, emp->mRNA[i][j]);
sqlUpdate(biConn, update->string);
freeDyString(&update);
}
}
}
+struct emParams *getEMParameters(struct sqlConnection *biConn, char *tableName, int cohort_id, int genome_ds_id, int mrna_ds_id)
+{
+struct emParams *params;
+struct sqlResult *sr;
+char **row;
+
+AllocVar(params);
+
+struct dyString *emQuery = dyStringNew(256);
+dyStringPrintf(emQuery, "SELECT AVG(val),dataset_id,parent_state,child_state FROM %s WHERE cohort_id = %d GROUP BY dataset_id,parent_state,child_state", tableName, cohort_id);
+sr = sqlGetResult(biConn, emQuery->string);
+while ((row = sqlNextRow(sr)) != NULL)
+ {
+ if(sqlUnsigned(row[1]) == genome_ds_id)
+ params->genome[sqlUnsigned(row[2])][sqlUnsigned(row[3])] = sqlFloat(row[0]);
+ else
+ params->mRNA[sqlUnsigned(row[2])][sqlUnsigned(row[3])] = sqlFloat(row[0]);
+ }
+
+sqlFreeResult(&sr);
+freeDyString(&emQuery);
+
+return params;
+}
+
boolean writeEMConfigFile(char *filename, char *evidPrefix)
{
FILE *f = mustOpen(filename, "w");
if (!f)
return FALSE;
//fprintf(f, "inference [pathway_match=_pid_66_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
//fprintf(f, "inference [pathway_match=_pid_48_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
fprintf(f, "inference [method=JTREE,updates=HUGIN,verbose=1]\n");
fprintf(f, "evidence [suffix=_genome.tab,node=genome,disc=-0.451727;0.467580,epsilon=0.01,epsilon0=0.2]\n");
fprintf(f, "evidence [suffix=_mRNA.tab,node=mRNA,disc=-0.1817733;0.1824913,epsilon=0.01,epsilon0=0.2]\n");
fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
fprintf(f, "em [max_iters=30,log_z_tol=0.001]\n");
return carefulCloseWarn(&f);
}
// eventually this will be more dynamic, but for now...
+/*
boolean writeConfigFileUsingEM(char *filename, char *evidPrefix)
{
FILE *f = mustOpen(filename, "w");
if (!f)
return FALSE;
fprintf(f, "inference [pathway_match=_pid_66_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
fprintf(f, "inference [pathway_match=_pid_48_,method=BP,updates=SEQFIX,tol=1e-9,maxiter=10000,logdomain=0]\n");
fprintf(f, "inference [method=JTREE,updates=HUGIN,verbose=1]\n");
// pull in these params from the EM step
+
//fprintf(f, "evidence [suffix=_genome.tab,node=genome,disc=-0.451727;0.467580,factorParams=0.680434527777778;0.188006489814815;0.131558978703704;0.241894870370370;0.55878737962963;0.199317706481481;0.127232687037037;0.246933030555556;0.625834314814815]\n");
//fprintf(f, "evidence [suffix=_mRNA.tab,node=mRNA,disc=-0.1817733;0.1824913,factorParams=0.6211706;0.250951192592593;0.127878243518519;0.259141574074074;0.488297342592593;0.252561083333333;0.137161049074074;0.213898711111111;0.648940222222222]\n");
fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
fprintf(f, "em [max_iters=0,log_z_tol=0.001]\n");
return carefulCloseWarn(&f);
-}
+}*/
struct emParams *readEMValsFromFile(char *emOutput)
{
struct lineFile *lf = lineFileOpen(emOutput, TRUE);
if (!lf)
errAbort("File does not exist.");
int i,j;
struct emParams *currEM;
AllocVar(currEM);
lineFileSkip(lf,2); // skip first two
for(i = 0; i < 3; i++ )
{
for(j = 0; j < 3; j++)
{
char *words[3];
lineFileRowTab(lf, words);
currEM->genome[i][j] = atof(words[2]);
}
}
lineFileSkip(lf,1); // skip one
for(i = 0; i < 3; i++ )
{
for(j = 0; j < 3; j++)
{
char *words[3];
lineFileRowTab(lf, words);
currEM->mRNA[i][j] = atof(words[2]);
}
}
lineFileClose(&lf);
return currEM;
}
struct emParams *factorGraphRunEM(struct biAnalysis *ba, void *data,
int sample_id, int feature_id)
{
if (!data)
return NULL;
struct pathwayData *pd = data;
/* make temporary file containing pathway info */
char tmpPathway[512];
safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
/* make temporary file containing evidence */
char tmpEvidence[512];
//safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid.tab", tmpDir, ba->tableName);
safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid", tmpDir, ba->tableName);
if (!writeEvidenceFile(tmpEvidence, pd))
errAbort("Problem writing evidence file %s\n", tmpEvidence);
/* make a new tmp file containing the config */
char tmpConfigEm[512];
safef(tmpConfigEm, sizeof(tmpConfigEm), "%s/tmp_%s_config_em.txt", tmpDir, ba->tableName);
if (!writeEMConfigFile(tmpConfigEm, tmpEvidence))
errAbort("Problem writing config file %s\n", tmpConfigEm);
/* build command */
char tmpOutput[128];
safef(tmpOutput, sizeof(tmpOutput), "%s/tmp.out", tmpDir);
char emOutput[256];
safef(emOutput, sizeof(emOutput), "%s/emOutput_pid_%d.out", tmpDir, pd->id);
fprintf(stderr, "Running inference...");
fflush(stderr);
char command[512];
safef(command, sizeof(command),
"%s/hgFactorGraph -p %s -c %s -b %s -e %s > %s",
fgDir, tmpPathway, tmpConfigEm, tmpEvidence, emOutput, tmpOutput);
/* Run command (cross fingers!) */
int ret = system(command);
if (ret)
{
fprintf(stderr, "Something went wrong!!\n");
return NULL;
}
fprintf(stderr, "Done!\n");
/* read in data from output */
struct emParams *currEM = readEMValsFromFile(emOutput);
+if( remove( emOutput ) != 0 )
+ fprintf(stderr, "problem with cleanup of em tmp file.\n");
+
return currEM;
}
struct analysisVals *factorGraphUsingEM(struct biAnalysis *ba, void *data,
int sample_id, int feature_id)
{
if (!data)
return NULL;
struct pathwayData *pd = data;
/* make temporary file containing pathway info */
char tmpPathway[512];
safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
/* make temporary file containing evidence */
char tmpEvidence[512];
//safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid.tab", tmpDir, ba->tableName);
safef(tmpEvidence, sizeof(tmpEvidence), "%s/tmp_%s_evid", tmpDir, ba->tableName);
if (!writeEvidenceFile(tmpEvidence, pd))
errAbort("Problem writing evidence file %s\n", tmpEvidence);
/* make a new tmp file containing the config */
char tmpConfig[512];
safef(tmpConfig, sizeof(tmpConfig), "%s/tmp_%s_config.txt", tmpDir, ba->tableName);
-if (!writeConfigFileUsingEM(tmpConfig, tmpEvidence))
+//if (!writeConfigFileUsingEM(tmpConfig, tmpEvidence,pd->emParameters))
+if (!writeConfigFile(tmpConfig, tmpEvidence,pd->emParameters))
errAbort("Problem writing config file %s\n", tmpConfig);
/* build command */
char tmpOutput[128];
safef(tmpOutput, sizeof(tmpOutput), "%s/tmp.out", tmpDir);
fprintf(stderr, "Running inference...");
fflush(stderr);
char command[512];
safef(command, sizeof(command),
"%s/hgFactorGraph -p %s -c %s -b %s > %s",
fgDir, tmpPathway, tmpConfig, tmpEvidence, tmpOutput);
/* Run command (cross fingers!) */
int ret = system(command);
if (ret)
{
fprintf(stderr, "Something went wrong!!\n");
return NULL;
}
fprintf(stderr, "Done!\n");
/* read in data from output */
struct analysisVals *avList = readAnalysisValsFromFile(tmpOutput, pd->entities, -1);
return avList;
}
/* Pipeline Stuff */
struct pathwayVals *pathwayLevelAnalysisUsingEM(struct sqlConnection *biConn, struct biAnalysis *ba,
struct slPair *spData, struct slPair *spPathways)
{
if (!ba->analyze)
return NULL;
int i,j;
fprintf(stdout, "starting em-based pathway analysis.\n");
struct hash *featureHash = createIdHash(biConn, AF_TABLE, "feature_name");
struct slPair *pa, *sp;
int count = 0, numPathways = slCount(spPathways);
-// init the em struct
-struct emParams *currEMSettingsTotal;
-AllocVar(currEMSettingsTotal);
-for(i = 0; i < 3; i++ )
- {
- for(j = 0; j < 3; j++)
- {
- currEMSettingsTotal->mRNA[i][j] = 0;
- currEMSettingsTotal->genome[i][j] = 0;
- }
- }
-currEMSettingsTotal->count = 0;
-
-// run EM step
-struct dyString *emTableName = dyStringNew(32);
-dyStringPrintf(emTableName, "emP_%s", ba->tableName);
+// need the dataset ids to pull the em parameters into the right places
int genome_ds_id, mrna_ds_id;
struct slName *datasetTables;
+
// get genome and mrna dataset ids
for(datasetTables = ba->inputTables; datasetTables; datasetTables = datasetTables->next)
{
struct dyString *emTableQuery = dyStringNew(256);
dyStringPrintf(emTableQuery, "SELECT * FROM %s WHERE data_table = '%s'", DA_TABLE, datasetTables->name);
struct datasets *currDS = datasetsLoadByQuery(biConn, emTableQuery->string);
freeDyString(&emTableQuery);
if(currDS->type_id == 0)
genome_ds_id = currDS->id;
else if(currDS->type_id == 1)
mrna_ds_id = currDS->id;
- datasetsFreeList(currDS);
+ datasetsFreeList(&currDS);
}
+// run EM step
for (pa = spPathways; pa; pa = pa->next)
{
struct pathwayData *pd = pa->val;
int feature_id = pd->id;
+ count++;
// skip these pathways cause EM is expensive on them
if(feature_id == 66 || feature_id == 48 || feature_id == 3 || feature_id == 9)
+ {
+ fprintf(stderr, "Skipping pathway %d because EM is too expensive.\n",count);
+ continue;
+ }
+
+ // next check to see if we've already run EM on this pathway
+ struct dyString *emResultQuery = dyStringNew(256);
+ dyStringPrintf(emResultQuery, "SELECT * FROM %s WHERE pathway_id = %d AND cohort_id = %d", EM_TABLE, feature_id, ba->cohort_id);
+
+ struct pathwayEMParams *emParams = pathwayEMParamsLoadByQuery(biConn, emResultQuery->string);
+ freeDyString(&emResultQuery);
+
+ int resultCount = slCount(emParams);
+ pathwayEMParamsFreeList(&emParams);
+ if(resultCount > 0)
+ {
+ if(resultCount != 18)
+ fprintf(stderr, "Pathway id %d has some em parameters, but less than expected (%d/18).\n", feature_id,resultCount);
+ fprintf(stderr, "Skipping pathway %d (id: %d); previous results found.\n",count,feature_id);
continue;
+ }
fprintf(stderr, "prepping pathway %s:\n", pa->name);
if (!prepFactorGraph(ba, pd))
{
- fprintf(stderr, "problem with prep, skipping pathway.\n");
+ fprintf(stderr, "problem with prep, skipping pathway %d.\n",count);
continue;
}
// redesigned to handle all data
pd->data = spData;
pd->featureIds = featureHash;
struct emParams *currEMSettings = factorGraphRunEM(ba, pd, -1, feature_id);
- count++;
fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
fflush(stderr);
char tmpPathway[512];
safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
if( remove( tmpPathway ) != 0 )
fprintf(stderr, "problem with cleanup.\n");
if(currEMSettings)
{
- storeEMValsInDb(biConn,emTableName->string,feature_id,genome_ds_id,mrna_ds_id,currEMSettings);
+ storeEMValsInDb(biConn, EM_TABLE, feature_id, ba->cohort_id, genome_ds_id, mrna_ds_id, currEMSettings);
}
freez(&currEMSettings);
}
-/*for(i = 0; i < 3; i++ )
- {
- for(j = 0; j < 3; j++)
- {
- currEMSettingsTotal->mRNA[i][j] /= currEMSettingsTotal->count;
- currEMSettingsTotal->genome[i][j] /= currEMSettingsTotal->count;
- fprintf(stderr, "Current EM: (%d,%d) %f %f",i,j,currEMSettingsTotal->mRNA[i][j],currEMSettingsTotal->genome[i][j]);
- }
- }
-*/
fprintf(stderr, "EM Steps finished, re-running with tuned parameters\n");
-errAbort("Stop for now");
+struct emParams *emSettings = getEMParameters(biConn, EM_TABLE, ba->cohort_id, genome_ds_id, mrna_ds_id);
+//errAbort("Stop for now");
count = 0;
struct pathwayVals *pvList = NULL;
for (pa = spPathways; pa; pa = pa->next)
{
struct pathwayData *pd = pa->val;
int feature_id = pd->id;
fprintf(stderr, "prepping pathway %s:\n", pa->name);
if (!prepFactorGraph(ba, pd))
{
fprintf(stderr, "problem with prep, skipping pathway.\n");
continue;
}
// redesigned to handle all data
pd->data = spData;
pd->featureIds = featureHash;
+ pd->emParameters = emSettings;
struct analysisVals *avList = ba->analyze(ba, pd, -1, feature_id);
struct pathwayVals *newPvList = convertToPathwayVals(avList, feature_id);
pvList = slCat(pvList, newPvList);
count++;
fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
fflush(stderr);
char tmpPathway[512];
safef(tmpPathway, sizeof(tmpPathway), "%s/tmp_%s_pid_%d_path.tab", tmpDir, ba->tableName, pd->id);
if( remove( tmpPathway ) != 0 )
fprintf(stderr, "problem with cleanup.\n");
analysisValsFreeList(&avList);
}
fprintf(stdout, "\n");
+freez(&emSettings);
return pvList;
}
void pathwayLevelPipelineUsingEM(struct biAnalysis *ba)
{
uglyTime(NULL);
struct sqlConnection *biConn = hAllocConnProfile("localDb", ba->db);
struct slPair *spData = analysisValsSamplesHashesList(biConn, ba->inputTables);
uglyTime("got sample hashes");
struct slPair *spPathways = getPathways(biConn);
uglyTime("got pathways");
struct pathwayVals *pvList = pathwayLevelAnalysisUsingEM(biConn, ba, spData, spPathways);
uglyTime("analyzed all pathways");
/* store analysis vals in database table */
fprintf(stdout, "storing pathway results...\n");
storePathwayValsInDb(biConn, ba->tableName, pvList);
uglyTime("stored all pathways\n");
hFreeConn(&biConn);
slPairHashesFreeList(&spData);
slPairPathwayFreeList(&spPathways);
hFreeConn(&biConn);
}