src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c 1.2

1.2 2009/09/05 01:12:01 sbenz
Added em to pipeline
Index: src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c
===================================================================
RCS file: /projects/compbio/cvsroot/kent/src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -b -B -U 4 -r1.1 -r1.2
--- src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c	1 Sep 2009 05:18:02 -0000	1.1
+++ src/hg/instinct/bioInt2/bioPathwayLevelUsingEM.c	5 Sep 2009 01:12:01 -0000	1.2
@@ -13,18 +13,12 @@
 #include "hgStatsLib.h"
 #include "bioController.h"
 #include "bioPathway.h"
 
-struct emParams {
-	double mRNA[3][3];
-	double genome[3][3];
-	int count;
-};
-
 // many of these functions are in bioPathwayLevel.c
 
 void storeEMValsInDb(struct sqlConnection *biConn, char *tableName,
-			  int pathway_id, int genome_dataset_id, int mrna_dataset_id, struct emParams *emp)
+			  int pathway_id, int cohort_id, int genome_dataset_id, int mrna_dataset_id, struct emParams *emp)
 {
 if (!sqlTableExists(biConn, tableName))
     createPathwayEMValsTable(biConn, tableName);
 
@@ -34,22 +28,47 @@
 	{
 		for(j = 0; j < 3; j++)
 		{
 		struct dyString *update = newDyString(128);
-		dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%f)", 
-		tableName,  pathway_id, genome_dataset_id,  i,  j, emp->genome[i][j]);
+		dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%d,%f)", 
+		tableName,  pathway_id, cohort_id, genome_dataset_id,  i,  j, emp->genome[i][j]);
 		sqlUpdate(biConn, update->string);
 		freeDyString(&update);
 		
 		update = newDyString(128);
-		dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%f)", 
-		tableName,  pathway_id, mrna_dataset_id,  i,  j, emp->mRNA[i][j]);
+		dyStringPrintf(update, "insert into %s values ( %d,%d,%d,%d,%d,%f)", 
+		tableName,  pathway_id, cohort_id, mrna_dataset_id,  i,  j, emp->mRNA[i][j]);
 		sqlUpdate(biConn, update->string);
 		freeDyString(&update);
 		}
 	}
 }
 
+struct emParams *getEMParameters(struct sqlConnection *biConn, char *tableName, int cohort_id, int genome_ds_id, int mrna_ds_id)
+{
+struct emParams *params;
+struct sqlResult *sr;
+char **row;
+
+AllocVar(params);
+
+struct dyString *emQuery = dyStringNew(256);
+dyStringPrintf(emQuery, "SELECT AVG(val),dataset_id,parent_state,child_state FROM %s WHERE cohort_id = %d GROUP BY dataset_id,parent_state,child_state", tableName, cohort_id);
+sr = sqlGetResult(biConn, emQuery->string);
+while ((row = sqlNextRow(sr)) != NULL)
+    {
+		if(sqlUnsigned(row[1]) == genome_ds_id)
+			params->genome[sqlUnsigned(row[2])][sqlUnsigned(row[3])] = sqlFloat(row[0]);
+		else
+			params->mRNA[sqlUnsigned(row[2])][sqlUnsigned(row[3])] = sqlFloat(row[0]);
+    }
+
+sqlFreeResult(&sr);
+freeDyString(&emQuery);
+
+return params;
+}
+
 boolean writeEMConfigFile(char *filename, char *evidPrefix)
 {
 
 FILE *f = mustOpen(filename, "w");
@@ -67,8 +86,9 @@
 return carefulCloseWarn(&f);
 }
 
 // eventually this will be more dynamic, but for now...
