commit f225bd9ef750fb706f2f14bd87e3512b43e23dc1
Author: max
Date:   Wed May 13 13:50:34 2020 -0700

    Revert "adding a protease track to uniprot, refs #25192"

    I have no idea what happened here; it looks like I typed "git commit -a"
    instead of "git commit -m".

    This reverts commit db7c8abb6d7674bb26f579eabb58bc080ce76fdd.

diff --git src/utils/apacheLogParse src/utils/apacheLogParse
index 2bcb08c..09a6f74 100755
--- src/utils/apacheLogParse
+++ src/utils/apacheLogParse
@@ -1,915 +1,910 @@
-#!/usr/bin/env python3
+#!/cluster/software/bin/python2.7

# parse apache logfiles on the cluster

-import glob, urllib.parse, gzip, marshal, os, shutil, gc, tempfile, types, csv, atexit, datetime, time, operator
+import glob, urlparse, gzip, marshal, os, shutil, gc, tempfile, types, csv, atexit, datetime, time, operator
from collections import namedtuple, Counter, defaultdict
from os.path import basename, join, abspath, isfile, dirname, isdir
from os import listdir
import optparse, logging, sys, string
-
+from itertools import imap
from operator import itemgetter
import heapq

#TEMPDIR = "/dev/shm"

# filename to delete on exit
removeTmpName = None

# where do we store the raw apache logfiles
baseLogDir = "/hive/data/inside/wwwstats/RR"

+# years to analyze
+years = ["2009", "2010", "2011", "2012", "2013", "2014", "2015", "2016", "2017", "2018", "2019", "2020"]
+
# list of servers to analyze
# change these for debugging or if you need to focus on single servers
servNames = ["hgw1", "hgw2", "hgw3", "hgw4", "hgw5", "hgw6", "hgw7", "hgw8"]

# directory for cluster job output
jobOutDir = "/hive/data/inside/wwwstats/apacheLogParse"

# directory to store csv files for C3 plot
htmlDir = "/cluster/home/max/public_html/logParse/plot"

# a struct for a cleaned apache log line
apacheFields = ['ip', 'time', 'status', "reqType", 'speed', "filePath", "cgiParams", "agent", "referer"]
ApacheLine = namedtuple("ApacheLine", apacheFields)

# when writing the tab file, we need to replace some characters from the apache log file
-keyValReplTable = str.maketrans("\t\n=|", "  __")
-tabSepReplTable = str.maketrans("\t\n", "  ")
+keyValReplTable = string.maketrans("\t\n=|", "  __")
+tabSepReplTable = string.maketrans("\t\n", "  ")

# debugging option to run only one job
RUNONE = False

# option to globally overwrite existing files in cluster jobs
OVERWRITE = False

# option to globally clean all output directories of cluster jobs
doCleanDir = False

# ---- FUNCTIONS ----

def lineFileNext(fh):
    """ parses tab-sep file with headers as field names
    yields collection.namedtuples
    strips "#"-prefix from header line
    """
    line1 = fh.readline()
    line1 = line1.strip("\n").strip("#")
    headers = line1.split("\t")
    Record = namedtuple('tsvRec', headers)

    for line in fh:
        line = line.rstrip("\n")
        fields = line.split("\t")
        try:
            rec = Record(*fields)
-        except Exception as msg:
+        except Exception, msg:
            logging.error("Exception occurred while parsing line, %s" % msg)
            logging.error("Filename %s" % fh.name)
            logging.error("Line was: %s" % repr(line))
            logging.error("Does number of fields match headers?")
            logging.error("Headers are: %s" % headers)
            #raise Exception("wrong field count in line %s" % line)
            continue
        # convert fields to correct data type
        yield rec

def dateToWeek(timeStr):
    " convert an apache time, and get the start of its week in format like 2013-04-25 "
    timeObj = datetime.datetime.strptime(timeStr, "%d/%b/%Y:%H:%M:%S")
    dayOfYear = int(timeObj.strftime("%j"))
    weekStartStr = "%d-%d" % (timeObj.year, 7*(dayOfYear/7)+1) # days are 1-based
    weekStart = datetime.datetime.strptime(weekStartStr, "%Y-%j")
    weekStartStr = weekStart.strftime("%Y-%m-%d")
    return weekStartStr
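
# Editor's sketch (not part of the original patch): a worked example of the
# week arithmetic above, which relies on Python 2 integer division in
# dayOfYear/7. For 18/Aug/2013 (day 230 of the year): 7*(230/7)+1 = 7*32+1
# = 225, and the "%Y-%j" round-trip turns day 225 into the week-start date:
#
#   >>> dateToWeek("18/Aug/2013:07:54:21")
#   '2013-08-13'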

def parseApacheLine(fields):
    " parse an apache log file line and return it as an ApacheLine struct "
    if len(fields)<9:
        logging.warn("not enough fields: %s" % fields)
        return None

    # 119.63.193.132 - - [18/Aug/2013:07:54:21 -0700] "GET / HTTP/1.1" 200 16213 "-" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)" 200 microseconds
    try:
        ip = fields[0]
        timeStr = fields[3].strip("[")
        req = fields[5]
        status = fields[6]
        referer = fields[8]
        agent = fields[9]
        if len(fields)>10:
            speed = fields[10]
        else:
            speed = "-1" # there was a time back in 2003 when we didn't have the speed field in the logs
    except IndexError:
        logging.warn("index error %s" % fields)
        return None

    if len(req)>10000:
        logging.warn("HTTP request from %s with more than 10000 chars: %s" % (agent, "".join(req[:200])))
        return None

    # parse http request: typically something like
    # GET cgi-bin/hgTracks?hgsid=xxxx&xxx HTTP1.1
    reqRest = req.split(" ", 1)
    if len(reqRest)!=2:
        logging.warn("no space in request field %s" % fields)
        return None

    reqType, rest = reqRest
    if not rest.endswith("HTTP/1.1") and not rest.endswith("HTTP/1.0"):
        logging.warn("request does not end with HTTP/1.1 %s, GET params probably too long" % fields)
        return None
    reqUrl = rest.replace(" HTTP/1.1", "")
    reqUrl = reqUrl.replace(" HTTP/1.0", "")
    #reqType, reqUrl, httpVer = reqFields

    # split into cgi as a string and the params as a dict, e.g. "cgi-bin/hgXXX" and {'hgsid':'1233'}
-    filePath, paramStr = urllib.parse.urlsplit(reqUrl)[2:4]
+    filePath, paramStr = urlparse.urlsplit(reqUrl)[2:4]
    cgi = basename(filePath)
-    params = urllib.parse.parse_qs(paramStr)
+    params = urlparse.parse_qs(paramStr)

    paramList = []
-    for key, val in params.items():
+    for key, val in params.iteritems():
        val = val[0]
        # get rid of the = and | characters in these strings
        key = key.translate(keyValReplTable)
        val = val.translate(keyValReplTable)
        kvStr = "%s=%s" % (key, val)
        if kvStr=="":
            continue
        paramList.append(kvStr)
    paramStr = "|".join(paramList)

    # we want to put this into a tab-sep file, so remove these chars
    filePath = filePath.translate(tabSepReplTable)
    agent = agent.translate(tabSepReplTable)
    referer = referer.translate(tabSepReplTable)

    # put everything into a struct
    a = ApacheLine(ip, timeStr, status, reqType, speed, filePath, paramStr, agent, referer)
    return a
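
# Editor's sketch (not part of the original patch): feeding the sample line
# from the comment above through parseApacheLine. The input list is what
# csv.reader yields for a space-delimited log line; the hgsid value is
# illustrative.
#
#   fields = ['119.63.193.132', '-', '-', '[18/Aug/2013:07:54:21', '-0700',
#             'GET /cgi-bin/hgTracks?hgsid=1233 HTTP/1.1', '200', '16213',
#             '-', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)']
#   a = parseApacheLine(fields)
#   # a.ip == '119.63.193.132', a.filePath == '/cgi-bin/hgTracks',
#   # a.cgiParams == 'hgsid=1233', a.speed == '-1' (no 11th field present)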

def printBest(ofh, counts, topX=100000):
    " print the top names and their counts "
    #ofh.write("- %s\n" % title)
    for name, count in counts:
        ofh.write("%s\t%s\n" % (name, count))

def isBot(agent):
    " you can find most of these by looking for accesses to robots.txt "
    a = agent.lower()
    if "google." in a or "yahoo." in a or "bing." in a or "baidu." in a \
        or "bot" in a or "spider" in a or "slurp" in a or "crawler" in a or \
        "Commons-HttpClient" in a or "HTTPGrab" in a or "internal dummy" in a or \
        "Daum" in agent or "ltx71" in agent or "python-request" in agent or \
        "Scout" in agent or "Riddler" in agent or "wget" in agent or \
-        "//naver.me/spd" in agent or \
        a.startswith("google") or a.startswith("yahoo") or a.startswith("bing"):
        return True
    else:
        return False

def paramStrToDict(inStr):
    " convert string like hgsid=12343|key=val|key2=val2 to dictionary and return "
    d = {}
    if inStr=="":
        return d

    parts = inStr.split("|")
    for p in parts:
        k, v = p.split("=")
        d[k] = v
    return d

def apacheCount(inFname):
    " parse apache tab sep log file and return as dicts of counters "
    # open file
    logging.info("Reading %s" % inFname)
    if inFname.endswith(".gz"):
-        print("opening gzip file")
-        ifh = gzip.open(inFname, "rt", encoding="utf8")
+        ifh = gzip.open(inFname)
    else:
-        print("opening normal text file")
-        ifh = open(inFname, "rt", encoding="utf8")
+        ifh = open(inFname)
-    print(ifh, ofh.name)

    hgcClicks = defaultdict(Counter)
    sidIps = defaultdict(Counter)
    agentCounts = defaultdict(Counter)

    # parse file
    for a in lineFileNext(ifh):
        # skip any further analysis if it's a bot
        if isBot(a.agent):
            logging.debug("%s is a bot" % a.agent)
            continue

        agentCounts[a.agent][a.filePath]+=1

        params = paramStrToDict(a.cgiParams)

        # hgsids per IP address
        hgsid = params.get("hgsid", None)
        if hgsid != None:
            hgsid = hgsid[0]
            hgsid = hgsid.split("_")[0].strip() # strip the new secure id
            if not hgsid.isdigit():
                logging.warn("hgsid %s does not look like a number, line %s" % (hgsid, a))
                continue
            sidIps[a.ip][hgsid]+=1

        # track clicks
        if a.filePath.endswith("/hgc"):
            trackName = params.get("g", None)
            if trackName != None:
                hgcClicks[trackName][a.ip]+=1
            else:
                logging.debug("hgc without g param: %s" % a.filePath)

    return hgcClicks, sidIps, agentCounts
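
# Editor's note (not part of the original patch): the three return values of
# apacheCount are all dict-of-Counter structures; illustrative shapes, with
# made-up values:
#
#   hgcClicks   = {"refGene": Counter({"1.2.3.4": 10})}  # track -> IP -> clicks
#   sidIps      = {"1.2.3.4": Counter({"12343": 7})}     # IP -> hgsid -> hits
#   agentCounts = {"Mozilla/5.0": Counter({"/cgi-bin/hgTracks": 3})}  # agent -> path -> hits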

# the following functions prepare lists of (inFname, outFname) for parasol

def inOutCat(inDir, outDir):
    " prepare filenames for parasol for the cat step, return a list of tuples (inFname, outFname) "
    if not os.path.isdir(outDir):
        os.makedirs(outDir)

    inFnames = glob.glob(join(inDir, "*.tab.gz"))

    # split all infiles by real week, not the week from the apache filename
    byWeek = defaultdict(list)
    for inFname in inFnames:
        date = basename(inFname).split(".")[0].split("_")[-1]
        byWeek[date].append(inFname)

    # create little files with the filenames in inDir, to get around parasol's line limit
    inOutFnames = []
-    for date, inFnames in byWeek.items():
+    for date, inFnames in byWeek.iteritems():
        fileListFname = join(inDir, date+".fileList")
        ofh = open(fileListFname, "w")
        ofh.write("\n".join(inFnames))
        ofh.close()

        outFname = join(outDir, date+".tab.gz")
        inOutFnames.append( (fileListFname, outFname) )

    logging.info("Found %d input files, assigned to %d output files" % (len(inFnames), len(inOutFnames)))
    return inOutFnames

def inOutTab(outDir):
    " prep in/out file list for parasol log tab-sep reformatting, return list of tuples (inFname, outFname) "
    if not os.path.isdir(outDir):
        os.makedirs(outDir)

    fnames = []
-
-    years = os.listdir(baseLogDir)
-    years = [x for x in years if x[0].isdigit()]
-
    for year in years:
        yearDir = join(baseLogDir, year)
        for servName in servNames:
            servDir = join(yearDir, servName)
            inFnames = glob.glob(join(servDir, "access_log.*.gz"))
            for inFname in inFnames:
                # access_log.20130317.gz
                day = os.path.basename(inFname).split(".")[1]
                outFname = "%s_%s.botIps.txt" % (day, servName)
                # this is also used as a flag file; the job writes most data to <day>_<server>_<realWeekDate>.tab.gz
                outFname = join(outDir, outFname)
                fnames.append( (inFname, outFname) )
-        print("Found %d logfiles in %s" % (len(fnames), yearDir))
+        print "Found %d logfiles in %s" % (len(fnames), yearDir)
    return fnames

def inOutCount(catDir, outDir):
    """ prepare (inFname, outFname) tuples for the count step
    outfiles go into one subDir per counting type, e.g. hgcClicks/2013-04-04.tab.gz """
    if not os.path.isdir(outDir):
        os.makedirs(outDir)

    fnames = []
    inFnames = glob.glob(join(catDir, "*.tab.gz"))
    for inFname in inFnames:
        fBase = os.path.basename(inFname)
        outPath = join(outDir, "hgcClicks", fBase)
        fnames.append( (inFname, outPath) )

    logging.info("Found %d logfiles in %s" % (len(fnames), catDir))
    return fnames

# end of parasol helper functions

def submitJobs(headNode, jobOutDir, job, fnames):
    """ submit jobs to parasol, calling this script as the jobscript with the parameter "job" """
    # clear joboutdir
    if doCleanDir and os.path.isdir(jobOutDir):
-        print(("Deleting directory %s and all contents" % jobOutDir))
+        print ("Deleting directory %s and all contents" % jobOutDir)
        print ("You have three seconds to change your mind and hit ctrl-c")
        time.sleep(3)
        runCmd("rm -rf %s/*" % jobOutDir)

    # for debugging
    if RUNONE:
        logging.info("Running only one job")
        inFname, outFname = fnames[0]
        cmd = "%s %s %s %s" % (__file__, inFname, job, "temp.hgcClicks.tab")
        ret = os.system(cmd)
        if ret!=0:
-            print("return code %d" % ret)
+            print "return code %d" % ret
            return
        logging.info("Running merge on single file %s" % outFname)
        runReduce(jobOutDir, True)
        sys.exit(0)

    # create list of cmd lines to run
    jobLines = []
    skipCount = 0
    for inFname, outFname in fnames:
        if not OVERWRITE and isfile(outFname):
            logging.debug("Skipping parasol job, output file %s already exists" % outFname)
            skipCount += 1
            continue

        outPath = join(jobOutDir, outFname)
        cmdLine = "%s %s %s %s {check out exists %s}\n" % \
            (sys.executable, abspath(__file__), job, inFname, outPath)
        jobLines.append(cmdLine)
    logging.info("%d logfiles, skipped %d (already converted), %d files to convert" % \
        (len(fnames), skipCount, len(jobLines)))

    if len(jobLines)==0:
        logging.info("No new logfiles to convert")
        return

    # create joblist file
    jlName = join(jobOutDir, "jobList")
    jlf = open(jlName, "w")
    for cmdLine in jobLines:
        jlf.write(cmdLine)
    jlf.close()

    # submitting joblist
-    print(("Running jobs in dir %s" % jobOutDir))
+    print("Running jobs in dir %s" % jobOutDir)
    cmd = "ssh %s 'cd %s; para freeBatch; para resetCounts; para clearSickNodes; para make jobList'" % \
        (headNode, jobOutDir)
    logging.info("running: %s" % cmd)
    runCmd(cmd)
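
# Editor's sketch (not part of the original patch): what one generated jobList
# line looks like, per the cmdLine template in submitJobs above. All paths are
# hypothetical; "{check out exists ...}" is parasol's output-file check clause:
#
#   /cluster/software/bin/python2.7 /path/to/apacheLogParse tabJob \
#       /hive/data/inside/wwwstats/RR/2013/hgw1/access_log.20130317.gz \
#       {check out exists /path/to/01-tab/20130317_hgw1.botIps.txt}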

#def getKgIdToSym():
    #" use hgsql to get mapping kgId -> symbol "
    #tmpFh = tempfile.NamedTemporaryFile()
    #cmd = 'hgsql hg19 -NB -e "select kgId, geneSymbol from kgXref;" > %s' % tmpFh.name
    #os.system(cmd)
    #res = {}
    #for line in open(tmpFh.name).read().splitlines():
        #kgId, sym = line.split("\t")
        #res[kgId] = sym
    #return res

def writeCounts(allCounts):
    kgIdToSym = getKgIdToSym()
-    for dataType, counter in allCounts.items():
+    for dataType, counter in allCounts.iteritems():
        ofh = open(dataType+".tab", "w")
        if dataType.endswith("Set"):
-            for id, val in counter.items():
+            for id, val in counter.iteritems():
                row = [id, str(len(val)), ",".join(val)]
                ofh.write("\t".join(row)+"\n")
        else:
            for id, count in counter.most_common():
                if dataType=="genes":
                    row = [id, kgIdToSym.get(id, ""), str(count)]
                else:
                    row = [id, str(count)]
                ofh.write("\t".join(row)+"\n")
        ofh.close()
-        print("wrote %s" % ofh.name)
+        print "wrote %s" % ofh.name

#def postProcess(allCounts):
    #""" assume allCounts is a dict with name -> someVariable
    #if sameVariable is a set, replace it with its length
    #return the result
    #"""
    #newDict = {}
    #for name, var in allCounts.iteritems():
        #if name.endswith("Set"):
            #counter = Counter()
            #for key, dataSet in var.iteritems():
                #counter[key] = len(dataSet)
            #newDict[name] = counter
        #else:
            #newDict[name] = var
    #return newDict

#def runReduce(jobOutDir, runOne):
    #""
    #gc.disable() # obscure setting to make script a lot faster, disables GC
    #if isfile(jobOutDir):
        #inFnames = [jobOutDir] # for debugging, we can do this
    #else:
        #inFnames = glob.glob(join(jobOutDir, "*.marshal"))

    #if runOne:
        ##inFnames = [inFnames[0]] # debugging on a single file is faster
        #inFnames = ["temp.hgcClicks.tab"]

    #print("Found %d input files" % len(inFnames))
    #allCounts = defaultdict(Counter)

    #for inFname in inFnames:
        #print("Reading %s" % inFname)
        #mapData = marshal.load(open(inFname))
        ##printBest(sys.stdout, mapData["genes"])
        #for dataType, counts in mapData.iteritems():
            #if dataType.endswith("Set") and dataType not in allCounts:
                #allCounts[dataType] = defaultdict(set) # special case, not a counter but a set
            ##print dataType
            #dataCounter = allCounts[dataType]
            #for id, val in counts:
                #if dataType.endswith("Max"):
                    #dataCounter[id] = max(dataCounter[id], val)
                #elif dataType.endswith("Set"):
                    #dataCounter[id].update(val)
                #else:
                    #dataCounter[id] += val

    #allCounts = postProcess(allCounts)
    #writeCounts(allCounts)

def runCmd(cmd):
    logging.debug("Running %s" % cmd)
    ret = os.system(cmd)
    if ret!=0:
        raise Exception("Could not run %s" % cmd)

def writeLines(headers, lines, outFname):
    " write text lines to file in an atomic way "
    #ofh = open(outFname, "w")
    atexit.register(delTmp)
    global removeTmpName
    fd, tmpName = tempfile.mkstemp(dir=dirname(outFname), suffix=".tab.gz.tmp")
    removeTmpName = tmpName
    ofh = gzip.open(tmpName, "w")
    logging.info("Writing to %s" % ofh.name)

    ofh.write("#"+"\t".join(headers)+"\n")
    for l in lines:
        ofh.write(l)
        ofh.write("\n")
    ofh.close()

    logging.info("Renaming to %s" % outFname)
    os.rename(tmpName, outFname)

def delTmp():
    " remove global tmp file name "
    if isfile(removeTmpName):
        os.remove(removeTmpName)

def counterToString(c):
    """ convert counter object to string like "C:4,abc:3", sorted by counts """
    ipSorted = c.most_common() # sort by counts
    countSum = 0
    countParts = []
    itemCount = 0
    for ip, count in ipSorted:
        ipCountStr = "%s:%d" % (ip, count)
        #ofh.write("%s," % ipCountStr)
        if ":" in ip:
            ip = ip.replace(":", "_")
        if "," in ip:
            ip = ip.replace(",", "_")
        countParts.append(ipCountStr)
        countSum += count
        itemCount += 1
    return countSum, itemCount, ",".join(countParts)
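
# Editor's sketch (not part of the original patch): counterToString on a small
# Counter. Note it returns (countSum, itemCount, string), and that the ':'/','
# replacement above rewrites ip only after ipCountStr was already appended, so
# keys containing those characters pass through unescaped:
#
#   >>> counterToString(Counter({"C": 4, "abc": 3}))
#   (7, 2, 'C:4,abc:3')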
""" makeDir(dirname(outFname)) lines = [] - for key, itemCounts in dictCounter.items(): + for key, itemCounts in dictCounter.iteritems(): countSum, itemCount, countStr = counterToString(itemCounts) fields = [key, str(countSum), str(itemCount), countStr] lines.append("\t".join(fields)) lines.sort() writeLines(headers, lines, outFname) def firstField(line): """Extract timestamp and convert to a form that gives the expected result in a comparison """ return line.split()[0] # for example def mergeLines(lastValues, ofh): " given fieldNames and fieldName -> values dict, write merged line to ofh " c = Counter() for val in lastValues: parts = val.split(",") for part in parts: #if ":" not in part: #print part vals = part.split(":") if len(vals)!=2: - print(vals) + print vals key, count = vals c[key]+=int(count) itemCount, totalSum, countStr = counterToString(c) row = [str(itemCount), str(totalSum), countStr] #if field.endswith("Count"): #total = sum([int(x) for x in values]) #row.append(str(total)) #print "writing row", row ofh.write("\t".join(row)) ofh.write("\n") def mergeFiles(inFnames, outFname): " merge sorted tabsep files to outFname " ifhs = [] lastHeaders = None assert(len(inFnames)>0) for inFname in inFnames: ifh = gzip.open(inFname) headers = ifh.readline().rstrip("\n").split("\t") assert(lastHeaders==None or lastHeaders==headers) lastHeaders = headers ifhs.append(ifh) ofh = gzip.open(outFname, "w") ofh.write("\t".join(headers)+"\n") # python magic for merge operation # http://stackoverflow.com/questions/12460943/merging-pre-sorted-files-without-reading-everything-into-memory decorated = [ ((firstField(line), line) for line in f) for f in ifhs] merged = heapq.merge(*decorated) # now merge lines together and write out lastKey = None lastValues = [] for key, line in merged: if lastKey != None and key!=lastKey: ofh.write("%s\t" % lastKey) #print "merging", lastKey, lastValues mergeLines(lastValues, ofh) lastValues = [] # sort into dict name of field -> values #for field, val in zip(headers[1:], line.split("\t")[1:]): #lastValues[field].append(val) lastField = line.rstrip("\n").split("\t")[-1] lastValues.append(lastField) lastKey = key #undecorated = imap(itemgetter(-1), merged) #ofh.writelines(undecorated) def parsedToMerged(countDir, mergeDir): " return a list of tuples (inFnameMask, outPath)" if not isdir(mergeDir): os.makedirs(mergeDir) inOutFnames = [] fTypes = set() # sort the input files subDirs = listdir(countDir) for subDir in subDirs: logging.debug("subdir %s" % subDir) inDir = join(countDir, subDir) fnames = listdir(inDir) # make sure out dir exists outDir = join(mergeDir, subDir) if not isdir(outDir): logging.debug("making dir %s" % outDir) os.makedirs(outDir) # get all dates dates = set() for fname in fnames: if not fname.endswith("tab.gz"): continue date, server = fname.split('.')[0].split('_') dates.add(date) # create one output file per date logging.info("subdir %s, %d dates" % (subDir, len(dates))) for date in dates: outPath = join(mergeDir, subDir, date+".tab.gz") inMask = join(countDir, subDir, date+"_*.tab.gz") inOutFnames.append ((inMask, outPath)) return inOutFnames #def mergedToYear( mergeDir, yearDir): #if not isdir(yearDir): #os.makedirs(yearDir) # #yearToInFnames = defaultdict(list) # #subDirs = listdir(mergeDir) #logging.debug("Found %d input dirs" % len(subDirs)) #for subDir in subDirs: #fnames = listdir(join(mergeDir, subDir)) ## 03-cat/hgsidHgsids/20130903.tab.gz #for fname in fnames: #year = fname[:4] #outPath = join(yearDir, "%s_%s.tab.gz" % (year,subDir)) 

def parsedToMerged(countDir, mergeDir):
    " return a list of tuples (inFnameMask, outPath) "
    if not isdir(mergeDir):
        os.makedirs(mergeDir)

    inOutFnames = []
    fTypes = set()

    # sort the input files
    subDirs = listdir(countDir)
    for subDir in subDirs:
        logging.debug("subdir %s" % subDir)
        inDir = join(countDir, subDir)
        fnames = listdir(inDir)

        # make sure out dir exists
        outDir = join(mergeDir, subDir)
        if not isdir(outDir):
            logging.debug("making dir %s" % outDir)
            os.makedirs(outDir)

        # get all dates
        dates = set()
        for fname in fnames:
            if not fname.endswith("tab.gz"):
                continue
            date, server = fname.split('.')[0].split('_')
            dates.add(date)

        # create one output file per date
        logging.info("subdir %s, %d dates" % (subDir, len(dates)))
        for date in dates:
            outPath = join(mergeDir, subDir, date+".tab.gz")
            inMask = join(countDir, subDir, date+"_*.tab.gz")
            inOutFnames.append((inMask, outPath))
    return inOutFnames

#def mergedToYear(mergeDir, yearDir):
    #if not isdir(yearDir):
        #os.makedirs(yearDir)

    #yearToInFnames = defaultdict(list)

    #subDirs = listdir(mergeDir)
    #logging.debug("Found %d input dirs" % len(subDirs))
    #for subDir in subDirs:
        #fnames = listdir(join(mergeDir, subDir))
        ## 03-cat/hgsidHgsids/20130903.tab.gz
        #for fname in fnames:
            #year = fname[:4]
            #outPath = join(yearDir, "%s_%s.tab.gz" % (year, subDir))
            #yearToInFnames[outPath].append( join(mergeDir, subDir, fname) )
    #return yearToInFnames

def mergeFnames(dateToFnames):
    """ return list of infnames, outfname tuples. infname is like
    2013_hgw8_20130414.hgcClicks.tab.gz, outfname like hgClicks/20130414.tab.gz """
-    for outPath, inFnames in dateToFnames.items():
+    for outPath, inFnames in dateToFnames.iteritems():
        assert(len(inFnames)>0)
        logging.info("%s" % outPath)
        logging.debug("merging %s into %s" % (inFnames, outPath))
        mergeFiles(inFnames, outPath)

def makeDir(path):
    if not os.path.isdir(path):
        logging.info("Creating directory %s" % path)
        os.makedirs(path)

def plot(mergeDir, htmlDir):
    " reformat count hgcClicks files to csv format for C3 "
    inDir = join(mergeDir, "hgcClicks")
    inFnames = glob.glob(join(inDir, "*.tab.gz"))

    # sort filenames into year -> names
    byYear = defaultdict(list)
    for inFname in inFnames:
        year = basename(inFname)[:4]
        byYear[year].append(basename(inFname).split(".")[0])

-    for year, fnames in byYear.items():
+    for year, fnames in byYear.iteritems():
        # sort fnames by date
        dates = [(datetime.datetime.strptime(fn, "%Y-%m-%d"), fn) for fn in fnames]
        dates.sort()
        dates = [y for (x, y) in dates]

        trackClickSum = defaultdict(int)

        # parse data into nested dict date -> track -> count
        dateTrackCounts = {}
        allTracks = set()
        for date in dates:
            dateCounts = {}
            path = join(inDir, date+".tab.gz")
            for line in gzip.open(path):
                if line.startswith("#"):
                    continue
                track, clickCount, ipCount, rest = line.split("\t")
                allTracks.add(track)
                dateCounts[track] = ipCount
                trackClickSum[track] += int(ipCount)
            dateTrackCounts[date] = dateCounts

        allTracks = []
-        trackClickSum = list(trackClickSum.items()) # convert dict to list of tuples (track, count)
+        trackClickSum = trackClickSum.items() # convert dict to list of tuples (track, count)
        trackClickSum.sort(key=operator.itemgetter(1), reverse=True) # sort by count

        rank = 0
        for track, count in trackClickSum:
            if track.startswith("htc") or track in ["getDna", "refGene", "hgUserPsl"] or track.startswith("ct_"):
                continue
            #if count>=5000:
                #allTracks.append(track)
            allTracks.append(track)
            if rank >= 20:
                break
            rank += 1
        allTracks.sort()

        outName = join(htmlDir, "%s_hgcClicks.csv" % year)

        # output data as table
        ofh = open(outName, "w")
        ofh.write("date,")
        ofh.write(",".join(allTracks))
        ofh.write("\n")
        for date in dates:
            dateTracks = dateTrackCounts[date]
            row = [date]
            for track in allTracks:
                count = dateTracks.get(track, 0)
                row.append(str(count))
            ofh.write(",".join(row))
            ofh.write("\n")
        ofh.close()
        logging.info("Wrote %s" % outName)
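
# Editor's sketch (not part of the original patch): the shape of one C3 csv
# file written by plot() above, one row per week-start date and one column per
# top track; track names and counts are made up:
#
#   date,knownGene,mrna,snp138
#   2013-04-04,1200,800,430
#   2013-04-11,1150,820,470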

def apacheToTab(inFname, outBotIpsFname):
    """ parse apache log file to tab file. outBotIpsFname is something like
    20130420_hgw4.botIps, write output to 20130420_hgw4_<realDate>.tab.gz """
    logging.info("Parsing %s" % inFname)
    if inFname.endswith(".gz"):
-        ifh = gzip.open(inFname, "rt", encoding="utf8")
+        ifh = gzip.open(inFname)
    else:
-        ifh = open(inFname, "rt", encoding="utf8")
+        ifh = open(inFname)

    baseOut = outBotIpsFname.replace(".botIps.txt", "")

    fileHandles = {} # cache the file handles

    count = 0
    botIps = set()
    for row in csv.reader(ifh, delimiter=" ", escapechar='\\'):
        # parse apache log line
        count += 1
        if count % 20000 == 0:
-            print("parsed %d rows" % count)
+            print "parsed %d rows" % count

        log = parseApacheLine(row)
        if log==None:
            #logging.info("parse error: %s" % row)
            continue

        weekDate = dateToWeek(log.time)

        # skip if it's a bot
        if isBot(log.agent):
            #botCounts[a.agent]+=1
            botIps.add(log.ip)
            logging.debug("%s is a bot" % log.agent)
            continue

        outFname = baseOut+"_"+weekDate+".tab.gz"
        if outFname in fileHandles:
            ofh = fileHandles[outFname]
        else:
-            ofh = gzip.open(outFname, "wt", encoding="utf8")
+            ofh = gzip.open(outFname, "w")
            ofh.write("#"+"\t".join(apacheFields)+"\n")
            fileHandles[outFname] = ofh

        ofh.write("\t".join(log))
        ofh.write("\n")

    ofh = open(outBotIpsFname, "w")
    for ip in botIps:
        ofh.write("%s\n" % ip)
    ofh.close()

def catFiles(inFnames, outFname):
    " cat all inFnames to outFname, taking care of header lines "
    ofh = gzip.open(outFname, "w")
    headerWritten = False
    for inFname in inFnames:
        ifh = gzip.open(inFname)
        headerLine = ifh.readline()
        if not headerWritten:
            ofh.write(headerLine)
            headerWritten = True
        ofh.write(ifh.read())

def countJob(inFname, outFnameParasol):
    baseOutDir = dirname(dirname(outFnameParasol))
    outBase = basename(outFnameParasol).split(".")[0]

    outFname = join(baseOutDir, "hgcClicks", outBase+".tab.gz")
-    print(outFname, outFnameParasol)
+    print outFname, outFnameParasol
    assert(outFname==outFnameParasol) # output path has to look like <baseDir>/hgcClicks/<baseName>.tab.gz

    hgcClicks, ipSids, agentCounts = apacheCount(inFname)

    headers = ["track", "clickCount", "ipCount", "countsPerIp_CountList"]
    writeDictCounter(hgcClicks, outFname, headers)

    headers = ["ip", "clickCount", "hgsidCount", "countsPerHgsid_CountList"]
    outFname = join(baseOutDir, "ipHgsids", outBase+".tab.gz")
    writeDictCounter(ipSids, outFname, headers)

    headers = ["agent", "clickCount", "filePathCount", "countsPerFile_CountList"]
    outFname = join(baseOutDir, "agentFiles", outBase+".tab.gz")
    writeDictCounter(agentCounts, outFname, headers)
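
# Editor's note (not part of the original patch): per countJob above, each
# count job fans out into three sibling subdirectories of the count step's
# output directory (03-count, see main below), one file per input week:
#
#   03-count/hgcClicks/<week>.tab.gz    # track -> clicks per IP
#   03-count/ipHgsids/<week>.tab.gz     # IP -> hits per hgsid
#   03-count/agentFiles/<week>.tab.gz   # agent -> hits per file path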

def main(args, options):
    tabDir = join(jobOutDir, "01-tab")
    catDir = join(jobOutDir, "02-cat")
    countDir = join(jobOutDir, "03-count")
    yearDir = join(jobOutDir, "04-byYear")

    steps = args[0].split(",")

    if "tab" in steps:
        # convert all apache files to tab format, remove bots and split by correct date
        fnames = inOutTab(tabDir)
        submitJobs(options.cluster, tabDir, "tabJob", fnames)

    if "tabJob" in steps:
        inFname, outFnameParasol = args[1], args[2]
        apacheToTab(inFname, outFnameParasol)

    if "cat" in steps:
        # create one tab file per week by catting together the right pieces from the tab-step
        fnames = inOutCat(tabDir, catDir)
        submitJobs(options.cluster, catDir, "catJob", fnames)

    if "catJob" in steps:
        # job gets a little text file with the input filenames, as they're too long for parasol
        inListFname, outFname = args[1], args[2]
        inFnames = open(inListFname).read().splitlines()
        catFiles(inFnames, outFname)

    if "count" in steps:
        # submit jobs to parasol
        fnames = inOutCount(catDir, countDir)
        submitJobs(options.cluster, countDir, "countJob", fnames)

    if "countJob" in steps:
        # cluster job: do the counting and write many output files, one per counting type
        inFname, outFnameParasol = args[1], args[2]
        countJob(inFname, outFnameParasol)

    #if "merge" in steps:
        ## submit merge jobs to parasol
        #outInFnames = parsedToMerged(countDir, mergeDir)
        ##mergeFnames(outInFnames)
        #submitJobs(options.cluster, mergeDir, "mergeJob", outInFnames)

    #if "mergeJob" in steps:
        #inMask, outPath = args[1], args[2]
        #inFnames = glob.glob(inMask)
        #logging.debug("merging %s into %s" % (inFnames, outPath))
        #mergeFiles(inFnames, outPath)

    #if "year" in steps:
        ## summarize data by year
        #outInFnames = mergedToYear(mergeDir, yearDir)
        #mergeFnames(outInFnames)
        ##submitMergeJobs(options.cluster, "mergeJob", inOutFnames, mergeDir)

    if "plot" in steps:
        plot(countDir, htmlDir)

    #else:
        #raise Exception("Unknown command in %s" % steps)

# === COMMAND LINE INTERFACE, OPTIONS AND HELP ===
parser = optparse.OptionParser("""usage: %%prog [options] stepList - parse apache logfiles, write results to various .tab files

All output goes to subdirectories of the default output directory %s. It has one subdirectory per step.

stepList is a comma-sep list of any of these steps, in the right order:
"tab":   converts apache log files to tab-sep files, removes bots and splits files by correct week
"cat":   concatenates the right weekly files to get one clean tab-sep file per week
"count": do the counting of various categories (IPs, agents, hgc-clicks, etc), for each week
"plot":  create .csv files for the C3 plotting library, to output dir %s

internal steps used for parasol: tabJob, catJob, countJob, mergeJob

By default, the script does not overwrite files that already exist, it only adds missing files.
""" % (jobOutDir, htmlDir))

# define options
parser.add_option("-d", "--debug", dest="debug", action="store_true", help="show debug messages")
parser.add_option("-c", "--cluster", dest="cluster", action="store", help="cluster to use for mapping step, default %default", default="ku")
parser.add_option("", "--runOne", dest="runOne", action="store_true", help="run only one single map job, for debugging")
parser.add_option("-a", "--clearAll", dest="clearAll", action="store_true", help="clear (rm -rf!) complete step output directory before starting any step")
(options, args) = parser.parse_args()

if len(args)==0:
    parser.print_help()
    sys.exit(0)

# python way of activating debug log mode
if options.debug:
    logging.basicConfig(level=logging.DEBUG)
else:
    logging.basicConfig(level=logging.INFO)

if options.runOne:
    RUNONE = True

if options.clearAll:
    doCleanDir = True

main(args, options)
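
# Editor's note (not part of the original patch): a typical invocation
# sequence, based on the help text above; "ku" is the cluster option default:
#
#   apacheLogParse tab      # parasol jobs: apache logs -> weekly tab files
#   apacheLogParse cat      # concatenate per-server pieces per week
#   apacheLogParse count    # count IPs, agents, hgc clicks per week
#   apacheLogParse plot     # write C3 .csv files to the html directory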