7cc9ee442be0bb8ab7a90e77566f5df30bdb7277 markd Thu Jan 9 08:29:12 2020 -0800 add option to use parallel sort for pslPartition and bedPartition diff --git src/hg/utils/bedPartition/bedPartition.c src/hg/utils/bedPartition/bedPartition.c index 143e778..9e34048 100644 --- src/hg/utils/bedPartition/bedPartition.c +++ src/hg/utils/bedPartition/bedPartition.c @@ -1,84 +1,69 @@ /* bedPartition - split BED ranges into non-overlapping ranges */ /* Copyright (C) 2019 The Regents of the University of California * See README in this or parent directory for licensing information. */ #include "common.h" #include "options.h" -#include "pipeline.h" +#include "partitionSort.h" #include "basicBed.h" #include "sqlNum.h" #include "dystring.h" #include "portable.h" /* command line options and values */ static struct optionSpec optionSpecs[] = { + {"parallel", OPTION_INT}, {NULL, 0} }; +static int gParallel = 0; static void usage(char *msg) /* Explain usage and exit. */ { errAbort("Error: %s\n" "bedPartition - split BED ranges into non-overlapping ranges\n" "usage:\n" " bedPartition [options] bedFile rangesBed\n" "\n" "Split ranges in a BED into non-overlapping sets for use in cluster jobs.\n" "Output is a BED 3 of the ranges.\n" "The bedFile maybe compressed and no ordering is assumed.\n" "\n" "options:\n" + " -parallel=n - use this many cores for parallel sorting\n" "\n", msg); } struct bedInput /* object to read a bed */ { struct pipeline *pl; /* sorting pipeline */ struct lineFile *lf; /* lineFile to pipeline */ struct bed3 *pending; /* next bed to read, if not NULL */ }; -static struct pipeline *openBedSortPipe(char *bedFile) -/* open pipeline that sorts bed */ -{ -static char *zcatCmd[] = {"zcat", NULL}; -static char *bzcatCmd[] = {"zcat", NULL}; -static char *sortCmd[] = {"sort", "-k", "1,1", "-k", "2,2n", "-k", "3,3nr", NULL}; -int iCmd = 0; -char **cmds[3]; - -if (endsWith(bedFile, ".gz") || endsWith(bedFile, ".Z")) - cmds[iCmd++] = zcatCmd; -else if (endsWith(bedFile, ".bz2")) - cmds[iCmd++] = bzcatCmd; -cmds[iCmd++] = sortCmd; -cmds[iCmd++] = NULL; - -return pipelineOpen(cmds, pipelineRead, bedFile, NULL); -} - static struct bedInput *bedInputNew(char *bedFile) /* create object to read BEDs */ { struct bedInput *bi; AllocVar(bi); -bi->pl = openBedSortPipe(bedFile); +bi->pl = partitionSortOpenPipeline(bedFile, 0, 1, 2, gParallel); bi->lf = pipelineLineFile(bi->pl); + return bi; } static void bedInputFree(struct bedInput **biPtr) /* free bedInput object */ { struct bedInput *bi = *biPtr; if (bi != NULL) { assert(bi->pending == NULL); pipelineClose(&bi->pl); freez(biPtr); } } @@ -155,18 +140,19 @@ while ((bedPart = readPartition(bi)) != NULL) { fprintf(outFh, "%s\t%d\t%d\n", bedPart->chrom, bedPart->chromStart, bedPart->chromEnd); bed3Free(&bedPart); } carefulClose(&outFh); bedInputFree(&bi); } int main(int argc, char *argv[]) /* Process command line. */ { optionInit(&argc, argv, optionSpecs); if (argc != 3) usage("wrong # args"); +gParallel = optionInt("parallel", gParallel); bedPartition(argv[1], argv[2]); return 0; }