db7c8abb6d7674bb26f579eabb58bc080ce76fdd max Tue May 12 07:04:05 2020 -0700 adding a protease track to uniprot, refs #25192 diff --git src/utils/apacheLogParse src/utils/apacheLogParse index 09a6f74..2bcb08c 100755 --- src/utils/apacheLogParse +++ src/utils/apacheLogParse @@ -1,86 +1,83 @@ -#!/cluster/software/bin/python2.7 +#!/usr/bin/env python3 # parse apache logfiles on the cluster -import glob, urlparse, gzip, marshal, os, shutil, gc, tempfile, types, csv, atexit, datetime, time, operator +import glob, urllib.parse, gzip, marshal, os, shutil, gc, tempfile, types, csv, atexit, datetime, time, operator from collections import namedtuple, Counter, defaultdict from os.path import basename, join, abspath, isfile, dirname, isdir from os import listdir import optparse, logging, sys, string -from itertools import imap + from operator import itemgetter import heapq #TEMPDIR = "/dev/shm" # filename to delete on exit removeTmpName = None # where do we store the raw apache logfiles baseLogDir = "/hive/data/inside/wwwstats/RR" -# years to analyze -years = ["2009", "2010", "2011", "2012", "2013", "2014", "2015", "2016", "2017", "2018", "2019", "2020"] - # list of servers to analyze # change these for debugging or if need to focus on single servers servNames = ["hgw1", "hgw2", "hgw3", "hgw4", "hgw5", "hgw6", "hgw7", "hgw8"] # directory for cluster job output jobOutDir = "/hive/data/inside/wwwstats/apacheLogParse" # directory to store csv files for C3 plot htmlDir = "/cluster/home/max/public_html/logParse/plot" # a struct for a cleaned apache log line apacheFields = ['ip', 'time', 'status', "reqType", 'speed', "filePath", "cgiParams", "agent", "referer"] ApacheLine = namedtuple("ApacheLine", apacheFields) # when writing the tab file, we need to replace some characters from the apache log file -keyValReplTable = string.maketrans("\t\n=|", " __") -tabSepReplTable = string.maketrans("\t\n", " ") +keyValReplTable = str.maketrans("\t\n=|", " __") +tabSepReplTable = str.maketrans("\t\n", " ") # debugging option to run only one job RUNONE = False # option to globally overwrite existing files in cluster jobs OVERWRITE = False # option to globally clean all output directories of cluster jobs doCleanDir = False # ---- FUNCTIONS ---- def lineFileNext(fh): """ parses tab-sep file with headers as field names yields collection.namedtuples strips "#"-prefix from header line """ line1 = fh.readline() line1 = line1.strip("\n").strip("#") headers = line1.split("\t") Record = namedtuple('tsvRec', headers) for line in fh: line = line.rstrip("\n") fields = line.split("\t") try: rec = Record(*fields) - except Exception, msg: + except Exception as msg: logging.error("Exception occured while parsing line, %s" % msg) logging.error("Filename %s" % fh.name) logging.error("Line was: %s" % repr(line)) logging.error("Does number of fields match headers?") logging.error("Headers are: %s" % headers) #raise Exception("wrong field count in line %s" % line) continue # convert fields to correct data type yield rec def dateToWeek(timeStr): " convert an apache time, and get the start of its week in format like 2013-04-25" timeObj = datetime.datetime.strptime(timeStr, "%d/%b/%Y:%H:%M:%S") dayOfYear = int(timeObj.strftime("%j")) weekStartStr = "%d-%d" % (timeObj.year, 7*(dayOfYear/7)+1) # days are 1-based @@ -119,35 +116,35 @@ reqRest = req.split(" ", 1) if len(reqRest)!=2: logging.warn("no space in request field %s" % fields) return None reqType, rest = reqRest if not rest.endswith("HTTP/1.1") and not rest.endswith("HTTP/1.0"): 
logging.warn("request does not end with HTTP/1.1 %s, GET params probably too long" % fields) return None reqUrl = rest.replace(" HTTP/1.1", "") reqUrl = reqUrl.replace(" HTTP/1.0", "") #reqType, reqUrl, httpVer = reqFields # split into cgi as a string and the params as a dict, e.g. "cgi-bin/hgXXX" and {'hgsid':'1233'} - filePath, paramStr = urlparse.urlsplit(reqUrl)[2:4] + filePath, paramStr = urllib.parse.urlsplit(reqUrl)[2:4] cgi = basename(filePath) - params = urlparse.parse_qs(paramStr) + params = urllib.parse.parse_qs(paramStr) paramList = [] - for key, val in params.iteritems(): + for key, val in params.items(): val = val[0] # get rid of the = and | characters in these strings key = key.translate(keyValReplTable) val = val.translate(keyValReplTable) kvStr = "%s=%s" % (key, val) if kvStr=="": continue paramList.append(kvStr) paramStr = "|".join(paramList) # we want to put this into a tab-sep file, so remove these chars filePath = filePath.translate ( tabSepReplTable) agent = agent.translate ( tabSepReplTable) referer = referer.translate ( tabSepReplTable) @@ -157,56 +154,60 @@ def printBest(ofh, counts, topX=100000): " print top 10000 names and their counts " #ofh.write("- %s\n" % title) for name, count in counts: ofh.write("%s\t%s\n" % (name, count)) def isBot(agent): " you can find most of these by looking for accesses to robots.txt " a = agent.lower() if "google." in a or "yahoo." in a or "bing." in a or "baidu." in a \ or "bot" in a or "spider" in a or "slurp" in a or "crawler" in a or \ "Commons-HttpClient" in a or "HTTPGrab" in a or "internal dummy" in a or \ "Daum" in agent or "ltx71" in agent or "python-request" in agent or \ "Scout" in agent or "Riddler" in agent or "wget" in agent or \ + "//naver.me/spd" in agent or \ a.startswith("google") or a.startswith("yahoo") or a.startswith("bing"): return True else: return False def paramStrToDict(inStr): " convert string like hgsid=12343|key=val|key2=val2 to dictionary and return " d = {} if inStr=="": return d parts = inStr.split("|") for p in parts: k, v = p.split("=") d[k] = v return d def apacheCount(inFname): " parse apache tab sep log file and return as dicts of counters " # open file logging.info( "Reading %s" % inFname) if inFname.endswith(".gz"): - ifh = gzip.open(inFname) + print("opening gzip file") + ifh = gzip.open(inFname, "rt", encoding="utf8") else: - ifh = open(inFname) + print("opening normal text file") + ifh = open(inFname, "rt", encoding="utf8") + print(ifh, ofh.name) hgcClicks = defaultdict(Counter) sidIps = defaultdict(Counter) agentCounts = defaultdict(Counter) # parse file for a in lineFileNext(ifh): # skip any further analysis if it's a bot if isBot(a.agent): logging.debug("%s is a bot" % a.agent) continue agentCounts[a.agent][a.filePath]+=1 params = paramStrToDict(a.cgiParams) # hgsids per IP address @@ -235,101 +236,105 @@ def inOutCat(inDir, outDir): " prepare filenames for parasol for the cat step, return a list of tuples (inFname, outFname) " if not os.path.isdir(outDir): os.makedirs(outDir) inFnames = glob.glob(join(inDir, "*.tab.gz")) # split all infiles by real week, not the week from the apache filename byWeek = defaultdict(list) for inFname in inFnames: date = basename(inFname).split(".")[0].split("_")[-1] byWeek[date].append(inFname) # create little files with the filenames in inDir, to get around parasol's line limit inOutFnames = [] - for date, inFnames in byWeek.iteritems(): + for date, inFnames in byWeek.items(): fileListFname = join(inDir, date+".fileList") ofh = open(fileListFname, "w") 
ofh.write("\n".join(inFnames)) ofh.close() outFname = join(outDir, date+".tab.gz") inOutFnames.append( (fileListFname, outFname) ) logging.info("Found %d input files, assigned to %d output files " % (len(inFnames), len(inOutFnames))) return inOutFnames def inOutTab(outDir): " prep in/out file list for parasol log tab-sep reformatting, return list of tuples (inFname, outFname)" if not os.path.isdir(outDir): os.makedirs(outDir) fnames = [] + + years = os.listdir(baseLogDir) + years = [x for x in years if x[0].isdigit()] + for year in years: yearDir = join(baseLogDir, year) for servName in servNames: servDir = join(yearDir, servName) inFnames = glob.glob(join(servDir, "access_log.*.gz")) for inFname in inFnames: # access_log.20130317.gz day =os.path.basename(inFname).split(".")[1] outFname = "%s_%s.botIps.txt" % (day, servName) # this is also used a flag file. job writes most data to __.tab.gz outFname = join(outDir, outFname) fnames.append ( (inFname, outFname) ) - print "Found %d logfiles in %s" % (len(fnames), yearDir) + print("Found %d logfiles in %s" % (len(fnames), yearDir)) return fnames def inOutCount(catDir, outDir): """ prepare (inFname, outFname) tuples for the count step outfiles go into one subDir per counting type, e.g. hgcClicks/2013-04-04.tab.gz""" if not os.path.isdir(outDir): os.makedirs(outDir) fnames = [] inFnames = glob.glob(join(catDir, "*.tab.gz")) for inFname in inFnames: fBase =os.path.basename(inFname) outPath = join(outDir, "hgcClicks", fBase) fnames.append ( (inFname, outPath) ) logging.info("Found %d logfiles in %s" % (len(fnames), catDir)) return fnames # end of parasol helper functions def submitJobs(headNode, jobOutDir, job, fnames): """ submit jobs to parasol, calling this script as the jobscript with the parameter "job" """ # clear joboutdir if doCleanDir and os.path.isdir(jobOutDir): - print ("Deleting directory %s and all contents" % jobOutDir) + print(("Deleting directory %s and all contents" % jobOutDir)) print ("You have three seconds to change your mind and hit ctrl-c") time.sleep(3) runCmd("rm -rf %s/*" % jobOutDir) # for debugging if RUNONE: logging.info("Running only one job") inFname, outFname = fnames[0] cmd = "%s %s %s %s" % (__file__, inFname, job, "temp.hgcClicks.tab") ret = os.system(cmd) if ret!=0: - print "return code %d" % ret + print("return code %d" % ret) return logging.info("Running merge on single file %s" % outFname) runReduce(jobOutDir, True) sys.exit(0) # create list of cmd lines to run jobLines = [] skipCount = 0 for inFname, outFname in fnames: if not OVERWRITE and isfile(outFname): logging.debug("Skipping parasol job, output file %s already exists" % outFname) skipCount += 1 continue outPath = join(jobOutDir, outFname) cmdLine = "%s %s %s %s {check out exists %s}\n" % \ @@ -338,64 +343,64 @@ logging.info("%d logfiles, skipped %d (already converted), %d files to convert" % \ (len(fnames), skipCount, len(jobLines))) if len(jobLines)==0: logging.info("No new logfiles to convert") return # create joblist file jlName = join(jobOutDir, "jobList") jlf = open(jlName, "w") for cmdLine in jobLines: jlf.write(cmdLine) jlf.close() # submitting joblist - print("Running jobs in dir %s" % jobOutDir) + print(("Running jobs in dir %s" % jobOutDir)) cmd = "ssh %s 'cd %s; para freeBatch; para resetCounts; para clearSickNodes; para make jobList'" % \ (headNode, jobOutDir) logging.info("running: %s" % cmd) runCmd(cmd) #def getKgIdToSym(): #"use hgsql to get mapping kgId -> symbol " #tmpFh = tempfile.NamedTemporaryFile() #cmd = 'hgsql hg19 -NB -e 
"select kgId, geneSymbol from kgXref;" > %s' % tmpFh.name #os.system(cmd) #res = {} #for line in open(tmpFh.name).read().splitlines(): #kgId, sym = line.split("\t") #res[kgId] = sym #return res def writeCounts(allCounts): kgIdToSym = getKgIdToSym() - for dataType, counter in allCounts.iteritems(): + for dataType, counter in allCounts.items(): ofh = open(dataType+".tab", "w") if dataType.endswith("Set"): - for id, val in counter.iteritems(): + for id, val in counter.items(): row = [id, str(len(val)), ",".join(val)] ofh.write("\t".join(row)+"\n") else: for id, count in counter.most_common(): if dataType=="genes": row = [id, kgIdToSym.get(id, ""), str(count)] else: row = [id, str(count)] ofh.write("\t".join(row)+"\n") ofh.close() - print "wrote %s" % ofh.name + print("wrote %s" % ofh.name) #def postProcess(allCounts): #""" assume allCounts is a dict with name -> someVariable #if sameVariable is a set, replace it with its length #return the result #""" # #newDict = {} #for name, var in allCounts.iteritems(): #if name.endswith("Set"): #counter = Counter() #for key, dataSet in var.iteritems(): #counter[key] = len(dataSet) #newDict[name] = counter #else: @@ -487,55 +492,55 @@ countParts.append(ipCountStr) countSum += count itemCount +=1 return countSum, itemCount, ",".join(countParts) def writeDictCounter(dictCounter, outFname, headers): """ write dict with key -> Counter to tab.gz out file with four fields: 1 key 2 number of items in counter 3 sum of counts in counter 4 complete contents of counter in format item:count|item2:count|... """ makeDir(dirname(outFname)) lines = [] - for key, itemCounts in dictCounter.iteritems(): + for key, itemCounts in dictCounter.items(): countSum, itemCount, countStr = counterToString(itemCounts) fields = [key, str(countSum), str(itemCount), countStr] lines.append("\t".join(fields)) lines.sort() writeLines(headers, lines, outFname) def firstField(line): """Extract timestamp and convert to a form that gives the expected result in a comparison """ return line.split()[0] # for example def mergeLines(lastValues, ofh): " given fieldNames and fieldName -> values dict, write merged line to ofh " c = Counter() for val in lastValues: parts = val.split(",") for part in parts: #if ":" not in part: #print part vals = part.split(":") if len(vals)!=2: - print vals + print(vals) key, count = vals c[key]+=int(count) itemCount, totalSum, countStr = counterToString(c) row = [str(itemCount), str(totalSum), countStr] #if field.endswith("Count"): #total = sum([int(x) for x in values]) #row.append(str(total)) #print "writing row", row ofh.write("\t".join(row)) ofh.write("\n") def mergeFiles(inFnames, outFname): " merge sorted tabsep files to outFname " ifhs = [] @@ -623,76 +628,76 @@ #subDirs = listdir(mergeDir) #logging.debug("Found %d input dirs" % len(subDirs)) #for subDir in subDirs: #fnames = listdir(join(mergeDir, subDir)) ## 03-cat/hgsidHgsids/20130903.tab.gz #for fname in fnames: #year = fname[:4] #outPath = join(yearDir, "%s_%s.tab.gz" % (year,subDir)) #yearToInFnames[outPath].append( join(mergeDir, subDir, fname) ) #return yearToInFnames def mergeFnames(dateToFnames): """ return list of infnames, outfname tuples. 
infname is like 2013_hgw8_20130414.hgcClicks.tab.gz , outfname like hgClicks/20130414.tab.gz """ - for outPath, inFnames in dateToFnames.iteritems(): + for outPath, inFnames in dateToFnames.items(): assert(len(inFnames)>0) logging.info("%s" % outPath) logging.debug("merging %s into %s" % (inFnames, outPath)) mergeFiles(inFnames, outPath) def makeDir(path): if not os.path.isdir(path): logging.info("Creating directory %s" % path) os.makedirs(path) def plot(mergeDir, htmlDir): " reformat count hgcClicks files to csv format for C3 " inDir = join(mergeDir, "hgcClicks") inFnames = glob.glob(join(inDir, "*.tab.gz")) # sort filenames into year -> names byYear = defaultdict(list) for inFname in inFnames: year = basename(inFname)[:4] byYear[year].append(basename(inFname).split(".")[0]) - for year, fnames in byYear.iteritems(): + for year, fnames in byYear.items(): # sort fnames by date dates = [(datetime.datetime.strptime(fn, "%Y-%m-%d"), fn) for fn in fnames] dates.sort() dates = [y for (x,y) in dates] trackClickSum = defaultdict(int) # parse data into nested dict date -> track -> count dateTrackCounts = {} allTracks = set() for date in dates: dateCounts = {} path = join(inDir, date+".tab.gz") for line in gzip.open(path): if line.startswith("#"): continue track, clickCount, ipCount, rest = line.split("\t") allTracks.add(track) dateCounts[track] = ipCount trackClickSum[track] += int(ipCount) dateTrackCounts[date] = dateCounts allTracks = [] - trackClickSum = trackClickSum.items() # convert dict to list of tuples (track, count) + trackClickSum = list(trackClickSum.items()) # convert dict to list of tuples (track, count) trackClickSum.sort(key=operator.itemgetter(1), reverse=True) # sort by count rank = 0 for track, count in trackClickSum: if track.startswith("htc") or track in ["getDna","refGene", "hgUserPsl"] or track.startswith("ct_"): continue #if count>=5000: #allTracks.append(track) allTracks.append(track) if rank >= 20: break rank += 1 allTracks.sort() @@ -708,95 +713,95 @@ dateTracks = dateTrackCounts[date] row = [date] for track in allTracks: count = dateTracks.get(track, 0) row.append(str(count)) ofh.write(",".join(row)) ofh.write("\n") ofh.close() logging.info("Wrote %s" % outName) def apacheToTab(inFname, outBotIpsFname): " parse apache log file to tab file. 
outBotIpsFname is something like 20130420_hgw4.botIps, write output to 20130420_hgw4_.tab.gz " logging.info("Parsing %s" % inFname) if inFname.endswith(".gz"): - ifh = gzip.open(inFname) + ifh = gzip.open(inFname, "rt", encoding="utf8") else: - ifh = open(inFname) + ifh = open(inFname, "rt", encoding="utf8") baseOut = outBotIpsFname.replace(".botIps.txt","") fileHandles = {} # cache the file handles count = 0 botIps = set() for row in csv.reader(ifh, delimiter = " ", escapechar='\\'): # parse apache log line count += 1 if count % 20000 == 0: - print "parsed %d rows" % count + print("parsed %d rows" % count) log = parseApacheLine(row) if log==None: #logging.info("parse error: %s" % row) continue weekDate = dateToWeek(log.time) # skip if it's a bot if isBot(log.agent): #botCounts[a.agent]+=1 botIps.add(log.ip) logging.debug("%s is a bot" % log.agent) continue outFname = baseOut+"_"+weekDate+".tab.gz" if outFname in fileHandles: ofh = fileHandles[outFname] else: - ofh = gzip.open(outFname, "w") + ofh = gzip.open(outFname, "wt", encoding="utf8") ofh.write("#"+"\t".join(apacheFields)+"\n") fileHandles[outFname] = ofh ofh.write("\t".join(log)) ofh.write("\n") ofh = open(outBotIpsFname, "w") for ip in botIps: ofh.write("%s\n" % ip) ofh.close() def catFiles(inFnames, outFname): " cat all inFnames to outFname, taking care of header lines " ofh = gzip.open(outFname, "w") headerWritten = False for inFname in inFnames: ifh = gzip.open(inFname) headerLine = ifh.readline() if not headerWritten: ofh.write(headerLine) headerWritten=True ofh.write(ifh.read()) def countJob(inFname, outFnameParasol): baseOutDir = dirname(dirname(outFnameParasol)) outBase = basename(outFnameParasol).split(".")[0] outFname = join(baseOutDir, "hgcClicks", outBase+".tab.gz") - print outFname, outFnameParasol + print(outFname, outFnameParasol) assert (outFname==outFnameParasol) # output path has to look like /hgcClicks/.tab.gz hgcClicks, ipSids, agentCounts = apacheCount(inFname) headers = ["track", "clickCount", "ipCount", "countsPerIp_CountList"] writeDictCounter(hgcClicks, outFname, headers) headers = ["ip", "clickCount", "hgsidCount", "countsPerHgsid_CountList"] outFname = join(baseOutDir, "ipHgsids", outBase+".tab.gz") writeDictCounter(ipSids, outFname, headers) headers = ["agent", "clickCount", "filePathCount", "countsPerFile_CountList"] outFname = join(baseOutDir, "agentFiles", outBase+".tab.gz") writeDictCounter(agentCounts, outFname, headers)