bc172b659b426818b82e185e9846aa63b2a9544d
max
  Tue Aug 29 09:53:44 2017 -0700
CIRM: new tool webSync, a small script that can download hundreds of thousands of
files quickly from a webserver and keep them in sync with a local
directory. Like rsync, but faster and does not require an open port.

diff --git src/utils/webSync src/utils/webSync
new file mode 100755
index 0000000..b8c6942
--- /dev/null
+++ src/utils/webSync
@@ -0,0 +1,268 @@
+#!/usr/bin/env python
+
+import logging, sys, optparse, os, time, atexit
+from collections import defaultdict
+from os.path import join, basename, dirname, isfile, isdir
+import string
+
+logDir = "webSyncLog" # directory for the status and log files, relative to the current directory
+
+flagFname = join(logDir, "isRunning.flag") # checked and created by webSync(), removed again by delFlag()
+
+# ==== functions =====
+def parseArgs():
+    " setup logging, parse command line arguments and options. -h shows auto-generated help page "
+    parser = optparse.OptionParser("""usage: %prog [options] <url> - download from https server, using files.txt on their end to get the list of files
+
+    To create files.txt on the remote end, run this command:
+      du -ab > files.txt
+    Or preferably this command (empty directories will lead to transmit errors):
+      find . -type f -exec du -ab {} +
+
+    Then run this in the download directory:
+      webSync https://there.org/
+
+    This will create a "webSyncLog" directory in the current directory, compare
+    https://there.org/files.txt with the files in the current directory,
+    transfer the missing files and write the changes to webSync/transfer.log.
+
+    The URL will be saved after the first run and is not necessary from then on. You can add
+    cd xxx && webSync to your crontab. It will not start if it's already running (flagfile).
+
+    Status files after a run:
+    - webSyncLog/biggerHere.txt - list of files that are bigger here. These could be errors or OK.
+    - webSyncLog/files.here.txt - the list of files here
+    - webSyncLog/files.there.txt - the list of files there, current copy of https://there.org/files.txt
+    - webSyncLog/missingThere.txt - the list of files not on https://there.org anymore but here
+    - webSyncLog/transfer.log - big transfer log, each run, date and size of transferred file is noted here.
+    """)
+
+    parser.add_option("-d", "--debug", dest="debug", action="store_true", help="show debug messages")
+    parser.add_option("-x", "--connections", dest="connections", action="store", help="Maximum number of parallel connections to the server, default %default", default=10)
+    parser.add_option("-s", "--skipScan", dest="skipScan", action="store_true", help="Do not scan local file sizes again, in case you know it is up to date")
+    #parser.add_option("-f", "--file", dest="file", action="store", help="run on file") 
+    #parser.add_option("", "--test", dest="test", action="store_true", help="do something") 
+    (options, args) = parser.parse_args()
+
+    if args==[] and not isfile(join(logDir, "url.txt")):
+        parser.print_help()
+        exit(1)
+
+    if options.debug:
+        logging.basicConfig(level=logging.DEBUG)
+    else:
+        logging.basicConfig(level=logging.INFO)
+    return args, options
+# ----------- helper functions --------------
+
+def run(cmd, mustRun=True):
+    " wrapper around os.system "
+    logging.info("Running %s" % cmd)
+    ret = os.system(cmd)
+    if ret!=0 and mustRun:
+        logging.error("Could not run command %s" % cmd)
+        sys.exit(0)
+    return ret
+
+def delFlag():
+    " called on exit "
+    if isfile(flagFname):
+        logging.debug("Removing flag file")
+        os.remove(flagFname)
+
+def parseFileList(fname):
+    " read output from 'du -ab' and return as dict filename -> size. Skips directory names "
+    logging.info("Reading %s" % fname)
+    sizes = dict()
+    dirs = set()
+    for line in open(fname):
+       size, fname = string.split(line.rstrip("\n"), maxsplit=1)
+       size = int(size.strip())
+       fname = fname.strip().lstrip(".").lstrip("/")
+       fdir = dirname(fname)
+       if fdir!="":
+           dirs.add(fdir)
+       sizes[fname] = size
+
+    for d in dirs:
+        if d in sizes:
+            del sizes[d]
+
+    logging.info("Read %d filenames, %d directories" % (len(sizes), len(dirs)))
+    return sizes, dirs
+
+def writeDiffUrls(thereFiles, hereFiles, url, outFname, biggerHereFname, missingThereFname, logFh):
+    " write missing or unequal sized files to outFname, prefixed by url. Return false if nothing was found. "
+    logging.debug("Writing missing files to %s" % outFname)
+    ofh = open(outFname, "w")
+    missingCount = 0
+    smallerHereCount = 0
+    smallerThereCount = 0
+    count = 0
+    biggerHereFh = open(biggerHereFname, "w")
+    missingThereFh = open(missingThereFname, "w")
+    totalSizeToGet = 0
+
+    for fn, sizeThere in thereFiles.iteritems():
+        if fn=="files.txt":
+            continue
+        sizeHere = hereFiles.get(fn)
+        doGet = False
+        if sizeHere==None:
+            missingCount +=1
+            doGet = True
+        elif sizeHere > sizeThere:
+            errMsg = "File %s is bigger here than there: here %d bytes, there %d bytes, diff %d" %\
+                (fn, sizeHere, sizeThere, (sizeHere-sizeThere))
+            logging.debug(errMsg)
+
+            shortErr = "%s %d %d %d\n" % (fn, sizeHere, sizeThere, (sizeHere-sizeThere))
+            biggerHereFh.write(shortErr)
+            biggerHereFh.flush()
+
+            smallerThereCount += 1
+        elif sizeHere < sizeThere:
+            smallerHereCount += 1
+            doGet = True
+            
+        if doGet:
+            totalSizeToGet += sizeThere
+            ofh.write(join(url, fn))
+            ofh.write("\n")
+            ofh.write(" dir=%s\n" % dirname(fn)) # special syntax of aria2c config file to specify output filename
+            ofh.write(" out=%s\n" % basename(fn))
+            count += 1
+    ofh.close()
+
+    missingThereCount = 0
+    for fn, sizeHere in hereFiles.iteritems():
+        if fn not in thereFiles:
+            missingThereFh.write("%s\n"% fn)
+            missingThereCount += 1
+
+    if missingThereCount!=0:
+        logging.info("Found %d files that are missing there (removed by upstream, see webSyncLog/missingThere.txt)" % missingThereCount)
+
+    if smallerThereCount!=0:
+        logging.info("Found %d files that are bigger here (errors? see webSyncLog/biggerHere.txt)" % smallerThereCount)
+
+    if missingCount!=0:
+        logging.info("Found %d files that are missing here" % missingCount)
+
+    if smallerHereCount!=0:
+        logging.info("Found %d files that are smaller here" % smallerHereCount)
+
+    if count==0:
+        logging.info("Nothing to download")
+        return False
+
+    logging.info("downloading %d files now (missing here or smaller here), %d bytes" % (count, totalSizeToGet))
+
+    logFh.write("========== webSync start %s ===========\n" % time.strftime("%Y-%m-%d %H:%M"))
+    logFh.write("missingHere=%d smallerHere=%d biggerHere=%d\n" % (missingCount, smallerHereCount, smallerThereCount))
+    logFh.flush()
+    return True
+
+def runAria2c(fname, ariaLog, logFh, connCount):
+    "run aria2c with fname as the input file "
+    # XX is continue=true a good idea? 
+    # XX check-certificate=false is definitely a bad idea
+    logFh.write("aria2c start %s \n" % time.strftime("%Y-%m-%d %H:%M"))
+    logFh.flush()
+    cmd = "aria2c -x %d -Z -i %s --summary-interval=0 --continue=true --enable-color=false --check-certificate=false --auto-file-renaming=false --file-allocation=none --allow-overwrite=true > %s" % \
+        (connCount, fname, ariaLog)
+    run(cmd)
+
+def rewriteAriaLog(ariaLogFname, logFh, fileSizes):
+    " rewrite aria log lines to logFname "
+    # 660a86|OK  |       0B/s|DCM_FrazerGroup/07.26.2017/RSEM_out/b0e17437-900f-423f-a6e8-30f725eefe2f.RSEMLog
+    # ff071f|ERR |       0B/s|https://cirmtransfer.salk.edu/DCM_FrazerGroup/02.16.2017/STAR_out/f7b48d9d-678d-4121-a4eb-b6b3f3ebf983__STARtmp/BAMsort/2
+    # 1ac78e|INPR|        n/a|https://cirmtransfer.salk.edu/DCM_BruneauGroup/08.08.2017_IK-2056_3BsdmRNA/QC_figures/per_sequence_gc_content.jpg
+    logFh.write("aria2c transfer log parsing %s\n" % time.strftime("%Y-%m-%d %H:%M"))
+
+    for line in open(ariaLogFname):
+        if line.count("|")==3:
+            if line.startswith("gid"):
+                continue
+            row = line.split("|")
+            gid, status, speed, fname = row
+            status = status.strip()
+            speed = speed.strip()
+            fname = fname.strip()
+            fSize = fileSizes[fname]
+            logFh.write("*%s\t%s\t%s\t%d\n" % (status, speed, fname, fSize))
+
+    logFh.write("aria2c log parse done %s\n" % time.strftime("%Y-%m-%d %H:%M"))
+
+def webSync(url, options):
+    " download using files.txt and aria2c "
+    if not isdir(logDir):
+        os.makedirs(logDir)
+
+    if isfile(flagFname):
+        logging.error("%s exists. It looks like another instance of webSync is already running." % flagFname)
+        sys.exit(1)
+
+    atexit.register(delFlag)
+    open(flagFname, "w")
+
+    filesThereName = join(logDir, "files.there.txt")
+    filesHereName = join(logDir, "files.here.txt")
+
+    if isfile(filesThereName) and not options.skipScan:
+        os.remove(filesThereName)
+    if isfile(filesHereName) and not options.skipScan:
+        os.remove(filesHereName)
+
+    if not isfile(filesThereName):
+        fileUrl = join(url, "files.txt")
+        logging.debug("Downloading %s" % fileUrl)
+        cmd = "wget -q %s --no-check-certificate -O %s" % (fileUrl, filesThereName)
+        run(cmd)
+
+    if not isfile(filesHereName):
+        cmd = "du -ab > %s" % filesHereName
+        run(cmd)
+
+    hereFiles, hereDirs = parseFileList(filesHereName)
+    thereFiles, thereDirs = parseFileList(filesThereName)
+
+    logging.debug("checking %d directories, e.g. %s" % (len(thereDirs), list(thereDirs)[:3]))
+
+    for d in thereDirs:
+        if not isdir(d):
+            os.makedirs(d)
+
+    biggerHereFname = join(logDir, "biggerHere.txt")
+    missingThereFname = join(logDir, "missingThere.txt")
+
+    ariaCmdFname = join(logDir, "aria2c.in.tmp")
+    logFname = join(logDir, "transfer.log")
+    logFh = open(logFname, "a")
+    doDownload = writeDiffUrls(thereFiles, hereFiles, url, ariaCmdFname, biggerHereFname, missingThereFname, logFh)
+
+    if doDownload:
+        ariaLogFname = join(logDir, "aria2c.out.tmp")
+        runAria2c(ariaCmdFname, ariaLogFname, logFh, options.connections)
+        rewriteAriaLog(ariaLogFname, logFh, thereFiles)
+
+    logFh.write("========== webSync end %s ===========\n" % time.strftime("%Y-%m-%d %H:%M"))
+    logFh.close()
+    logging.info("websync done")
+
+def main():
+    args, options = parseArgs()
+
+    urlFname = join(logDir, "url.txt")
+    if len(args)==1:
+        url = args[0]
+        open(urlFname, "w").write(url)
+    else:
+        url = open(urlFname).read()
+
+    webSync(url, options)
+    #if options.test:
+        #logging.debug("test is set")
+        #f = open(options.file, "r")
+
+main()