14cb55e83f7adaeab41efe5077cc2f1187da56b5 max Tue Nov 18 02:25:39 2014 -0800 adding apacheLogParse to git, better than having only a single copy in/cluster/bin/scripts. diff --git src/utils/apacheLogParse src/utils/apacheLogParse new file mode 100755 index 0000000..e72c278 --- /dev/null +++ src/utils/apacheLogParse @@ -0,0 +1,903 @@ +#!/cluster/software/bin/python2.7 + +# parse apache logfiles on the cluster + +import glob, urlparse, gzip, marshal, os, shutil, gc, tempfile, types, csv, atexit, datetime, time, operator + +from collections import namedtuple, Counter, defaultdict +from os.path import basename, join, abspath, isfile, dirname, isdir +from os import listdir +import optparse, logging, sys, string +from itertools import imap +from operator import itemgetter +import heapq + +#TEMPDIR = "/dev/shm" + +# filename to delete on exit +removeTmpName = None + +# where do we store the raw apache logfiles +baseLogDir = "/hive/data/inside/wwwstats/RR" + +# years to analyze +years = ["2009", "2010", "2011", "2012", "2013", "2014"] + +# list of servers to analyze +# change these for debugging or if need to focus on single servers +servNames = ["hgw1", "hgw2", "hgw3", "hgw4", "hgw5", "hgw6", "hgw7", "hgw8"] + +# directory for cluster job output +jobOutDir = "/hive/data/inside/wwwstats/apacheLogParse" + +# directory to store csv files for C3 plot +htmlDir = "/cluster/home/max/public_html/logParse/plot" + +# a struct for a cleaned apache log line +apacheFields = ['ip', 'time', 'status', "reqType", 'speed', "filePath", "cgiParams", "agent", "referer"] +ApacheLine = namedtuple("ApacheLine", apacheFields) + +# when writing the tab file, we need to replace some characters from the apache log file +keyValReplTable = string.maketrans("\t\n=|", " __") +tabSepReplTable = string.maketrans("\t\n", " ") + +# debugging option to run only one job +RUNONE = False + +# option to globally overwrite existing files in cluster jobs +OVERWRITE = False + +# option to globally clean all output directories of cluster jobs +doCleanDir = False + +# ---- FUNCTIONS ---- + +def lineFileNext(fh): + """ + parses tab-sep file with headers as field names + yields collection.namedtuples + strips "#"-prefix from header line + """ + line1 = fh.readline() + line1 = line1.strip("\n").strip("#") + headers = line1.split("\t") + Record = namedtuple('tsvRec', headers) + + for line in fh: + line = line.rstrip("\n") + fields = line.split("\t") + try: + rec = Record(*fields) + except Exception, msg: + logging.error("Exception occured while parsing line, %s" % msg) + logging.error("Filename %s" % fh.name) + logging.error("Line was: %s" % repr(line)) + logging.error("Does number of fields match headers?") + logging.error("Headers are: %s" % headers) + #raise Exception("wrong field count in line %s" % line) + continue + # convert fields to correct data type + yield rec + +def dateToWeek(timeStr): + " convert an apache time, and get the start of its week in format like 2013-04-25" + timeObj = datetime.datetime.strptime(timeStr, "%d/%b/%Y:%H:%M:%S") + dayOfYear = int(timeObj.strftime("%j")) + weekStartStr = "%d-%d" % (timeObj.year, 7*(dayOfYear/7)+1) # days are 1-based + weekStart = datetime.datetime.strptime(weekStartStr, "%Y-%j") + weekStartStr = weekStart.strftime("%Y-%m-%d") + return weekStartStr + +def parseApacheLine(fields): + " parse a apache log file line and return as a ApacheLine struct " + if len(fields)<9: + logging.warn("not enough fields: %s" % fields) + return None + + # 119.63.193.132 - - [18/Aug/2013:07:54:21 -0700] "GET / 
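    # Orientation sketch (illustrative only): apacheToTab() further down splits each
    # raw line with csv.reader(ifh, delimiter=" ", escapechar='\\'), so for the example
    # line above the indices used below line up as:
    #   fields[0]  -> ip        (119.63.193.132)
    #   fields[3]  -> time      ([18/Aug/2013:07:54:21, leading "[" stripped below)
    #   fields[5]  -> request   (GET / HTTP/1.1)
    #   fields[6]  -> status    (200)
    #   fields[8]  -> referer   (-)
    #   fields[9]  -> agent     (Mozilla/4.0 ...)
    #   fields[10] -> speed     (200, in microseconds; absent in some very old logs)
    # e.g. parseApacheLine(fields).filePath == "/" and .status == "200" for that line.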
HTTP/1.1" 200 16213 "-" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)" 200 microseconds + try: + ip = fields[0] + timeStr = fields[3].strip("[") + req = fields[5] + status = fields[6] + referer = fields[8] + agent = fields[9] + if len(fields)>10: + speed = fields[10] + else: + speed = "-1" # there was a time back in 2003 when we didn't have the speed field in the logs + except IndexError: + logging.warn("index error %s" % fields) + return None + + if len(req)>10000: + logging.warn("HTTP request from %s with more than 10000 chars: %s" % (agent, "".join(req[:200]))) + return None + + # parse http request: typically something like + # GET cgi-bin/hgTracks?hgsid=xxxx&xxx HTTP1.1 + reqRest = req.split(" ", 1) + if len(reqRest)!=2: + logging.warn("no space in request field %s" % fields) + return None + + reqType, rest = reqRest + if not rest.endswith("HTTP/1.1") and not rest.endswith("HTTP/1.0"): + logging.warn("request does not end with HTTP/1.1 %s, GET params probably too long" % fields) + return None + + reqUrl = rest.replace(" HTTP/1.1", "") + reqUrl = reqUrl.replace(" HTTP/1.0", "") + #reqType, reqUrl, httpVer = reqFields + + # split into cgi as a string and the params as a dict, e.g. "cgi-bin/hgXXX" and {'hgsid':'1233'} + filePath, paramStr = urlparse.urlsplit(reqUrl)[2:4] + cgi = basename(filePath) + params = urlparse.parse_qs(paramStr) + paramList = [] + for key, val in params.iteritems(): + val = val[0] + # get rid of the = and | characters in these strings + key = key.translate(keyValReplTable) + val = val.translate(keyValReplTable) + kvStr = "%s=%s" % (key, val) + if kvStr=="": + continue + paramList.append(kvStr) + paramStr = "|".join(paramList) + + # we want to put this into a tab-sep file, so remove these chars + filePath = filePath.translate ( tabSepReplTable) + agent = agent.translate ( tabSepReplTable) + referer = referer.translate ( tabSepReplTable) + + # put everything into a struct + a = ApacheLine(ip, timeStr, status, reqType, speed, filePath, paramStr, agent, referer) + return a + +def printBest(ofh, counts, topX=100000): + " print top 10000 names and their counts " + #ofh.write("- %s\n" % title) + for name, count in counts: + ofh.write("%s\t%s\n" % (name, count)) + +def isBot(agent): + a = agent.lower() + if "google." in a or "yahoo." in a or "bing." in a or "baidu." 
in a \ + or "bot" in a or "spider" in a or "slurp" in a or "crawler" in a or \ + "Commons-HttpClient" in a or "HTTPGrab" in a or "internal dummy" in a or \ + a.startswith("google") or a.startswith("yahoo") or a.startswith("bing"): + return True + else: + return False + +def paramStrToDict(inStr): + " convert string like hgsid=12343|key=val|key2=val2 to dictionary and return " + d = {} + if inStr=="": + return d + + parts = inStr.split("|") + for p in parts: + k, v = p.split("=") + d[k] = v + return d + +def apacheCount(inFname): + " parse apache tab sep log file and return as dicts of counters " + # open file + logging.info( "Reading %s" % inFname) + if inFname.endswith(".gz"): + ifh = gzip.open(inFname) + else: + ifh = open(inFname) + + hgcClicks = defaultdict(Counter) + sidIps = defaultdict(Counter) + agentCounts = defaultdict(Counter) + + # parse file + for a in lineFileNext(ifh): + # skip any further analysis if it's a bot + if isBot(a.agent): + logging.debug("%s is a bot" % a.agent) + continue + + agentCounts[a.agent][a.filePath]+=1 + + params = paramStrToDict(a.cgiParams) + # hgsids per IP address + hgsid = params.get("hgsid", None) + if hgsid != None: + hgsid = hgsid[0] + hgsid = hgsid.split("_")[0].strip() # strip the new secure id + if not hgsid.isdigit(): + logging.warn("hgsid %s does not look like a number, line %s" % (hgsid, a)) + continue + + sidIps[a.ip][hgsid]+=1 + + # track clicks + if a.filePath.endswith("/hgc"): + trackName = params.get("g", None) + if trackName != None: + hgcClicks[trackName][a.ip]+=1 + else: + logging.debug("hgc without g param: %s" % a.filePath) + + return hgcClicks, sidIps, agentCounts + +# the following functions prepare lists of (inFname, outFname) for parasol + +def inOutCat(inDir, outDir): + " prepare filenames for parasol for the cat step, return a list of tuples (inFname, outFname) " + if not os.path.isdir(outDir): + os.makedirs(outDir) + + inFnames = glob.glob(join(inDir, "*.tab.gz")) + + # split all infiles by real week, not the week from the apache filename + byWeek = defaultdict(list) + for inFname in inFnames: + date = basename(inFname).split(".")[0].split("_")[-1] + byWeek[date].append(inFname) + + # create little files with the filenames in inDir, to get around parasol's line limit + inOutFnames = [] + for date, inFnames in byWeek.iteritems(): + fileListFname = join(inDir, date+".fileList") + + ofh = open(fileListFname, "w") + ofh.write("\n".join(inFnames)) + ofh.close() + + outFname = join(outDir, date+".tab.gz") + inOutFnames.append( (fileListFname, outFname) ) + + logging.info("Found %d input files, assigned to %d output files " % (len(inFnames), len(inOutFnames))) + return inOutFnames + + +def inOutTab(outDir): + " prep in/out file list for parasol log tab-sep reformatting, return list of tuples (inFname, outFname)" + if not os.path.isdir(outDir): + os.makedirs(outDir) + + fnames = [] + for year in years: + yearDir = join(baseLogDir, year) + for servName in servNames: + servDir = join(yearDir, servName) + inFnames = glob.glob(join(servDir, "access_log.*.gz")) + for inFname in inFnames: + # access_log.20130317.gz + day =os.path.basename(inFname).split(".")[1] + outFname = "%s_%s.flag" % (day, servName) # this is just an empty flag file, job writes to <day>_<server>_<realWeekDate>.tab.gz + outFname = join(outDir, outFname) + fnames.append ( (inFname, outFname) ) + + print "Found %d logfiles in %s" % (len(fnames), yearDir) + return fnames + +def inOutCount(catDir, outDir): + """ prepare (inFname, outFname) tuples for the count step + 
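        (for illustration, with the default directory layout set up in main() one such
        tuple would be roughly ("<jobOutDir>/02-cat/2013-04-16.tab.gz",
        "<jobOutDir>/03-count/hgcClicks/2013-04-16.tab.gz"), the week date being
        hypothetical here)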
outfiles go into one subDir per counting type, e.g. hgcClicks/2013-04-04.tab.gz""" + if not os.path.isdir(outDir): + os.makedirs(outDir) + + fnames = [] + inFnames = glob.glob(join(catDir, "*.tab.gz")) + for inFname in inFnames: + fBase =os.path.basename(inFname) + outPath = join(outDir, "hgcClicks", fBase) + fnames.append ( (inFname, outPath) ) + + logging.info("Found %d logfiles in %s" % (len(fnames), catDir)) + return fnames + +# end of parasol helper functions + +def submitJobs(headNode, jobOutDir, job, fnames): + """ submit jobs to parasol, calling this script as the jobscript with the parameter "job" """ + + # clear joboutdir + if doCleanDir and os.path.isdir(jobOutDir): + print ("Deleting directory %s and all contents" % jobOutDir) + print ("You have three seconds to change your mind and hit ctrl-c") + time.sleep(3) + runCmd("rm -rf %s/*" % jobOutDir) + + # for debugging + if RUNONE: + logging.info("Running only one job") + inFname, outFname = fnames[0] + cmd = "%s %s %s %s" % (__file__, inFname, job, "temp.hgcClicks.tab") + ret = os.system(cmd) + if ret!=0: + print "return code %d" % ret + return + logging.info("Running merge on single file %s" % outFname) + runReduce(jobOutDir, True) + sys.exit(0) + + # create list of cmd lines to run + jobLines = [] + skipCount = 0 + for inFname, outFname in fnames: + if not OVERWRITE and isfile(outFname): + logging.debug("Skipping parasol job, output file %s already exists" % outFname) + skipCount += 1 + continue + outPath = join(jobOutDir, outFname) + cmdLine = "%s %s %s %s {check out exists %s}\n" % \ + (sys.executable, abspath(__file__), job, inFname, outPath) + jobLines.append(cmdLine) + logging.info("%d logfiles, skipped %d (already converted), %d files to convert" % \ + (len(fnames), skipCount, len(jobLines))) + + if len(jobLines)==0: + logging.info("No new logfiles to convert") + return + + # create joblist file + jlName = join(jobOutDir, "jobList") + jlf = open(jlName, "w") + for cmdLine in jobLines: + jlf.write(cmdLine) + jlf.close() + + # submitting joblist + print("Running jobs in dir %s" % jobOutDir) + cmd = "ssh %s 'cd %s; para freeBatch; para resetCounts; para clearSickNodes; para make jobList'" % \ + (headNode, jobOutDir) + logging.info("running: %s" % cmd) + runCmd(cmd) + +#def getKgIdToSym(): + #"use hgsql to get mapping kgId -> symbol " + #tmpFh = tempfile.NamedTemporaryFile() + #cmd = 'hgsql hg19 -NB -e "select kgId, geneSymbol from kgXref;" > %s' % tmpFh.name + #os.system(cmd) + #res = {} + #for line in open(tmpFh.name).read().splitlines(): + #kgId, sym = line.split("\t") + #res[kgId] = sym + #return res + +def writeCounts(allCounts): + kgIdToSym = getKgIdToSym() + for dataType, counter in allCounts.iteritems(): + ofh = open(dataType+".tab", "w") + if dataType.endswith("Set"): + for id, val in counter.iteritems(): + row = [id, str(len(val)), ",".join(val)] + ofh.write("\t".join(row)+"\n") + else: + for id, count in counter.most_common(): + if dataType=="genes": + row = [id, kgIdToSym.get(id, ""), str(count)] + else: + row = [id, str(count)] + ofh.write("\t".join(row)+"\n") + ofh.close() + print "wrote %s" % ofh.name + +#def postProcess(allCounts): + #""" assume allCounts is a dict with name -> someVariable + #if sameVariable is a set, replace it with its length + #return the result + #""" +# + #newDict = {} + #for name, var in allCounts.iteritems(): + #if name.endswith("Set"): + #counter = Counter() + #for key, dataSet in var.iteritems(): + #counter[key] = len(dataSet) + #newDict[name] = counter + #else: + #newDict[name] = 
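    # (Sketch for orientation: a jobList line written by submitJobs() above follows the
    #  template "<python> <thisScript> <job> <inFname> {check out exists <outPath>}",
    #  e.g. for the tab step something like
    #    /cluster/software/bin/python2.7 .../apacheLogParse tabJob \
    #        /hive/data/inside/wwwstats/RR/2013/hgw1/access_log.20130317.gz \
    #        {check out exists /hive/data/inside/wwwstats/apacheLogParse/01-tab/20130317_hgw1.flag}
    #  parasol substitutes the {check ...} block with the output filename, which is why
    #  the *Job steps in main() read their output path from args[2].)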
var + #return newDict + +#def runReduce(jobOutDir, runOne): + #"" + #gc.disable() # obscure setting to make script a lot faster, disables GC + #if isfile(jobOutDir): + #inFnames = [jobOutDir] # for debugging, we can do this + #else: + #inFnames = glob.glob(join(jobOutDir, "*.marshal")) +# + #if runOne: + ##inFnames = [inFnames[0]] # debugging on a single file is faster + #inFnames = ["temp.hgcClicks.tab"] +# + #print("Found %d input files" % len(inFnames)) + #allCounts = defaultdict(Counter) +# + #for inFname in inFnames: + #print("Reading %s" % inFname) + #mapData = marshal.load(open(inFname)) + ##printBest(sys.stdout, mapData["genes"]) + #for dataType, counts in mapData.iteritems(): + #if dataType.endswith("Set") and dataType not in allCounts: + #allCounts[dataType] = defaultdict(set) # special case, not a counter but a set + # + #print dataType + #dataCounter = allCounts[dataType] + #for id, val in counts: + #if dataType.endswith("Max"): + #dataCounter[id]=max(dataCounter[id], val) + #elif dataType.endswith("Set"): + #dataCounter[id].update(val) + #else: + #dataCounter[id]+=val +# + #allCounts = postProcess(allCounts) + #writeCounts(allCounts) + +def runCmd(cmd): + logging.debug("Running %s" % cmd) + ret = os.system(cmd) + if ret !=0: + raise Exception("Could not run %s" % cmd) + +def writeLines(headers, lines, outFname): + " write text lines to file in an atomic way " + #ofh = open(outFname, "w") + atexit.register(delTmp) + + global removeTmpName + fd, tmpName = tempfile.mkstemp(dir=dirname(outFname), suffix=".tab.gz.tmp") + removeTmpName = tmpName + + ofh = gzip.open(tmpName, "w") + logging.info("Writing to %s" % ofh.name) + + ofh.write("#"+"\t".join(headers)+"\n") + + for l in lines: + ofh.write(l) + ofh.write("\n") + ofh.close() + + logging.info("Renaming to %s" % outFname) + os.rename(tmpName, outFname) + +def delTmp(): + " remove global tmp file name " + if isfile(removeTmpName): + os.remove(removeTmpName) + +def counterToString(c): + """ convert counter object to string like "C:4,abc:3", sorted by counts """ + ipSorted = c.most_common() # sort by counts + countSum = 0 + countParts = [] + itemCount = 0 + for ip, count in ipSorted: + ipCountStr = "%s:%d" % (ip, count) + #ofh.write("%s," % ipCountStr) + if ":" in ip: + ip = ip.replace(":", "_") + if "," in ip: + ip = ip.replace(",", "_") + countParts.append(ipCountStr) + countSum += count + itemCount +=1 + return countSum, itemCount, ",".join(countParts) + +def writeDictCounter(dictCounter, outFname, headers): + """ write dict with key -> Counter to tab.gz out file + with four fields: + 1 key + 2 number of items in counter + 3 sum of counts in counter + 4 complete contents of counter in format item:count|item2:count|... 
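        for illustration (values invented), a row of the hgcClicks output might be
            knownGene   1534   872   1.2.3.4:40,5.6.7.8:12,...
        i.e. key, summed count, number of distinct items, then the full item:count list,
        matching the clickCount/ipCount headers that countJob() writes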
+ """ + makeDir(dirname(outFname)) + lines = [] + for key, itemCounts in dictCounter.iteritems(): + countSum, itemCount, countStr = counterToString(itemCounts) + fields = [key, str(countSum), str(itemCount), countStr] + lines.append("\t".join(fields)) + lines.sort() + + writeLines(headers, lines, outFname) + +def firstField(line): + """Extract timestamp and convert to a form that gives the + expected result in a comparison + """ + return line.split()[0] # for example + +def mergeLines(lastValues, ofh): + " given fieldNames and fieldName -> values dict, write merged line to ofh " + c = Counter() + for val in lastValues: + parts = val.split(",") + for part in parts: + #if ":" not in part: + #print part + vals = part.split(":") + if len(vals)!=2: + print vals + key, count = vals + c[key]+=int(count) + + itemCount, totalSum, countStr = counterToString(c) + row = [str(itemCount), str(totalSum), countStr] + #if field.endswith("Count"): + #total = sum([int(x) for x in values]) + #row.append(str(total)) + #print "writing row", row + ofh.write("\t".join(row)) + ofh.write("\n") + +def mergeFiles(inFnames, outFname): + " merge sorted tabsep files to outFname " + ifhs = [] + lastHeaders = None + assert(len(inFnames)>0) + for inFname in inFnames: + ifh = gzip.open(inFname) + headers = ifh.readline().rstrip("\n").split("\t") + assert(lastHeaders==None or lastHeaders==headers) + lastHeaders = headers + ifhs.append(ifh) + ofh = gzip.open(outFname, "w") + ofh.write("\t".join(headers)+"\n") + + # python magic for merge operation + # http://stackoverflow.com/questions/12460943/merging-pre-sorted-files-without-reading-everything-into-memory + decorated = [ ((firstField(line), line) for line in f) for f in ifhs] + merged = heapq.merge(*decorated) + + # now merge lines together and write out + lastKey = None + lastValues = [] + for key, line in merged: + if lastKey != None and key!=lastKey: + ofh.write("%s\t" % lastKey) + #print "merging", lastKey, lastValues + mergeLines(lastValues, ofh) + lastValues = [] + + # sort into dict name of field -> values + #for field, val in zip(headers[1:], line.split("\t")[1:]): + #lastValues[field].append(val) + lastField = line.rstrip("\n").split("\t")[-1] + lastValues.append(lastField) + lastKey = key + + #undecorated = imap(itemgetter(-1), merged) + #ofh.writelines(undecorated) + +def parsedToMerged(countDir, mergeDir): + " return a list of tuples (inFnameMask, outPath)" + if not isdir(mergeDir): + os.makedirs(mergeDir) + + inOutFnames = [] + fTypes = set() + + # sort the input files + subDirs = listdir(countDir) + + for subDir in subDirs: + logging.debug("subdir %s" % subDir) + inDir = join(countDir, subDir) + fnames = listdir(inDir) + + # make sure out dir exists + outDir = join(mergeDir, subDir) + if not isdir(outDir): + logging.debug("making dir %s" % outDir) + os.makedirs(outDir) + + # get all dates + dates = set() + for fname in fnames: + if not fname.endswith("tab.gz"): + continue + date, server = fname.split('.')[0].split('_') + dates.add(date) + + # create one output file per date + logging.info("subdir %s, %d dates" % (subDir, len(dates))) + for date in dates: + outPath = join(mergeDir, subDir, date+".tab.gz") + inMask = join(countDir, subDir, date+"_*.tab.gz") + inOutFnames.append ((inMask, outPath)) + + return inOutFnames + +#def mergedToYear( mergeDir, yearDir): + #if not isdir(yearDir): + #os.makedirs(yearDir) +# + #yearToInFnames = defaultdict(list) +# + #subDirs = listdir(mergeDir) + #logging.debug("Found %d input dirs" % len(subDirs)) + #for subDir in 
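#  (Minimal sketch of the heapq.merge idiom used by mergeFiles() above, with made-up
#   names, assuming every input file is already sorted by its first tab-sep field:
#
#     import heapq
#     def mergeSorted(fileObjs, ofh):
#         # decorate each line with its sort key, then merge the streams lazily
#         decorated = [((l.split("\t")[0], l) for l in f) for f in fileObjs]
#         for key, line in heapq.merge(*decorated):
#             ofh.write(line)
#
#   mergeFiles() additionally groups consecutive lines that share a key and sums their
#   counters via mergeLines().)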
subDirs: + #fnames = listdir(join(mergeDir, subDir)) + ## 03-cat/hgsidHgsids/20130903.tab.gz + #for fname in fnames: + #year = fname[:4] + #outPath = join(yearDir, "%s_%s.tab.gz" % (year,subDir)) + #yearToInFnames[outPath].append( join(mergeDir, subDir, fname) ) + #return yearToInFnames + + +def mergeFnames(dateToFnames): + """ return list of infnames, outfname tuples. infname is like 2013_hgw8_20130414.hgcClicks.tab.gz , outfname like hgClicks/20130414.tab.gz """ + + for outPath, inFnames in dateToFnames.iteritems(): + assert(len(inFnames)>0) + logging.info("%s" % outPath) + logging.debug("merging %s into %s" % (inFnames, outPath)) + mergeFiles(inFnames, outPath) + +def makeDir(path): + if not os.path.isdir(path): + logging.info("Creating directory %s" % path) + os.makedirs(path) + +def plot(mergeDir, htmlDir): + " reformat count hgcClicks files to csv format for C3 " + inDir = join(mergeDir, "hgcClicks") + inFnames = glob.glob(join(inDir, "*.tab.gz")) + + # sort filenames into year -> names + byYear = defaultdict(list) + for inFname in inFnames: + year = basename(inFname)[:4] + byYear[year].append(basename(inFname).split(".")[0]) + + for year, fnames in byYear.iteritems(): + # sort fnames by date + dates = [(datetime.datetime.strptime(fn, "%Y-%m-%d"), fn) for fn in fnames] + dates.sort() + dates = [y for (x,y) in dates] + + trackClickSum = defaultdict(int) + # parse data into nested dict date -> track -> count + dateTrackCounts = {} + allTracks = set() + for date in dates: + dateCounts = {} + path = join(inDir, date+".tab.gz") + for line in gzip.open(path): + if line.startswith("#"): + continue + track, clickCount, ipCount, rest = line.split("\t") + allTracks.add(track) + dateCounts[track] = ipCount + trackClickSum[track] += int(ipCount) + dateTrackCounts[date] = dateCounts + + allTracks = [] + trackClickSum = trackClickSum.items() # convert dict to list of tuples (track, count) + trackClickSum.sort(key=operator.itemgetter(1), reverse=True) # sort by count + + rank = 0 + for track, count in trackClickSum: + if track.startswith("htc") or track in ["getDna","refGene", "hgUserPsl"] or track.startswith("ct_"): + continue + #if count>=5000: + #allTracks.append(track) + allTracks.append(track) + if rank >= 20: + break + rank += 1 + + allTracks.sort() + + outName = join(htmlDir, "%s_hgcClicks.csv" % year) + + # output data as table + ofh = open(outName, "w") + ofh.write("date,") + ofh.write(",".join(allTracks)) + ofh.write("\n") + + for date in dates: + dateTracks = dateTrackCounts[date] + row = [date] + for track in allTracks: + count = dateTracks.get(track, 0) + row.append(str(count)) + ofh.write(",".join(row)) + ofh.write("\n") + + ofh.close() + logging.info("Wrote %s" % outName) + +def apacheToTab(inFname, outFlagFname): + " parse apache log file to tab file. 
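    Output is split by the real week of each request (see dateToWeek), so one day's log
    can feed several weekly files; for instance requests from access_log.20130420.gz end
    up in ..._2013-04-16.tab.gz or ..._2013-04-23.tab.gz depending on their timestamp.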
outFlagFname is something like 20130420_hgw4.flag, write output to 20130420_hgw4_<realDate>.tab.gz " + logging.info("Parsing %s" % inFname) + if inFname.endswith(".gz"): + ifh = gzip.open(inFname) + else: + ifh = open(inFname) + + baseOut = outFlagFname.replace(".flag","") + fileHandles = {} # cache the file handles + + count = 0 + for row in csv.reader(ifh, delimiter = " ", escapechar='\\'): + # parse apache log line + count += 1 + if count % 20000 == 0: + print "parsed %d rows" % count + + log = parseApacheLine(row) + if log==None: + #logging.info("parse error: %s" % row) + continue + + weekDate = dateToWeek(log.time) + + # skip if it's a bot + if isBot(log.agent): + #botCounts[a.agent]+=1 + logging.debug("%s is a bot" % log.agent) + continue + + outFname = baseOut+"_"+weekDate+".tab.gz" + + if outFname in fileHandles: + ofh = fileHandles[outFname] + else: + ofh = gzip.open(outFname, "w") + ofh.write("#"+"\t".join(apacheFields)+"\n") + fileHandles[outFname] = ofh + + ofh.write("\t".join(log)) + ofh.write("\n") + + + open(outFlagFname, "w") # just create an empty flag file + +def catFiles(inFnames, outFname): + " cat all inFnames to outFname, taking care of header lines " + ofh = gzip.open(outFname, "w") + headerWritten = False + for inFname in inFnames: + ifh = gzip.open(inFname) + headerLine = ifh.readline() + if not headerWritten: + ofh.write(headerLine) + headerWritten=True + + ofh.write(ifh.read()) + +def countJob(inFname, outFnameParasol): + baseOutDir = dirname(dirname(outFnameParasol)) + outBase = basename(outFnameParasol).split(".")[0] + + outFname = join(baseOutDir, "hgcClicks", outBase+".tab.gz") + print outFname, outFnameParasol + assert (outFname==outFnameParasol) # output path has to look like <baseDir>/hgcClicks/<baseName>.tab.gz + + hgcClicks, ipSids, agentCounts = apacheCount(inFname) + + headers = ["track", "clickCount", "ipCount", "countsPerIp_CountList"] + writeDictCounter(hgcClicks, outFname, headers) + + headers = ["ip", "clickCount", "hgsidCount", "countsPerHgsid_CountList"] + outFname = join(baseOutDir, "ipHgsids", outBase+".tab.gz") + writeDictCounter(ipSids, outFname, headers) + + headers = ["agent", "clickCount", "filePathCount", "countsPerFile_CountList"] + outFname = join(baseOutDir, "agentFiles", outBase+".tab.gz") + writeDictCounter(agentCounts, outFname, headers) + + +def main(args, options): + tabDir = join(jobOutDir, "01-tab") + catDir = join(jobOutDir, "02-cat") + countDir = join(jobOutDir, "03-count") + yearDir = join(jobOutDir, "04-byYear") + + steps = args[0].split(",") + + if "tab" in steps: + # convert all apache files to tab format, remove bots and split by correct date + fnames = inOutTab(tabDir) + submitJobs(options.cluster, tabDir, "tabJob", fnames) + + if "tabJob" in steps: + inFname, outFnameParasol = args[1], args[2] + apacheToTab(inFname, outFnameParasol) + + if "cat" in steps: + # create one tab file per week by catting together the right pieces from the tab-step + fnames = inOutCat(tabDir, catDir) + submitJobs(options.cluster, catDir, "catJob", fnames) + + if "catJob" in steps: + # job gets a little text file with the input filenames, as they're too long for parasol + inListFname, outFname = args[1], args[2] + inFnames = open(inListFname).read().splitlines() + catFiles(inFnames, outFname) + + if "count" in steps: + # submit jobs to parasol + fnames = inOutCount(catDir, countDir) + submitJobs(options.cluster, countDir, "countJob", fnames) + + if "countJob" in steps: + # cluster job: do the counting and write many output files, one per 
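        # (illustration, assuming the default 02-cat/03-count layout from main():
        #  a single cluster invocation looks roughly like
        #    apacheLogParse countJob .../02-cat/2013-04-16.tab.gz \
        #        .../03-count/hgcClicks/2013-04-16.tab.gz
        #  countJob() asserts that the output path ends in hgcClicks/<week>.tab.gz and
        #  writes the ipHgsids and agentFiles tables alongside it)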
counting type + inFname, outFnameParasol = args[1], args[2] + countJob(inFname, outFnameParasol) + + #if "merge" in steps: + ## submit merge jobs to parasol + #outInFnames = parsedToMerged(countDir, mergeDir) + ##mergeFnames(outInFnames) + #submitJobs(options.cluster, mergeDir, "mergeJob", outInFnames) + + #if "mergeJob" in steps: + #inMask, outPath = args[1], args[2] + #inFnames = glob.glob(inMask) + #logging.debug("merging %s into %s" % (inFnames, outPath)) + #mergeFiles(inFnames, outPath) + + #if "year" in steps: + ## summarize data by year + #outInFnames = mergedToYear( mergeDir, yearDir) + #mergeFnames(outInFnames) + ##submitMergeJobs(options.cluster, "mergeJob", inOutFnames, mergeDir) + + if "plot" in steps: + plot(countDir, htmlDir) + + #else: + #raise Exception("Unknown command in %s" % steps) + +# === COMMAND LINE INTERFACE, OPTIONS AND HELP === +parser = optparse.OptionParser("""usage: %%prog [options] stepList - parse apache logfiles, write results to various .tab files + +All output goes to subdirectories of the default output directory %s. +It has one subdirectory per step. + +stepList is a comma-sep list of any of these steps, in the right order: +"tab": + converts apache log files to tab-set files, removes bots and splits files by correct week +"cat" + concatenates the right weekly files to get one clean tab-sep file per week +"count": + do the counting of various categories (IPs, agents, hgc-clicks, etc), for each week +"plot": + create .csv files for the C3 plotting library, to output dir %s + +internal steps used for parasol: tabJob, catJob, countJob, mergeJob + +By default, the script does not overwrite files that already exist, it only adds missing files. +""" % (jobOutDir, htmlDir)) + +# define options +parser.add_option("-d", "--debug", dest="debug", action="store_true", help="show debug messages") +parser.add_option("-c", "--cluster", dest="cluster", action="store", help="cluster to use for mapping step, default %default", default="ku") +parser.add_option("", "--runOne", dest="runOne", action="store_true", help="run only one single map job, for debugging") +parser.add_option("-a", "--clearAll", dest="clearAll", action="store_true", help="clear (rm -rf!) complete step output directory before starting any step") +(options, args) = parser.parse_args() +if len(args)==0: + parser.print_help() + sys.exit(0) + +# python way of activating debug log mode +if options.debug: + logging.basicConfig(level=logging.DEBUG) +else: + logging.basicConfig(level=logging.INFO) + +if options.runOne: + RUNONE = True + +if options.clearAll: + doCleanDir = True + +main(args, options) +
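# Typical end-to-end run, for illustration (assumes the script is on the PATH and
# parasol is reachable on the cluster head node, default "ku"):
#
#   apacheLogParse tab      # raw apache logs -> weekly tab files (01-tab)
#   apacheLogParse cat      # concatenate per-server pieces per week (02-cat)
#   apacheLogParse count    # per-week hgcClicks/ipHgsids/agentFiles counts (03-count)
#   apacheLogParse plot     # per-year .csv files for the C3 plots
#
# Steps can also be given as one comma-separated list, e.g.
#   apacheLogParse tab,cat,count,plot
# Existing output files are kept unless a step directory is cleared with --clearAll.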