531a4d9b21fa112cddaed5a707792c8c4edfc3e9 kent Mon Aug 25 17:00:18 2014 -0700 Moving pthreadDoList to new library module, and the parallel hacTree to hacTree.c. diff --git src/lib/hacTree.c src/lib/hacTree.c index cdf91bd..82eddbd 100644 --- src/lib/hacTree.c +++ src/lib/hacTree.c @@ -1,24 +1,26 @@ /* hacTree - Hierarchical Agglomerative Clustering a list of inputs into a binary tree */ /* Copyright (C) 2011 The Regents of the University of California * See README in this or parent directory for licensing information. */ #include "common.h" #include "dlist.h" #include "hash.h" #include "hacTree.h" +#include "synQueue.h" +#include "pthreadDoList.h" static struct hacTree *leafNodesFromItems(const struct slList *itemList, int itemCount, struct lm *localMem) /* Allocate & initialize leaf nodes that contain only items. */ { struct hacTree *leafNodes = lmAlloc(localMem, itemCount * sizeof(struct hacTree)); int i = 0; const struct slList *item = itemList; while (item != NULL && i < itemCount) { // needMem zeroes the memory, so initialize only non-NULL stuff. struct hacTree *node = &(leafNodes[i]); if (i < itemCount-1) node->next = &(leafNodes[i+1]); node->itemOrCluster = (struct slList *)item; @@ -382,15 +384,139 @@ /* Clean up and go home. */ hashFree(&distanceHash); struct dlNode *lastNode = dlPopHead(&remaining); return lastNode->val; } struct hacTree *hacTreeForCostlyMerges(struct slList *itemList, struct lm *localMem, hacDistanceFunction *distF, hacMergeFunction *mergeF, void *extraData) /* Construct hacTree using a method that will minimize the number of calls to * the distance and merge functions, assuming they are expensive. Do a lmCleanup(localMem) * to free the returned tree. */ { return hacTreeVirtualPairing(itemList, localMem, distF, mergeF, extraData, pairSerially); } + +/** The code from here on down is an implementation that uses multiple threads **/ + +#define distKeySize 64 + +static void calcDistKey(void *a, void *b, char key[distKeySize]) +/* Put key for distance in key */ +{ +safef(key, distKeySize, "%p %p", a, b); +} + +struct hctPair +/* A pair of hacTree nodes and the distance between them */ + { + struct hctPair *next; /* Next in list */ + struct hacTree *a, *b; /* The pair of nodes */ + double distance; /* Distance between pair */ + }; + +struct distanceWorkerContext +/* Context for a distance worker */ + { + hacDistanceFunction *distF; /* Function to call to calc distance */ + void *extraData; /* Extra data for that function */ + }; + +static void distanceWorker(void *item, void *context) +/* fill in distance for pair */ +{ +struct hctPair *pair = item; +struct distanceWorkerContext *dc = context; +struct hacTree *aHt = pair->a, *bHt = pair->b; +pair->distance = (dc->distF)((struct slList *)(aHt->itemOrCluster), + (struct slList *)(bHt->itemOrCluster), dc->extraData); +} + +static int distThreadCount = 1; // Number of threads to use for distance calcs. 
+ +static void precacheMissingDistances(struct dlList *list, struct hash *distanceHash, + hacDistanceFunction *distF, void *extraData) +/* Force computation of all distances not already in hash */ +{ +/* Make up list of all missing distances in pairList */ +struct synQueue *sq = synQueueNew(); +struct hctPair *pairList = NULL, *pair; +struct dlNode *aNode; +for (aNode = list->head; !dlEnd(aNode); aNode = aNode->next) + { + struct hacTree *aHt = aNode->val; + struct dlNode *bNode; + for (bNode = aNode->next; !dlEnd(bNode); bNode = bNode->next) + { + struct hacTree *bHt = bNode->val; + char key[64]; + calcDistKey(aHt, bHt, key); + double *pd = hashFindVal(distanceHash, key); + if (pd == NULL) + { + AllocVar(pair); + pair->a = aHt; + pair->b = bHt; + slAddHead(&pairList, pair); + synQueuePut(sq, pair); + } + } + } + +/* Parallelize distance calculations */ +struct distanceWorkerContext context = {.distF=distF, .extraData=extraData}; +pthreadDoList(distThreadCount, pairList, distanceWorker, &context); + +for (pair = pairList; pair != NULL; pair = pair->next) + { + char key[64]; + calcDistKey(pair->a, pair->b, key); + double *pd = needMem(sizeof(double)); + *pd = pair->distance; + hashAdd(distanceHash, key, pd); + } +slFreeList(&pairList); +} + +static double pairParallel(struct dlList *list, struct hash *distanceHash, + hacDistanceFunction *distF, void *extraData, struct dlNode **retNodeA, struct dlNode **retNodeB) +/* Loop through list returning closest two nodes */ +{ +precacheMissingDistances(list, distanceHash, distF, extraData); +struct dlNode *aNode; +double closestDistance = BIGDOUBLE; +struct dlNode *closestA = NULL, *closestB = NULL; +for (aNode = list->head; !dlEnd(aNode); aNode = aNode->next) + { + struct hacTree *aHt = aNode->val; + struct dlNode *bNode; + for (bNode = aNode->next; !dlEnd(bNode); bNode = bNode->next) + { + struct hacTree *bHt = bNode->val; + char key[64]; + safef(key, sizeof(key), "%p %p", aHt, bHt); + double *pd = hashMustFindVal(distanceHash, key); + double d = *pd; + if (d < closestDistance) + { + closestDistance = d; + closestA = aNode; + closestB = bNode; + } + } + } +*retNodeA = closestA; +*retNodeB = closestB; +return closestDistance; +} + +struct hacTree *hacTreeMultiThread(int threadCount, struct slList *itemList, struct lm *localMem, + hacDistanceFunction *distF, hacMergeFunction *mergeF, + void *extraData) +/* Construct hacTree minimizing number of merges called, and doing distance calls + * in parallel when possible. Do a lmCleanup(localMem) to free returned tree. */ +{ +distThreadCount = threadCount; +return hacTreeVirtualPairing(itemList, localMem, distF, mergeF, extraData, pairParallel); +} +
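
For reference, a minimal caller-side sketch of the new hacTreeMultiThread entry point follows. It is not part of the commit: the item type (struct point) and the pointDistance/pointMerge callbacks are hypothetical, and their signatures are assumed to match the hacDistanceFunction/hacMergeFunction typedefs in hacTree.h as implied by the calls above.

/* Hypothetical usage sketch, not part of this commit.  Clusters a list of 1-D
 * points with hacTreeMultiThread using 4 distance-calculation threads. */
#include "common.h"
#include "localmem.h"
#include "hacTree.h"

struct point
/* A trivial clusterable item -- like slList, it starts with a next pointer. */
    {
    struct point *next;
    double x;
    };

static double pointDistance(const struct slList *a, const struct slList *b, void *extraData)
/* Distance between two points (or merged cluster midpoints). */
{
const struct point *pa = (const struct point *)a, *pb = (const struct point *)b;
double d = pa->x - pb->x;
return d < 0 ? -d : d;
}

static struct slList *pointMerge(const struct slList *a, const struct slList *b, void *extraData)
/* Merge two points into a new point at their midpoint. */
{
const struct point *pa = (const struct point *)a, *pb = (const struct point *)b;
struct point *merged;
AllocVar(merged);    // zeroed allocation from common.h, as used elsewhere in this file
merged->x = (pa->x + pb->x) / 2.0;
return (struct slList *)merged;
}

void examplePointClustering(struct point *pointList)
/* Build a clustering tree over pointList, then free it with lmCleanup. */
{
struct lm *lm = lmInit(0);
struct hacTree *tree = hacTreeMultiThread(4, (struct slList *)pointList, lm,
                                          pointDistance, pointMerge, NULL);
/* ... walk tree->left / tree->right here ... */
lmCleanup(&lm);   /* frees the tree nodes, per the function comment above */
}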