531a4d9b21fa112cddaed5a707792c8c4edfc3e9
kent
  Mon Aug 25 17:00:18 2014 -0700
Moving pthreadDoList to new library module, and the parallel hacTree to hacTree.c.
diff --git src/lib/hacTree.c src/lib/hacTree.c
index cdf91bd..82eddbd 100644
--- src/lib/hacTree.c
+++ src/lib/hacTree.c
@@ -1,24 +1,26 @@
 /* hacTree - Hierarchical Agglomerative Clustering a list of inputs into a binary tree */
 
 /* Copyright (C) 2011 The Regents of the University of California 
  * See README in this or parent directory for licensing information. */
 
 #include "common.h"
 #include "dlist.h"
 #include "hash.h"
 #include "hacTree.h"
+#include "synQueue.h"
+#include "pthreadDoList.h"
 
 static struct hacTree *leafNodesFromItems(const struct slList *itemList, int itemCount,
 					  struct lm *localMem)
 /* Allocate & initialize leaf nodes that contain only items. */
 {
 struct hacTree *leafNodes = lmAlloc(localMem, itemCount * sizeof(struct hacTree));
 int i = 0;
 const struct slList *item = itemList;
 while (item != NULL && i < itemCount)
     {
     // needMem zeroes the memory, so initialize only non-NULL stuff.
     struct hacTree *node = &(leafNodes[i]);
     if (i < itemCount-1)
 	node->next = &(leafNodes[i+1]);
     node->itemOrCluster = (struct slList *)item;
@@ -382,15 +384,139 @@
 /* Clean up and go home. */
 hashFree(&distanceHash);
 struct dlNode *lastNode = dlPopHead(&remaining);
 return lastNode->val;
 }
 
 struct hacTree *hacTreeForCostlyMerges(struct slList *itemList, struct lm *localMem,
 				 hacDistanceFunction *distF, hacMergeFunction *mergeF,
 				 void *extraData)
 /* Construct hacTree using a method that will minimize the number of calls to
  * the distance and merge functions, assuming they are expensive.  Do a lmCleanup(localMem)
  * to free the returned tree. */
 {
 return hacTreeVirtualPairing(itemList, localMem, distF, mergeF, extraData, pairSerially);
 }
+
+/** The code from here on down is an implementation that uses multiple threads **/
+
+#define distKeySize 64  /* Size in bytes of the buffer that holds one distance hash key */
+
+static void calcDistKey(void *a, void *b, char key[distKeySize])
+/* Fill key with the hash key for the ordered pair (a,b): both pointers printed as "%p %p". */
+{
+safef(key, distKeySize, "%p %p", a, b);
+}
+
+struct hctPair
+/* One unit of distance work: a pair of hacTree nodes and the distance between them */
+    {
+    struct hctPair *next;  /* Next in singly-linked list (list is built with slAddHead) */
+    struct hacTree *a, *b;	   /* The pair of nodes; distance is computed from their itemOrCluster */
+    double distance;	   /* Distance between pair; filled in by distanceWorker */
+    };
+
+struct distanceWorkerContext
+/* Parameters handed to distanceWorker through the context argument of pthreadDoList */
+    {
+    hacDistanceFunction *distF;	/* Function to call to calc distance */
+    void *extraData;		/* Passed unchanged as the last argument of distF */
+    };
+
+static void distanceWorker(void *item, void *context)
+/* Worker callback: item is a struct hctPair, context a struct distanceWorkerContext. */
+{
+const struct distanceWorkerContext *ctx = context;
+struct hctPair *job = item;
+struct slList *left = (struct slList *)(job->a->itemOrCluster);
+struct slList *right = (struct slList *)(job->b->itemOrCluster);
+job->distance = (ctx->distF)(left, right, ctx->extraData);  /* only field written */
+}
+
+static int distThreadCount = 1;  // Threads for distance calcs; set by hacTreeMultiThread, read by precacheMissingDistances.
+
+static void precacheMissingDistances(struct dlList *list, struct hash *distanceHash,
+    hacDistanceFunction *distF, void *extraData)
+/* Make sure distanceHash holds a distance for every pair (a,b) where a precedes b in list.
+ * Missing distances are computed by distF via pthreadDoList(distThreadCount, ...), then cached. */
+{
+/* Make up list of all missing distances in pairList */
+struct hctPair *pairList = NULL, *pair;
+struct dlNode *aNode;
+for (aNode = list->head; !dlEnd(aNode); aNode = aNode->next)
+    {
+    struct hacTree *aHt = aNode->val;
+    struct dlNode *bNode;
+    for (bNode = aNode->next; !dlEnd(bNode); bNode = bNode->next)
+        {
+	struct hacTree *bHt = bNode->val;
+	char key[distKeySize];
+	calcDistKey(aHt, bHt, key);
+	if (hashFindVal(distanceHash, key) == NULL)
+	     {
+	     AllocVar(pair);
+	     pair->a = aHt;
+	     pair->b = bHt;
+	     slAddHead(&pairList, pair);
+	     }
+	}
+    }
+
+/* Parallelize distance calculations */
+struct distanceWorkerContext context = {.distF=distF, .extraData=extraData};
+pthreadDoList(distThreadCount, pairList, distanceWorker, &context);
+
+/* Workers only fill pair->distance; the hash itself is updated here, after pthreadDoList returns */
+for (pair = pairList; pair != NULL; pair = pair->next)
+    {
+    char key[distKeySize];
+    calcDistKey(pair->a, pair->b, key);
+    /* NOTE(review): these doubles are never freed here; confirm the owner of the hash releases them */
+    double *pd = needMem(sizeof(double));
+    *pd = pair->distance;
+    hashAdd(distanceHash, key, pd);
+    }
+slFreeList(&pairList);
+}
+
+static double pairParallel(struct dlList *list, struct hash *distanceHash, 
+    hacDistanceFunction *distF, void *extraData, struct dlNode **retNodeA, struct dlNode **retNodeB)
+/* Find the closest pair of nodes in list, first computing any uncached distances in parallel.
+ * Returns their distance and stores the nodes in *retNodeA and *retNodeB.  If no pair is closer
+ * than BIGDOUBLE (e.g. fewer than two nodes in list) both are NULL and BIGDOUBLE is returned. */
+{
+precacheMissingDistances(list, distanceHash, distF, extraData);
+struct dlNode *aNode;
+double closestDistance = BIGDOUBLE;
+struct dlNode *closestA = NULL, *closestB = NULL;
+for (aNode = list->head; !dlEnd(aNode); aNode = aNode->next)
+    {
+    struct dlNode *bNode;
+    for (bNode = aNode->next; !dlEnd(bNode); bNode = bNode->next)
+        {
+	/* Build the key with the same helper that precacheMissingDistances stored it under */
+	char key[distKeySize];
+	calcDistKey(aNode->val, bNode->val, key);
+	double d = *(double *)hashMustFindVal(distanceHash, key);
+	if (d < closestDistance)
+	    {
+	    closestDistance = d;
+	    closestA = aNode;
+	    closestB = bNode;
+	    }
+	}
+    }
+*retNodeA = closestA;
+*retNodeB = closestB;
+return closestDistance;
+}
+
+struct hacTree *hacTreeMultiThread(int threadCount, struct slList *itemList, struct lm *localMem,
+				 hacDistanceFunction *distF, hacMergeFunction *mergeF,
+				 void *extraData)
+/* Construct hacTree minimizing number of merges called, and doing distance calls in parallel
+ * when possible.  threadCount is stored in the file-level static distThreadCount, so it applies
+ * to the whole file, not just this call.  Do a lmCleanup(localMem) to free returned tree. */
+{
+distThreadCount = threadCount;
+return hacTreeVirtualPairing(itemList, localMem, distF, mergeF, extraData, pairParallel);
+}
+