#!/usr/bin/env python
# webSync -- added to the repository as src/utils/webSync (mode 100755) in
# commit bc172b659b426818b82e185e9846aa63b2a9544d by max,
# Tue Aug 29 09:53:44 2017 -0700:
#   CIRM: new tool webSync, a small script that can download hundreds of
#   thousands of files quickly from a webserver and keep them in sync with a
#   local directory. Like rsync, but faster and does not require an open port.
#
# The code is written to run under both Python 2.7 and Python 3.

import logging, sys, optparse, os, time, atexit
from collections import defaultdict
from os.path import join, basename, dirname, isfile, isdir
import string

try:  # Python 3
    from urllib.parse import quote, unquote
except ImportError:  # Python 2
    from urllib import quote, unquote

try:  # Python 3.3+
    from shlex import quote as shellQuote
except ImportError:  # Python 2
    from pipes import quote as shellQuote

logDir = "webSyncLog"

flagFname = join(logDir, "isRunning.flag")

# ==== functions =====
def parseArgs():
    """ setup logging, parse command line arguments and options. -h shows auto-generated help page
    Returns (args, options). Exits if no URL is given and none was saved by an earlier run. """
    parser = optparse.OptionParser("""usage: %prog [options] [url] - download from https server, using files.txt on their end to get the list of files

    To create files.txt on the remote end, run this command:
      du -ab > files.txt
    Or preferably this command (empty directories will lead to transmit errors):
      find . -type f -exec du -ab {} + > files.txt

    Then run this in the download directory:
      webSync https://there.org/

    This will create a "webSyncLog" directory in the current directory, compare
    https://there.org/files.txt with the files in the current directory,
    transfer the missing files and write the changes to webSyncLog/transfer.log.

    The URL will be saved after the first run and is not necessary from then on. You can add
    cd xxx && webSync to your crontab. It will not start if it's already running (flagfile).

    Status files after a run:
    - webSyncLog/biggerHere.txt - list of files that are bigger here. These could be errors or OK.
    - webSyncLog/files.here.txt - the list of files here
    - webSyncLog/files.there.txt - the list of files there, current copy of https://there.org/files.txt
    - webSyncLog/missingThere.txt - the list of files not on https://there.org anymore but here
    - webSyncLog/transfer.log - big transfer log, each run, date and size of transferred file is noted here.
    """)

    parser.add_option("-d", "--debug", dest="debug", action="store_true", help="show debug messages")
    # type="int": without it a value given on the command line stays a string and breaks "%d" later
    parser.add_option("-x", "--connections", dest="connections", action="store", type="int", help="Maximum number of parallel connections to the server, default %default", default=10)
    parser.add_option("-s", "--skipScan", dest="skipScan", action="store_true", help="Do not scan local file sizes again, in case you know it is up to date")
    (options, args) = parser.parse_args()

    if args == [] and not isfile(join(logDir, "url.txt")):
        parser.print_help()
        sys.exit(1)
    if len(args) > 1:
        parser.error("at most one URL can be specified")

    if options.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    return args, options

def run(cmd, mustRun=True):
    """ wrapper around os.system: run cmd through the shell and return its wait status.
    If the command fails and mustRun is True, log an error and exit with status 1. """
    logging.info("Running %s" % cmd)
    ret = os.system(cmd)
    if ret != 0 and mustRun:
        logging.error("Could not run command %s" % cmd)
        # a failure must yield a non-zero exit status so that cron/callers can notice it
        sys.exit(1)
    return ret

def delFlag():
    " called on exit: remove the 'already running' flag file "
    if isfile(flagFname):
        logging.debug("Removing flag file")
        os.remove(flagFname)

def urlJoin(url, relPath):
    """ append the relative file path relPath to the base url. relPath is percent-encoded
    (slashes kept), so names with spaces, '#', '?' or '%' address the right resource. """
    return url.rstrip("/") + "/" + quote(relPath)

def parseFileList(fname):
    """ read output from 'du -ab' (lines of '<size> <path>') and return (sizes, dirs):
    sizes is a dict relative path -> size in bytes, dirs the set of parent directories seen.
    Directory entries, the top-level '.' entry and blank lines are skipped. Paths with a '..'
    component are skipped too: the remote files.txt is untrusted and such paths would make
    downloads and makedirs() write outside of the current directory. """
    logging.info("Reading %s" % fname)
    sizes = dict()
    dirs = set()
    with open(fname) as fh:
        for line in fh:
            line = line.rstrip("\n")
            if line.strip() == "":
                continue
            sizeStr, path = line.split(None, 1)
            path = path.strip()
            # remove only the "./" prefix written by du/find; a plain lstrip(".") would also
            # eat the leading dot of hidden file names
            if path.startswith("./"):
                path = path[2:]
            path = path.lstrip("/")
            if path in ("", "."):
                continue  # the top-level directory itself
            if ".." in path.split("/"):
                logging.warning("Skipping path outside of the current directory: %s" % path)
                continue
            fdir = dirname(path)
            if fdir != "":
                dirs.add(fdir)
            sizes[path] = int(sizeStr)

    # every directory that has at least one entry below it is not a file to transfer
    for d in dirs:
        sizes.pop(d, None)

    logging.info("Read %d filenames, %d directories" % (len(sizes), len(dirs)))
    return sizes, dirs

