14cb55e83f7adaeab41efe5077cc2f1187da56b5
max
  Tue Nov 18 02:25:39 2014 -0800
adding apacheLogParse to git, better than having only a single copy in /cluster/bin/scripts.

diff --git src/utils/apacheLogParse src/utils/apacheLogParse
new file mode 100755
index 0000000..e72c278
--- /dev/null
+++ src/utils/apacheLogParse
@@ -0,0 +1,903 @@
+#!/cluster/software/bin/python2.7
+
+# parse apache logfiles on the cluster
+
+import glob, urlparse, gzip, marshal, os, shutil, gc, tempfile, types, csv, atexit, datetime, time, operator
+
+from collections import namedtuple, Counter, defaultdict
+from os.path import basename, join, abspath, isfile, dirname, isdir
+from os import listdir
+import optparse, logging, sys, string
+from itertools import imap
+from operator import itemgetter
+import heapq
+
+#TEMPDIR = "/dev/shm"
+
+# filename to delete on exit
+removeTmpName = None
+
+# where do we store the raw apache logfiles
+baseLogDir = "/hive/data/inside/wwwstats/RR"
+
+# years to analyze
+years = ["2009", "2010", "2011", "2012", "2013", "2014"]
+
+# list of servers to analyze
+# change these for debugging or if need to focus on single servers
+servNames = ["hgw1", "hgw2", "hgw3", "hgw4", "hgw5", "hgw6", "hgw7", "hgw8"]
+
+# directory for cluster job output
+jobOutDir = "/hive/data/inside/wwwstats/apacheLogParse"
+
+# directory to store csv files for C3 plot
+htmlDir = "/cluster/home/max/public_html/logParse/plot"
+
+# a struct for a cleaned apache log line
+apacheFields = ['ip', 'time', 'status', "reqType", 'speed', "filePath", "cgiParams", "agent", "referer"]
+ApacheLine = namedtuple("ApacheLine", apacheFields)
+
+# when writing the tab file, we need to replace some characters from the apache log file
+keyValReplTable = string.maketrans("\t\n=|", "  __")
+tabSepReplTable = string.maketrans("\t\n", "  ")
+
+# debugging option to run only one job
+RUNONE = False
+
+# option to globally overwrite existing files in cluster jobs
+OVERWRITE = False
+
+# option to globally clean all output directories of cluster jobs
+doCleanDir = False
+
+# ---- FUNCTIONS ----
+
+def lineFileNext(fh):
+    """
+        parses a tab-sep file, using the headers in the first line as field names,
+        yields one collections.namedtuple per data line,
+        strips the "#"-prefix from the header line
+    """
+    line1 = fh.readline()
+    line1 = line1.strip("\n").strip("#")
+    headers = line1.split("\t")
+    Record = namedtuple('tsvRec', headers)
+
+    for line in fh:
+        line = line.rstrip("\n")
+        fields = line.split("\t")
+        try:
+            rec = Record(*fields)
+        except Exception, msg:
+            logging.error("Exception occurred while parsing line, %s" % msg)
+            logging.error("Filename %s" % fh.name)
+            logging.error("Line was: %s" % repr(line))
+            logging.error("Does number of fields match headers?")
+            logging.error("Headers are: %s" % headers)
+            #raise Exception("wrong field count in line %s" % line)
+            continue
+        # convert fields to correct data type
+        yield rec
+
+def dateToWeek(timeStr):
+    " convert an apache time, and get the start of its week in format like 2013-04-25"
+    timeObj = datetime.datetime.strptime(timeStr, "%d/%b/%Y:%H:%M:%S")
+    dayOfYear = int(timeObj.strftime("%j"))
+    # day-of-year is 1-based, so shift by one before binning into 7-day windows;
+    # otherwise day 7 would land in a bin that only starts on day 8
+    weekStartStr = "%d-%d" % (timeObj.year, 7*((dayOfYear-1)/7)+1)
+    weekStart = datetime.datetime.strptime(weekStartStr, "%Y-%j")
+    weekStartStr = weekStart.strftime("%Y-%m-%d")
+    return weekStartStr
+
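+# a quick sanity check for dateToWeek, safe to remove: 18/Aug/2013 is day 230 of
+# the year, so its 7-day bin starts at day 225, which is Aug 13
+assert dateToWeek("18/Aug/2013:07:54:21") == "2013-08-13"
+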
+def parseApacheLine(fields):
+    " parse an apache log file line and return it as an ApacheLine struct, or None on errors "
+    if len(fields)<10: # need fields[0]-fields[9]; the speed field, fields[10], is optional
+        logging.warn("not enough fields: %s" % fields)
+        return None
+
+    # 119.63.193.132 - - [18/Aug/2013:07:54:21 -0700] "GET / HTTP/1.1" 200 16213 "-" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)" 200 microseconds
+    try:
+        ip = fields[0]
+        timeStr = fields[3].strip("[")
+        req = fields[5]
+        status = fields[6]
+        referer = fields[8]
+        agent = fields[9]
+        if len(fields)>10:
+            speed = fields[10]
+        else:
+            speed = "-1" # there was a time back in 2003 when we didn't have the speed field in the logs
+    except IndexError:
+        logging.warn("index error %s" % fields)
+        return None
+
+    if len(req)>10000:
+        logging.warn("HTTP request from %s with more than 10000 chars: %s" % (agent, req[:200]))
+        return None
+
+    # parse the http request: typically something like
+    # GET cgi-bin/hgTracks?hgsid=xxxx&xxx HTTP/1.1
+    reqRest = req.split(" ", 1)
+    if len(reqRest)!=2:
+        logging.warn("no space in request field %s" % fields)
+        return None
+
+    reqType, rest = reqRest
+    if not rest.endswith("HTTP/1.1") and not rest.endswith("HTTP/1.0"):
+        logging.warn("request does not end with HTTP/1.1 or HTTP/1.0: %s, GET params probably too long" % fields)
+        return None
+    
+    reqUrl = rest.replace(" HTTP/1.1", "")
+    reqUrl = reqUrl.replace(" HTTP/1.0", "")
+    #reqType, reqUrl, httpVer = reqFields
+
+    # split into cgi as a string and the params as a dict, e.g. "cgi-bin/hgXXX" and {'hgsid':'1233'}
+    filePath, paramStr = urlparse.urlsplit(reqUrl)[2:4]
+    cgi = basename(filePath)
+    params = urlparse.parse_qs(paramStr)
+    paramList = []
+    for key, val in params.iteritems():
+        val = val[0]
+        # get rid of the = and | characters in these strings
+        key = key.translate(keyValReplTable)
+        val = val.translate(keyValReplTable)
+        if key=="": # skip empty parameter names
+            continue
+        kvStr = "%s=%s" % (key, val)
+        paramList.append(kvStr)
+    paramStr = "|".join(paramList)
+
+    # we want to put this into a tab-sep file, so remove these chars
+    filePath = filePath.translate(tabSepReplTable)
+    agent    = agent.translate(tabSepReplTable)
+    referer  = referer.translate(tabSepReplTable)
+
+    # put everything into a struct
+    a = ApacheLine(ip, timeStr, status, reqType, speed, filePath, paramStr, agent, referer)
+    return a
+        
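+# for the sample log line shown above, parseApacheLine returns roughly this
+# (an illustrative sketch, abbreviated):
+#   ApacheLine(ip='119.63.193.132', time='18/Aug/2013:07:54:21', status='200',
+#       reqType='GET', speed='200', filePath='/', cgiParams='',
+#       agent='Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)', referer='-')
+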
+def printBest(ofh, counts, topX=100000):
+    " print up to topX names and their counts "
+    for name, count in counts[:topX]:
+        ofh.write("%s\t%s\n" % (name, count))
+
+def isBot(agent):
+    " return True if the agent string looks like a bot or crawler "
+    a = agent.lower()
+    # agent was lower-cased, so all patterns must be lower-case, too
+    if "google." in a or "yahoo." in a or "bing." in a or "baidu." in a \
+            or "bot" in a or "spider" in a or "slurp" in a or "crawler" in a or \
+            "commons-httpclient" in a or "httpgrab" in a or "internal dummy" in a or \
+            a.startswith("google") or a.startswith("yahoo") or a.startswith("bing"):
+        return True
+    else:
+        return False
+
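+# sanity checks for isBot, safe to remove:
+assert isBot("Googlebot/2.1 (+http://www.google.com/bot.html)")
+assert not isBot("Mozilla/5.0 (X11; Linux x86_64)")
+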
+def paramStrToDict(inStr):
+    " convert string like hgsid=12343|key=val|key2=val2 to dictionary and return "
+    d = {}
+    if inStr=="":
+        return d
+
+    parts = inStr.split("|")
+    for p in parts:
+        k, v = p.split("=", 1) # split only on the first "=", to be safe if a value contains one
+        d[k] = v
+    return d
+
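+# sanity check for paramStrToDict, safe to remove:
+assert paramStrToDict("hgsid=1234|db=hg19") == {"hgsid": "1234", "db": "hg19"}
+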
+def apacheCount(inFname):
+    " parse a tab-sep apache log file and return three dicts of Counters: hgcClicks, sidIps, agentCounts "
+    # open file
+    logging.info( "Reading %s" % inFname)
+    if inFname.endswith(".gz"):
+        ifh = gzip.open(inFname)
+    else:
+        ifh = open(inFname)
+
+    hgcClicks   = defaultdict(Counter)
+    sidIps      = defaultdict(Counter)
+    agentCounts = defaultdict(Counter)
+
+    # parse file
+    for a in lineFileNext(ifh):
+        # skip any further analysis if it's a bot
+        if isBot(a.agent):
+            logging.debug("%s is a bot" % a.agent)
+            continue 
+
+        agentCounts[a.agent][a.filePath]+=1
+
+        params = paramStrToDict(a.cgiParams)
+        # hgsids per IP address
+        hgsid = params.get("hgsid", None)
+        if hgsid != None:
+            # paramStrToDict returns plain strings, not lists as urlparse.parse_qs does,
+            # so no [0]-indexing here: it would take only the first character
+            hgsid = hgsid.split("_")[0].strip() # strip the new secure id
+            if not hgsid.isdigit():
+                logging.warn("hgsid %s does not look like a number, line %s" % (hgsid, a))
+                continue
+
+            sidIps[a.ip][hgsid]+=1
+
+        # track clicks
+        if a.filePath.endswith("/hgc"):
+            trackName = params.get("g", None)
+            if trackName != None:
+                hgcClicks[trackName][a.ip]+=1
+            else:
+                logging.debug("hgc without g param: %s" % a.filePath)
+
+    return hgcClicks, sidIps, agentCounts
+
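+# typical use of apacheCount (a sketch, the filename is made up): all three
+# returned dicts map a key to a Counter, e.g.
+#   hgcClicks, sidIps, agentCounts = apacheCount("2013-04-04.tab.gz")
+#   hgcClicks["someTrack"]["1.2.3.4"] -> click count for someTrack from that IP
+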
+# the following functions prepare lists of (inFname, outFname) for parasol
+
+def inOutCat(inDir, outDir):
+    " prepare filenames for parasol for the cat step, return a list of tuples (inFname, outFname) "
+    if not os.path.isdir(outDir):
+        os.makedirs(outDir)
+
+    inFnames = glob.glob(join(inDir, "*.tab.gz"))
+
+    # split all infiles by real week, not the week from the apache filename
+    byWeek = defaultdict(list)
+    for inFname in inFnames:
+        date = basename(inFname).split(".")[0].split("_")[-1]
+        byWeek[date].append(inFname)
+
+    # create little files with the filenames in inDir, to get around parasol's line limit
+    inOutFnames = []
+    for date, inFnames in byWeek.iteritems():
+        fileListFname = join(inDir, date+".fileList")
+
+        ofh = open(fileListFname, "w")
+        ofh.write("\n".join(inFnames))
+        ofh.close()
+
+        outFname = join(outDir, date+".tab.gz")
+        inOutFnames.append( (fileListFname, outFname) )
+
+    logging.info("Found %d input files, assigned to %d output files " % (len(inFnames), len(inOutFnames)))
+    return inOutFnames
+        
+
+def inOutTab(outDir):
+    " prep in/out file list for parasol log tab-sep reformatting, return list of tuples  (inFname, outFname)"
+    if not os.path.isdir(outDir):
+        os.makedirs(outDir)
+
+    fnames = []
+    for year in years:
+        yearDir = join(baseLogDir, year)
+        for servName in servNames:
+            servDir = join(yearDir, servName)
+            inFnames = glob.glob(join(servDir, "access_log.*.gz"))
+            for inFname in inFnames:
+                # access_log.20130317.gz
+                day = os.path.basename(inFname).split(".")[1]
+                outFname = "%s_%s.flag" % (day, servName) # this is just an empty flag file, job writes to <day>_<server>_<realWeekDate>.tab.gz
+                outFname = join(outDir, outFname)
+                fnames.append ( (inFname, outFname) )
+
+    print "Found %d logfiles in %s" % (len(fnames), baseLogDir)
+    return fnames
+
+def inOutCount(catDir, outDir):
+    """ prepare (inFname, outFname) tuples for the count step 
+    outfiles go into one subDir per counting type, e.g. hgcClicks/2013-04-04.tab.gz"""
+    if not os.path.isdir(outDir):
+        os.makedirs(outDir)
+
+    fnames = []
+    inFnames = glob.glob(join(catDir, "*.tab.gz"))
+    for inFname in inFnames:
+        fBase = os.path.basename(inFname)
+        outPath = join(outDir, "hgcClicks", fBase)
+        fnames.append ( (inFname, outPath) )
+
+    logging.info("Found %d logfiles in %s" % (len(fnames), catDir))
+    return fnames
+
+# end of parasol helper functions
+
+def submitJobs(headNode, jobOutDir, job, fnames):
+    """ submit jobs to parasol, calling this script as the jobscript with the parameter "job"  """
+
+    # clear joboutdir
+    if doCleanDir and os.path.isdir(jobOutDir):
+        print ("Deleting directory %s and all contents" % jobOutDir)
+        print ("You have three seconds to change your mind and hit ctrl-c")
+        time.sleep(3)
+        runCmd("rm -rf %s/*" % jobOutDir)
+
+    # for debugging
+    if RUNONE:
+        logging.info("Running only one job")
+        inFname, outFname = fnames[0]
+        # argument order must match the parasol command lines built below: <step> <inFname> <outFname>
+        cmd = "%s %s %s %s" % (__file__, job, inFname, "temp.hgcClicks.tab")
+        ret = os.system(cmd)
+        if ret!=0:
+            print "return code %d" % ret
+            return
+        #logging.info("Running merge on single file %s" % outFname)
+        #runReduce(jobOutDir, True) # disabled: runReduce is commented out above
+        sys.exit(0)
+
+    # create list of cmd lines to run
+    jobLines = []
+    skipCount = 0
+    for inFname, outFname in fnames:
+        if not OVERWRITE and isfile(outFname):
+            logging.debug("Skipping parasol job, output file %s already exists" % outFname)
+            skipCount += 1
+            continue
+        # outFname is already an absolute path here, so join() ignores jobOutDir
+        outPath = join(jobOutDir, outFname)
+        cmdLine = "%s %s %s %s {check out exists %s}\n" % \
+                (sys.executable, abspath(__file__), job, inFname, outPath)
+        jobLines.append(cmdLine)
+    logging.info("%d logfiles, skipped %d (already converted), %d files to convert" % \
+        (len(fnames), skipCount, len(jobLines)))
+
+    if len(jobLines)==0:
+        logging.info("No new logfiles to convert")
+        return
+
+    # create joblist file
+    jlName = join(jobOutDir, "jobList")
+    jlf = open(jlName, "w")
+    for cmdLine in jobLines:
+        jlf.write(cmdLine)
+    jlf.close()
+
+    # submitting joblist
+    print("Running jobs in dir %s" % jobOutDir)
+    cmd = "ssh %s 'cd %s; para freeBatch; para resetCounts; para clearSickNodes; para make jobList'" % \
+        (headNode, jobOutDir)
+    logging.info("running: %s" % cmd)
+    runCmd(cmd)
+
+#def getKgIdToSym():
+    #"use hgsql to get mapping kgId -> symbol "
+    #tmpFh = tempfile.NamedTemporaryFile()
+    #cmd = 'hgsql hg19 -NB -e "select kgId, geneSymbol from kgXref;" > %s' % tmpFh.name
+    #os.system(cmd)
+    #res = {}
+    #for line in open(tmpFh.name).read().splitlines():
+        #kgId, sym = line.split("\t")
+        #res[kgId] = sym
+    #return res
+
+def writeCounts(allCounts):
+    # note: only called by the commented-out runReduce step and requires
+    # getKgIdToSym, which is commented out above
+    kgIdToSym = getKgIdToSym()
+    for dataType, counter in allCounts.iteritems():
+        ofh = open(dataType+".tab", "w")
+        if dataType.endswith("Set"):
+            for id, val in counter.iteritems():
+                row = [id, str(len(val)), ",".join(val)]
+                ofh.write("\t".join(row)+"\n")
+        else:
+            for id, count in counter.most_common():
+                if dataType=="genes":
+                    row = [id, kgIdToSym.get(id, ""), str(count)]
+                else:
+                    row = [id, str(count)]
+                ofh.write("\t".join(row)+"\n")
+        ofh.close()
+        print "wrote %s" % ofh.name
+
+#def postProcess(allCounts):
+    #""" assume allCounts is a dict with name -> someVariable 
+        #if sameVariable is a set, replace it with its length
+        #return the result
+    #"""
+#
+    #newDict = {}
+    #for name, var in allCounts.iteritems():
+        #if name.endswith("Set"):
+            #counter = Counter()
+            #for key, dataSet in var.iteritems():
+                #counter[key] = len(dataSet)
+            #newDict[name] = counter
+        #else:
+            #newDict[name] = var
+    #return newDict
+
+#def runReduce(jobOutDir, runOne):
+    #""
+    #gc.disable() # obscure setting to make script a lot faster, disables GC
+    #if isfile(jobOutDir):
+       #inFnames = [jobOutDir] # for debugging, we can do this
+    #else:
+        #inFnames = glob.glob(join(jobOutDir, "*.marshal"))
+#
+    #if runOne:
+        ##inFnames = [inFnames[0]] # debugging on a single file is faster
+        #inFnames = ["temp.hgcClicks.tab"]
+#
+    #print("Found %d input files" % len(inFnames))
+    #allCounts = defaultdict(Counter)
+#
+    #for inFname in inFnames:
+        #print("Reading %s" % inFname)
+        #mapData = marshal.load(open(inFname))
+        ##printBest(sys.stdout, mapData["genes"])
+        #for dataType, counts in mapData.iteritems():
+            #if dataType.endswith("Set") and dataType not in allCounts:
+                #allCounts[dataType] = defaultdict(set) # special case, not a counter but a set
+                #
+            #print dataType
+            #dataCounter = allCounts[dataType]
+            #for id, val in counts:
+                #if dataType.endswith("Max"):
+                    #dataCounter[id]=max(dataCounter[id], val)
+                #elif dataType.endswith("Set"):
+                    #dataCounter[id].update(val)
+                #else:
+                    #dataCounter[id]+=val
+#
+    #allCounts = postProcess(allCounts)
+    #writeCounts(allCounts)
+
+def runCmd(cmd):
+    logging.debug("Running %s" % cmd)
+    ret = os.system(cmd)
+    if ret !=0:
+        raise Exception("Could not run %s" % cmd)
+
+def writeLines(headers, lines, outFname):
+    " write text lines to file in an atomic way "
+    #ofh = open(outFname, "w")
+    atexit.register(delTmp)
+
+    global removeTmpName
+    fd, tmpName = tempfile.mkstemp(dir=dirname(outFname), suffix=".tab.gz.tmp")
+    removeTmpName = tmpName
+
+    ofh = gzip.open(tmpName, "w")
+    logging.info("Writing to %s" % ofh.name)
+
+    ofh.write("#"+"\t".join(headers)+"\n")
+
+    for l in lines:
+        ofh.write(l)
+        ofh.write("\n")
+    ofh.close()
+
+    logging.info("Renaming to %s" % outFname)
+    os.rename(tmpName, outFname)
+
+def delTmp():
+    " remove the global tmp file, registered via atexit "
+    if removeTmpName is not None and isfile(removeTmpName):
+        os.remove(removeTmpName)
+
+def counterToString(c):
+    """ convert a Counter object to a string like "C:4,abc:3", sorted by counts.
+    returns a tuple (sum of all counts, number of items, joined item:count string) """
+    ipSorted = c.most_common() # sort by counts, highest first
+    countSum = 0
+    countParts = []
+    itemCount = 0
+    for ip, count in ipSorted:
+        # replace the separator characters in the key first, they would break
+        # the item:count,item2:count,... format
+        ip = ip.replace(":", "_").replace(",", "_")
+        ipCountStr = "%s:%d" % (ip, count)
+        countParts.append(ipCountStr)
+        countSum += count
+        itemCount += 1
+    return countSum, itemCount, ",".join(countParts)
+
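+# sanity check for counterToString, safe to remove: 7 counts over 2 items
+assert counterToString(Counter({"a": 4, "b": 3})) == (7, 2, "a:4,b:3")
+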
+def writeDictCounter(dictCounter, outFname, headers):
+    """ write dict with key -> Counter to tab.gz out file 
+    with four fields:
+    1 key
+    2 sum of counts in counter
+    3 number of items in counter
+    4 complete contents of counter in format item:count,item2:count,...
+    """
+    makeDir(dirname(outFname))
+    lines = []
+    for key, itemCounts in dictCounter.iteritems():
+        countSum, itemCount, countStr = counterToString(itemCounts)
+        fields = [key, str(countSum), str(itemCount), countStr]
+        lines.append("\t".join(fields))
+    lines.sort()
+
+    writeLines(headers, lines, outFname)
+    
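+# an example output row of writeDictCounter (illustrative, made-up values):
+#   someTrack<tab>7<tab>2<tab>1.2.3.4:4,5.6.7.8:3
+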
+def firstField(line):
+    """ extract the sort/merge key of a line: its first tab-separated field.
+    keys like agent strings can contain spaces, so split on tab, not on whitespace """
+    return line.split("\t")[0]
+
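+# sanity check for firstField, safe to remove:
+assert firstField("someTrack\t12\t3\trest") == "someTrack"
+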
+def mergeLines(lastValues, ofh):
+    " merge the count strings of all lines with the same key and write one merged line to ofh "
+    c = Counter()
+    for val in lastValues:
+        parts = val.split(",")
+        for part in parts:
+            vals = part.split(":")
+            if len(vals)!=2:
+                logging.warn("malformed item:count pair %s" % repr(part))
+                continue
+            key, count = vals
+            c[key]+=int(count)
+
+    countSum, itemCount, countStr = counterToString(c)
+    row = [str(countSum), str(itemCount), countStr]
+    ofh.write("\t".join(row))
+    ofh.write("\n")
+    
+def mergeFiles(inFnames, outFname):
+    " merge sorted tabsep files to outFname "
+    ifhs = []
+    lastHeaders = None
+    assert(len(inFnames)>0)
+    for inFname in inFnames:
+        ifh = gzip.open(inFname)
+        headers = ifh.readline().rstrip("\n").split("\t")
+        assert(lastHeaders==None or lastHeaders==headers)
+        lastHeaders = headers
+        ifhs.append(ifh)
+    ofh = gzip.open(outFname, "w")
+    ofh.write("\t".join(headers)+"\n")
+
+    # python magic for merge operation
+    # http://stackoverflow.com/questions/12460943/merging-pre-sorted-files-without-reading-everything-into-memory
+    decorated = [ ((firstField(line), line) for line in f) for f in ifhs]
+    merged = heapq.merge(*decorated)
+
+    # now merge lines with the same key and write them out
+    lastKey = None
+    lastValues = []
+    for key, line in merged:
+        if lastKey != None and key!=lastKey:
+            ofh.write("%s\t" % lastKey)
+            mergeLines(lastValues, ofh)
+            lastValues = []
+
+        lastField = line.rstrip("\n").split("\t")[-1]
+        lastValues.append(lastField)
+        lastKey = key
+
+    # write the last group, it is not flushed by the loop above
+    if lastKey != None:
+        ofh.write("%s\t" % lastKey)
+        mergeLines(lastValues, ofh)
+    ofh.close()
+
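+# the merge principle used above (an illustrative example, not executed):
+# heapq.merge consumes several pre-sorted iterators and yields all their items
+# in globally sorted order without loading everything into memory, e.g.
+#   list(heapq.merge([1, 3], [2, 4])) -> [1, 2, 3, 4]
+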
+def parsedToMerged(countDir, mergeDir):
+    " return a list of tuples (inFnameMask,  outPath)"
+    if not isdir(mergeDir):
+        os.makedirs(mergeDir)
+
+    inOutFnames = []
+    fTypes = set()
+
+    # sort the input files
+    subDirs =  listdir(countDir)
+
+    for subDir in subDirs:
+        logging.debug("subdir %s" % subDir)
+        inDir = join(countDir, subDir)
+        fnames =  listdir(inDir)
+
+        # make sure out dir exists
+        outDir = join(mergeDir, subDir)
+        if not isdir(outDir):
+            logging.debug("making dir %s" % outDir)
+            os.makedirs(outDir)
+
+        # get all dates
+        dates = set()
+        for fname in fnames:
+            if not fname.endswith("tab.gz"):
+                continue
+            date, server = fname.split('.')[0].split('_')
+            dates.add(date)
+
+        # create one output file per date
+        logging.info("subdir %s, %d dates" % (subDir, len(dates)))
+        for date in dates:
+            outPath = join(mergeDir, subDir, date+".tab.gz")
+            inMask = join(countDir, subDir, date+"_*.tab.gz")
+            inOutFnames.append ((inMask, outPath))
+
+    return inOutFnames
+
+#def mergedToYear( mergeDir, yearDir):
+    #if not isdir(yearDir):
+        #os.makedirs(yearDir)
+#
+    #yearToInFnames = defaultdict(list)
+#
+    #subDirs =  listdir(mergeDir)
+    #logging.debug("Found %d input dirs" % len(subDirs))
+    #for subDir in subDirs:
+        #fnames =  listdir(join(mergeDir, subDir))
+        ## 03-cat/hgsidHgsids/20130903.tab.gz
+        #for fname in fnames:
+            #year = fname[:4]
+            #outPath = join(yearDir, "%s_%s.tab.gz" % (year,subDir))
+            #yearToInFnames[outPath].append( join(mergeDir, subDir, fname) )
+    #return yearToInFnames
+        
+
+def mergeFnames(dateToFnames):
+    """ given a dict of outFname -> list of inFnames, merge each group of input files into its
+    output file. inFnames are like 2013_hgw8_20130414.hgcClicks.tab.gz, outFnames like hgcClicks/20130414.tab.gz """
+
+    for outPath, inFnames in dateToFnames.iteritems():
+        assert(len(inFnames)>0)
+        logging.info("%s" % outPath)
+        logging.debug("merging %s into %s" % (inFnames, outPath))
+        mergeFiles(inFnames, outPath)
+
+def makeDir(path):
+    if not os.path.isdir(path):
+        logging.info("Creating directory %s" % path)
+        os.makedirs(path)
+
+def plot(mergeDir, htmlDir):
+    " reformat count hgcClicks files to csv format for C3 "
+    inDir = join(mergeDir, "hgcClicks")
+    inFnames = glob.glob(join(inDir, "*.tab.gz"))
+
+    # sort filenames into year -> names
+    byYear = defaultdict(list)
+    for inFname in inFnames:
+        year = basename(inFname)[:4]
+        byYear[year].append(basename(inFname).split(".")[0])
+
+    for year, fnames in byYear.iteritems():
+        # sort fnames by date
+        dates = [(datetime.datetime.strptime(fn, "%Y-%m-%d"), fn) for fn in fnames]
+        dates.sort()
+        dates = [y for (x,y) in dates]
+
+        trackClickSum = defaultdict(int)
+        # parse data into nested dict date -> track -> count
+        dateTrackCounts = {}
+        allTracks = set()
+        for date in dates:
+            dateCounts = {}
+            path = join(inDir, date+".tab.gz")
+            for line in gzip.open(path):
+                if line.startswith("#"):
+                    continue
+                track, clickCount, ipCount, rest = line.split("\t")
+                allTracks.add(track)
+                dateCounts[track] = ipCount
+                # note: tracks are ranked by distinct-IP count, not by raw click count
+                trackClickSum[track] += int(ipCount)
+            dateTrackCounts[date] = dateCounts
+
+        allTracks = [] # reset: rebuilt below with only the top-ranked tracks
+        trackClickSum = trackClickSum.items() # convert dict to list of tuples (track, count)
+        trackClickSum.sort(key=operator.itemgetter(1), reverse=True) # sort by count
+
+        rank = 0
+        for track, count in trackClickSum:
+            if track.startswith("htc") or track in ["getDna","refGene", "hgUserPsl"] or track.startswith("ct_"):
+                continue
+            #if count>=5000:
+                #allTracks.append(track)
+            allTracks.append(track)
+            if rank >= 20:
+                break
+            rank += 1
+
+        allTracks.sort()
+
+        outName = join(htmlDir, "%s_hgcClicks.csv" % year)
+
+        # output data as table
+        ofh = open(outName, "w")
+        ofh.write("date,")
+        ofh.write(",".join(allTracks))
+        ofh.write("\n")
+
+        for date in dates:
+            dateTracks = dateTrackCounts[date]
+            row = [date]
+            for track in allTracks:
+                count = dateTracks.get(track, 0)
+                row.append(str(count))
+            ofh.write(",".join(row))
+            ofh.write("\n")
+
+        ofh.close()
+        logging.info("Wrote %s" % outName)
+
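+# the csv written by plot() looks like this (illustrative track names and values):
+#   date,someTrackA,someTrackB
+#   2013-01-01,120,95
+#   2013-01-08,133,101
+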
+def apacheToTab(inFname, outFlagFname):
+    " parse apache log file to tab file. outFlagFname is something like 20130420_hgw4.flag, write output to 20130420_hgw4_<realDate>.tab.gz "
+    logging.info("Parsing %s" % inFname)
+    if inFname.endswith(".gz"):
+        ifh = gzip.open(inFname)
+    else:
+        ifh = open(inFname)
+
+    baseOut = outFlagFname.replace(".flag","")
+    fileHandles = {} # cache the file handles
+
+    count = 0
+    for row in csv.reader(ifh, delimiter = " ", escapechar='\\'):
+        # parse apache log line
+        count += 1
+        if count % 20000 == 0:
+            print "parsed %d rows" % count
+
+        log = parseApacheLine(row)
+        if log==None:
+            #logging.info("parse error: %s" % row)
+            continue
+
+        weekDate = dateToWeek(log.time)
+
+        # skip if it's a bot
+        if isBot(log.agent):
+            #botCounts[a.agent]+=1
+            logging.debug("%s is a bot" % log.agent)
+            continue 
+
+        outFname = baseOut+"_"+weekDate+".tab.gz"
+
+        if outFname in fileHandles:
+            ofh = fileHandles[outFname]
+        else:
+            ofh = gzip.open(outFname, "w")
+            ofh.write("#"+"\t".join(apacheFields)+"\n")
+            fileHandles[outFname] = ofh
+
+        ofh.write("\t".join(log))
+        ofh.write("\n")
+
+    # close all output handles; gzip files need an explicit close to write their trailer
+    for ofh in fileHandles.values():
+        ofh.close()
+
+    open(outFlagFname, "w").close() # just create an empty flag file
+
+def catFiles(inFnames, outFname):
+    " cat all inFnames to outFname, taking care of header lines "
+    ofh = gzip.open(outFname, "w")
+    headerWritten = False
+    for inFname in inFnames:
+        ifh = gzip.open(inFname)
+        headerLine = ifh.readline()
+        if not headerWritten:
+            ofh.write(headerLine)
+            headerWritten=True
+
+        ofh.write(ifh.read())
+        ifh.close()
+    ofh.close() # required, otherwise the gzip trailer is not written
+
+def countJob(inFname, outFnameParasol):
+    " cluster job: count hgc clicks, hgsids per IP and filePaths per agent for one weekly tab file "
+    baseOutDir = dirname(dirname(outFnameParasol))
+    outBase = basename(outFnameParasol).split(".")[0]
+
+    outFname = join(baseOutDir, "hgcClicks", outBase+".tab.gz")
+    logging.debug("%s %s" % (outFname, outFnameParasol))
+    assert (outFname==outFnameParasol) # output path has to look like  <baseDir>/hgcClicks/<baseName>.tab.gz
+
+    hgcClicks, ipSids, agentCounts = apacheCount(inFname)
+
+    headers = ["track", "clickCount", "ipCount", "countsPerIp_CountList"]
+    writeDictCounter(hgcClicks, outFname, headers)
+
+    headers = ["ip", "clickCount", "hgsidCount", "countsPerHgsid_CountList"]
+    outFname = join(baseOutDir, "ipHgsids", outBase+".tab.gz")
+    writeDictCounter(ipSids, outFname, headers)
+
+    headers = ["agent", "clickCount", "filePathCount", "countsPerFile_CountList"]
+    outFname = join(baseOutDir, "agentFiles", outBase+".tab.gz")
+    writeDictCounter(agentCounts, outFname, headers)
+
+
+def main(args, options):
+    tabDir   = join(jobOutDir, "01-tab")
+    catDir   = join(jobOutDir, "02-cat")
+    countDir = join(jobOutDir, "03-count")
+    yearDir  = join(jobOutDir, "04-byYear")
+
+    steps = args[0].split(",")
+
+    if "tab" in steps:
+        # convert all apache files to tab format, remove bots and split by correct date 
+        fnames = inOutTab(tabDir)
+        submitJobs(options.cluster, tabDir, "tabJob", fnames)
+
+    if "tabJob" in steps:
+        inFname, outFnameParasol = args[1], args[2]
+        apacheToTab(inFname, outFnameParasol)
+
+    if "cat" in steps:
+        # create one tab file per week by catting together the right pieces from the tab-step
+        fnames = inOutCat(tabDir, catDir)
+        submitJobs(options.cluster, catDir, "catJob", fnames)
+        
+    if "catJob" in steps:
+        # job gets a little text file with the input filenames, as they're too long for parasol
+        inListFname, outFname = args[1], args[2]
+        inFnames = open(inListFname).read().splitlines()
+        catFiles(inFnames, outFname)
+        
+    if "count" in steps:
+        # submit jobs to parasol
+        fnames = inOutCount(catDir, countDir)
+        submitJobs(options.cluster, countDir, "countJob", fnames)
+
+    if "countJob" in steps:
+        # cluster job: do the counting and write many output files, one per counting type
+        inFname, outFnameParasol = args[1], args[2]
+        countJob(inFname, outFnameParasol)
+
+    #if "merge" in steps:
+        ## submit merge jobs to parasol
+        #outInFnames = parsedToMerged(countDir, mergeDir)
+        ##mergeFnames(outInFnames)
+        #submitJobs(options.cluster, mergeDir, "mergeJob", outInFnames)
+
+    #if "mergeJob" in steps:
+        #inMask, outPath = args[1], args[2]
+        #inFnames = glob.glob(inMask)
+        #logging.debug("merging %s into %s" % (inFnames, outPath))
+        #mergeFiles(inFnames, outPath)
+
+    #if "year" in steps:
+        ## summarize data by year
+        #outInFnames = mergedToYear( mergeDir, yearDir)
+        #mergeFnames(outInFnames)
+        ##submitMergeJobs(options.cluster, "mergeJob", inOutFnames, mergeDir)
+
+    if "plot" in steps:
+        plot(countDir, htmlDir)
+        
+    #else:
+        #raise Exception("Unknown command in %s" % steps)
+
+# === COMMAND LINE INTERFACE, OPTIONS AND HELP ===
+parser = optparse.OptionParser("""usage: %%prog [options] stepList - parse apache logfiles, write results to various .tab files 
+
+All output goes to subdirectories of the default output directory %s. 
+It has one subdirectory per step.
+
+stepList is a comma-sep list of any of these steps, in the right order:
+"tab":
+    converts apache log files to tab-sep files, removes bots and splits files by correct week
+"cat"
+    concatenates the right weekly files to get one clean tab-sep file per week
+"count":
+    do the counting of various categories (IPs, agents, hgc-clicks, etc), for each week
+"plot":
+    create .csv files for the C3 plotting library, to output dir %s
+
+internal steps used for parasol: tabJob, catJob, countJob, mergeJob
+
+By default, the script does not overwrite files that already exist, it only adds missing files.
+""" % (jobOutDir, htmlDir))
+
+# define options
+parser.add_option("-d", "--debug", dest="debug", action="store_true", help="show debug messages") 
+parser.add_option("-c", "--cluster", dest="cluster", action="store", help="cluster to use for mapping step, default %default", default="ku")
+parser.add_option("", "--runOne", dest="runOne", action="store_true", help="run only one single map job, for debugging")
+parser.add_option("-a", "--clearAll", dest="clearAll", action="store_true", help="clear (rm -rf!) complete step output directory before starting any step")
+(options, args) = parser.parse_args()
+if len(args)==0:
+    parser.print_help()
+    sys.exit(0)
+
+# python way of activating debug log mode
+if options.debug:
+    logging.basicConfig(level=logging.DEBUG)
+else:
+    logging.basicConfig(level=logging.INFO)
+
+if options.runOne:
+    RUNONE = True
+
+if options.clearAll:
+    doCleanDir = True
+
+main(args, options)
+