+/*
 boolean writeConfigFileUsingEM(char *filename, char *evidPrefix)
 {
 
 FILE *f = mustOpen(filename, "w");
@@ -84,9 +105,9 @@
 fprintf(f, "em_step [_mRNA.tab=-obs>,_genome.tab=-obs>]\n");
 fprintf(f, "em [max_iters=0,log_z_tol=0.001]\n");
 
 return carefulCloseWarn(&f);
-}
+}*/
 
 struct emParams *readEMValsFromFile(char *emOutput)
 {
 struct lineFile *lf = lineFileOpen(emOutput, TRUE);
@@ -172,8 +193,11 @@
 fprintf(stderr, "Done!\n");
 
 /* read in data from output */
 struct emParams *currEM = readEMValsFromFile(emOutput); 
+if( remove( emOutput ) != 0 )
+	fprintf(stderr, "problem with cleanup of em tmp file.\n");
+
 return currEM;
 }
 
 struct analysisVals *factorGraphUsingEM(struct biAnalysis *ba, void *data,
@@ -199,9 +223,10 @@
 /* make a new tmp file containing the config */
 char tmpConfig[512];
 safef(tmpConfig, sizeof(tmpConfig), "%s/tmp_%s_config.txt", tmpDir, ba->tableName);
 
-if (!writeConfigFileUsingEM(tmpConfig, tmpEvidence))
+//if (!writeConfigFileUsingEM(tmpConfig, tmpEvidence,pd->emParameters))
+if (!writeConfigFile(tmpConfig, tmpEvidence,pd->emParameters))
     errAbort("Problem writing config file %s\n", tmpConfig);
 
 /* build command */
 char tmpOutput[128];
@@ -244,26 +269,12 @@
 struct slPair *pa, *sp;
 
 int count = 0, numPathways = slCount(spPathways);
 
-// init the em struct
-struct emParams *currEMSettingsTotal;
-AllocVar(currEMSettingsTotal);
-for(i = 0; i < 3; i++ )
-	{
-	for(j = 0; j < 3; j++)
-		{
-		currEMSettingsTotal->mRNA[i][j] = 0;
-		currEMSettingsTotal->genome[i][j] = 0;
-		}
-	}
-currEMSettingsTotal->count = 0;
-
-// run EM step
-struct dyString *emTableName = dyStringNew(32);
-dyStringPrintf(emTableName, "emP_%s", ba->tableName);
+// need the dataset ids to pull the em parameters into the right places
 int genome_ds_id, mrna_ds_id;
 struct slName *datasetTables;
+
 // get genome and mrna dataset ids
 for(datasetTables = ba->inputTables; datasetTables; datasetTables = datasetTables->next)
 {
 	struct dyString *emTableQuery = dyStringNew(256);
@@ -275,33 +286,54 @@
 	if(currDS->type_id == 0)
 		genome_ds_id = currDS->id;
 	else if(currDS->type_id == 1)
 		mrna_ds_id = currDS->id;
-	datasetsFreeList(currDS);
+	datasetsFreeList(&currDS);
 }
 
+// run EM step
 for (pa = spPathways; pa; pa = pa->next)
     {
     struct pathwayData *pd = pa->val;
     int feature_id = pd->id;
+    count++;
 
 	// skip these pathways cause EM is expensive on them
 	if(feature_id == 66 || feature_id == 48 || feature_id == 3 || feature_id == 9)
+		{
+		fprintf(stderr, "Skipping pathway %d because EM is too expensive.\n",count);
+		continue;
+		}
+		
+	// next check to see if we've already run EM on this pathway
+	struct dyString *emResultQuery = dyStringNew(256);
+	dyStringPrintf(emResultQuery, "SELECT * FROM %s WHERE pathway_id = %d AND cohort_id = %d", EM_TABLE, feature_id, ba->cohort_id);
+	
+	struct pathwayEMParams *emParams = pathwayEMParamsLoadByQuery(biConn, emResultQuery->string);
+	freeDyString(&emResultQuery);
+	
+	int resultCount = slCount(emParams);
+	pathwayEMParamsFreeList(&emParams);
+	if(resultCount > 0)
+	{
+		if(resultCount != 18)
+		    fprintf(stderr, "Pathway id %d has some em parameters, but less than expected (%d/18).\n", feature_id,resultCount);
+		fprintf(stderr, "Skipping pathway %d (id: %d); previous results found.\n",count,feature_id);
 		continue;
+	}
 
     fprintf(stderr, "prepping pathway %s:\n", pa->name);
     if (!prepFactorGraph(ba, pd))
 	{
-	fprintf(stderr, "problem with prep, skipping pathway.\n");
+		fprintf(stderr, "problem with prep, skipping pathway %d.\n",count);
 	continue;
 	}
 
 	// redesigned to handle all data
 	pd->data = spData;
 	pd->featureIds = featureHash;
 	struct emParams *currEMSettings = factorGraphRunEM(ba, pd, -1, feature_id);
 	
-    count++;
     fprintf(stderr, "%d of %d pathways complete!\n", count, numPathways);
     fflush(stderr);
 
 	char tmpPathway[512];
@@ -310,25 +342,16 @@
 		fprintf(stderr, "problem with cleanup.\n");
  
 	if(currEMSettings)
 		{
-		storeEMValsInDb(biConn,emTableName->string,feature_id,genome_ds_id,mrna_ds_id,currEMSettings);	
+		storeEMValsInDb(biConn, EM_TABLE, feature_id, ba->cohort_id, genome_ds_id, mrna_ds_id, currEMSettings);	
 		}
 	freez(&currEMSettings);
     }
 
-/*for(i = 0; i < 3; i++ )
-	{
-	for(j = 0; j < 3; j++)
-		{
-		currEMSettingsTotal->mRNA[i][j] /= currEMSettingsTotal->count;
-		currEMSettingsTotal->genome[i][j] /= currEMSettingsTotal->count;
-		fprintf(stderr, "Current EM: (%d,%d) %f %f",i,j,currEMSettingsTotal->mRNA[i][j],currEMSettingsTotal->genome[i][j]);
-		}
-	}
-*/
 fprintf(stderr, "EM Steps finished, re-running with tuned parameters\n");
-errAbort("Stop for now");
+struct emParams *emSettings = getEMParameters(biConn, EM_TABLE, ba->cohort_id, genome_ds_id, mrna_ds_id);	
+//errAbort("Stop for now");
 count = 0;
 struct pathwayVals *pvList = NULL;
 for (pa = spPathways; pa; pa = pa->next)
     {
@@ -344,8 +367,9 @@
 
 	// redesigned to handle all data
 	pd->data = spData;
 	pd->featureIds = featureHash;
+	pd->emParameters = emSettings;
 	struct analysisVals *avList = ba->analyze(ba, pd, -1, feature_id);
 	
     struct pathwayVals *newPvList = convertToPathwayVals(avList, feature_id);
     pvList = slCat(pvList, newPvList);
@@ -361,8 +385,9 @@
     analysisValsFreeList(&avList);
     }
 
 fprintf(stdout, "\n");
+freez(&emSettings);
 
 return pvList; 
 }