def writeDiffUrls(thereFiles, hereFiles, url, outFname, biggerHereFname, missingThereFname, logFh):
    """ compare remote (thereFiles) and local (hereFiles) dicts path -> size.
    Files missing here or smaller here are written to outFname as an aria2c input file
    (URL prefixed by url, plus dir=/out= lines). Files bigger here go to biggerHereFname as
    '<path> <sizeHere> <sizeThere> <diff>', local files absent remotely go to missingThereFname
    (the script's own webSyncLog files are not reported). A start marker and counts are
    appended to logFh when there is something to download.
    Return False if nothing needs to be downloaded, True otherwise. """
    logging.debug("Writing missing files to %s" % outFname)
    missingCount = 0
    smallerHereCount = 0
    biggerHereCount = 0
    count = 0
    totalSizeToGet = 0

    with open(outFname, "w") as ofh, open(biggerHereFname, "w") as biggerHereFh:
        for fn, sizeThere in thereFiles.items():
            if fn == "files.txt":
                continue
            sizeHere = hereFiles.get(fn)
            if sizeHere is None:
                missingCount += 1
            elif sizeHere > sizeThere:
                errMsg = "File %s is bigger here than there: here %d bytes, there %d bytes, diff %d" % \
                    (fn, sizeHere, sizeThere, (sizeHere - sizeThere))
                logging.debug(errMsg)
                biggerHereFh.write("%s %d %d %d\n" % (fn, sizeHere, sizeThere, (sizeHere - sizeThere)))
                biggerHereCount += 1
                continue
            elif sizeHere < sizeThere:
                smallerHereCount += 1
            else:
                continue  # same size: considered up to date

            totalSizeToGet += sizeThere
            ofh.write(urlJoin(url, fn))
            ofh.write("\n")
            # special syntax of aria2c input files to specify the output directory and filename;
            # "." instead of an empty dir= for files at the top level
            ofh.write(" dir=%s\n" % (dirname(fn) or "."))
            ofh.write(" out=%s\n" % basename(fn))
            count += 1

    logPrefix = logDir + "/"
    missingThereCount = 0
    with open(missingThereFname, "w") as missingThereFh:
        for fn in hereFiles:
            if fn in thereFiles or fn == logDir or fn.startswith(logPrefix):
                continue  # our own status files are never on the server
            missingThereFh.write("%s\n" % fn)
            missingThereCount += 1

    if missingThereCount != 0:
        logging.info("Found %d files that are missing there (removed by upstream, see webSyncLog/missingThere.txt)" % missingThereCount)

    if biggerHereCount != 0:
        logging.info("Found %d files that are bigger here (errors? see webSyncLog/biggerHere.txt)" % biggerHereCount)

    if missingCount != 0:
        logging.info("Found %d files that are missing here" % missingCount)

    if smallerHereCount != 0:
        logging.info("Found %d files that are smaller here" % smallerHereCount)

    if count == 0:
        logging.info("Nothing to download")
        return False

    logging.info("downloading %d files now (missing here or smaller here), %d bytes" % (count, totalSizeToGet))

    logFh.write("========== webSync start %s ===========\n" % time.strftime("%Y-%m-%d %H:%M"))
    logFh.write("missingHere=%d smallerHere=%d biggerHere=%d\n" % (missingCount, smallerHereCount, biggerHereCount))
    logFh.flush()
    return True

def runAria2c(fname, ariaLog, logFh, connCount):
    """ run aria2c with fname as the input file, redirecting its output to ariaLog.
    Returns the os.system() status of aria2c. A non-zero status is logged but not fatal:
    aria2c fails as a whole if a single download fails, and the per-file results in
    ariaLog still have to be processed by the caller. """
    # XX is continue=true a good idea?
    # XX check-certificate=false is definitely a bad idea
    logFh.write("aria2c start %s \n" % time.strftime("%Y-%m-%d %H:%M"))
    logFh.flush()
    cmd = "aria2c -x %d -Z -i %s --summary-interval=0 --continue=true --enable-color=false --check-certificate=false --auto-file-renaming=false --file-allocation=none --allow-overwrite=true > %s" % \
        (int(connCount), fname, ariaLog)
    ret = run(cmd, mustRun=False)
    if ret != 0:
        logging.warning("aria2c reported errors (status %d), see %s" % (ret, ariaLog))
        logFh.write("aria2c returned status %d\n" % ret)
        logFh.flush()
    return ret

def rewriteAriaLog(ariaLogFname, logFh, fileSizes, url=None):
    """ rewrite the per-file result lines of the aria2c summary to logFh as
    '*<status>\\t<speed>\\t<path>\\t<size>'. Paths are made relative: a leading './' is removed
    and, if url is given, URLs below it are turned back into decoded relative paths.
    The size comes from fileSizes (path -> size), or is -1 if the path is unknown. """
    # 660a86|OK | 0B/s|DCM_FrazerGroup/07.26.2017/RSEM_out/b0e17437-900f-423f-a6e8-30f725eefe2f.RSEMLog
    # ff071f|ERR | 0B/s|https://cirmtransfer.salk.edu/DCM_FrazerGroup/02.16.2017/STAR_out/f7b48d9d-678d-4121-a4eb-b6b3f3ebf983__STARtmp/BAMsort/2
    # 1ac78e|INPR| n/a|https://cirmtransfer.salk.edu/DCM_BruneauGroup/08.08.2017_IK-2056_3BsdmRNA/QC_figures/per_sequence_gc_content.jpg
    logFh.write("aria2c transfer log parsing %s\n" % time.strftime("%Y-%m-%d %H:%M"))

    urlPrefix = None
    if url:
        urlPrefix = url.rstrip("/") + "/"

    with open(ariaLogFname) as fh:
        for line in fh:
            if line.count("|") != 3 or line.startswith("gid"):
                continue
            gid, status, speed, fname = line.split("|")
            status = status.strip()
            speed = speed.strip()
            fname = fname.strip()
            # ERR/INPR lines show the URL, OK lines the local path
            if urlPrefix and fname.startswith(urlPrefix):
                fname = unquote(fname[len(urlPrefix):])
            elif fname.startswith("./"):
                fname = fname[2:]
            fSize = fileSizes.get(fname, -1)
            logFh.write("*%s\t%s\t%s\t%d\n" % (status, speed, fname, fSize))

    logFh.write("aria2c log parse done %s\n" % time.strftime("%Y-%m-%d %H:%M"))

def webSync(url, options):
    """ download using files.txt and aria2c: fetch the remote file list, scan the local
    files, download what is missing or smaller here and log the results to webSyncLog/.
    Exits with status 1 if another instance runs, the file list cannot be downloaded or
    aria2c reported errors. """
    if not isdir(logDir):
        os.makedirs(logDir)

    if isfile(flagFname):
        logging.error("%s exists. It looks like another instance of webSync is already running." % flagFname)
        sys.exit(1)

    atexit.register(delFlag)
    open(flagFname, "w").close()

    filesThereName = join(logDir, "files.there.txt")
    filesHereName = join(logDir, "files.here.txt")

    if isfile(filesThereName) and not options.skipScan:
        os.remove(filesThereName)
    if isfile(filesHereName) and not options.skipScan:
        os.remove(filesHereName)

    if not isfile(filesThereName):
        fileUrl = urlJoin(url, "files.txt")
        logging.debug("Downloading %s" % fileUrl)
        cmd = "wget -q %s --no-check-certificate -O %s" % (shellQuote(fileUrl), filesThereName)
        if run(cmd, mustRun=False) != 0:
            # wget -O leaves an empty/partial file behind; a later --skipScan run must not reuse it
            if isfile(filesThereName):
                os.remove(filesThereName)
            logging.error("Could not run command %s" % cmd)
            sys.exit(1)

    if not isfile(filesHereName):
        cmd = "du -ab > %s" % filesHereName
        run(cmd)

    hereFiles, hereDirs = parseFileList(filesHereName)
    thereFiles, thereDirs = parseFileList(filesThereName)

    logging.debug("checking %d directories, e.g. %s" % (len(thereDirs), list(thereDirs)[:3]))

    for d in thereDirs:
        if not isdir(d):
            os.makedirs(d)

    biggerHereFname = join(logDir, "biggerHere.txt")
    missingThereFname = join(logDir, "missingThere.txt")

    ariaCmdFname = join(logDir, "aria2c.in.tmp")
    logFname = join(logDir, "transfer.log")
    ariaRet = 0
    with open(logFname, "a") as logFh:
        doDownload = writeDiffUrls(thereFiles, hereFiles, url, ariaCmdFname, biggerHereFname, missingThereFname, logFh)

        if doDownload:
            ariaLogFname = join(logDir, "aria2c.out.tmp")
            ariaRet = runAria2c(ariaCmdFname, ariaLogFname, logFh, options.connections)
            rewriteAriaLog(ariaLogFname, logFh, thereFiles, url)

        logFh.write("========== webSync end %s ===========\n" % time.strftime("%Y-%m-%d %H:%M"))
    logging.info("websync done")

    if ariaRet != 0:
        sys.exit(1)

def main():
    " entry point: get the URL from the command line (and save it) or from webSyncLog/url.txt "
    args, options = parseArgs()

    urlFname = join(logDir, "url.txt")
    if len(args) == 1:
        url = args[0].strip()
        # on the very first run the log directory does not exist yet
        if not isdir(logDir):
            os.makedirs(logDir)
        with open(urlFname, "w") as fh:
            fh.write(url)
    else:
        with open(urlFname) as fh:
            url = fh.read().strip()

    webSync(url, options)

if __name__ == "__main__":
    main()