54a643b32015c6a4672bd540b4bf7a94a751e1ce markd Wed Feb 3 12:45:46 2021 -0800 added unit test of gfServer dynamic PCR diff --git src/gfServer/tests/dynTester src/gfServer/tests/dynTester index b1427e2..8f507bc 100755 --- src/gfServer/tests/dynTester +++ src/gfServer/tests/dynTester @@ -1,160 +1,188 @@ #!/usr/bin/env python3 # Test driver for dynamic server. This is required because of the need to interact on stdin/stdout # using a somewhat binary protocol. import sys import argparse import subprocess +import re +from collections import namedtuple debug = False valGrind = False def parseArgs(): usage = """Run a test of qfServer dynserver""" parser = argparse.ArgumentParser(description=usage) parser.add_argument('--debug', action="store_true", default=False, help="enable debug tracing") parser.add_argument('--valgrind', action="store_true", default=False, help="run valgrind on gfServer without to valgrind.out") parser.add_argument('rootDir', help="data root directory") - parser.add_argument('queryType', choices=("query", "protQuery", "transQuery"), - help="query request: query, protQuery, or transQuery") + parser.add_argument('queryType', choices=("query", "protQuery", "transQuery", "pcr"), + help="query type to run") parser.add_argument('genome', help="genome name") parser.add_argument('genomeDataDir', help="directory for genome files") parser.add_argument('queryFa', - help="query fasta") + help="query fasta, for PCR, each data line should contain the two primers, separated by a '/'") parser.add_argument('resultsOut', help="out is saved here for checking ") args = parser.parse_args() global debug, valgrind debug = args.debug valgrind = args.valgrind return args gfSignature = "0ddf270562684f29" +class Seq(namedtuple("Seq", ("seqid", "data", "revPrimer"))): + "data bag for a sequence from the fasta" + pass + +def makeSeq(idline, data): + "store a parsed FASTA record in a Seq object" + seqid = re.match("^>(\\S+)", idline).group(1) + parts = data.split('/') # for PCR + data = parts[0] + revPrimer = parts[1] if len(parts) > 1 else None + return Seq(seqid, data, revPrimer) + def readFastaSeqsFh(faFh): - """read from one sequence from fasta""" + """read from sequences from fasta""" seqs = [] - seq = "" + idline = "" + data = "" for line in faFh: line = line.strip() if line.startswith('>'): - if len(seq) > 0: - seqs.append(seq) - seq = "" + if len(data) > 0: + seqs.append(makeSeq(idline, data)) + data = "" + idline = line elif len(line) > 0: - seq = seq + line - if len(seq) > 0: - seqs.append(bytes(seq, encoding="latin-1")) + data += line.strip() + if len(data) > 0: + seqs.append(makeSeq(idline, data)) return seqs def readFastaSeqs(queryFa): """simplistic FASTA reader, just code here to avoid added dependency""" with open(queryFa) as faFh: return readFastaSeqsFh(faFh) def readBytes(fh, count): "read specified number of bytes, handing incomplete reads" buf = bytes() while len(buf) < count: resp = fh.read(count - len(buf)) if len(resp) == 0: raise Exception("unexpected EOF from server") buf += resp if debug: print("debug: read:", str(buf, encoding="latin-1"), file=sys.stderr) return buf def writeBytes(fh, buf): "write bytes with debug tracing and byte conversion" if not isinstance(buf, bytes): buf = bytes(buf, encoding="utf8") if debug: print("debug: write:", str(buf, encoding="latin-1"), file=sys.stderr) fh.write(buf) def netSendString(fh, data): "Send a string down a socket - length byte first. (matches net.c)" fh.write(fh, bytes(len(data))) fh.write(data) def netRecieveString(fd): "Read string return it (matches net.c)" buf = readBytes(fd, 1) return readBytes(fd, buf[0]) def serverStart(rootDir): "start server process" # unbuffered is required, as protocol is not line-terminate cmd = [] if valgrind: cmd += ["valgrind", "--log-file=valgrind.out"] cmd += ["gfServer"] if debug: cmd += ["-log=/dev/stderr", "-debugLog"] cmd += ["dynserver", rootDir] if debug: print("debug: start:", " ".join(cmd), file=sys.stderr) return subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE) def serverWait(gfServer): "wait for server process to exit" gfServer.stdin.close() gfServer.stdout.close() gfServer.wait() if gfServer.returncode != 0: raise Exception("gfServer process exited with {}".format(gfServer.returncode)) def readResults(gfServer, resultsFh): "read the results, which come back using the net string stuff" while True: resp = netRecieveString(gfServer.stdout) if len(resp) > 0: print(str(resp, encoding="latin-1"), file=resultsFh) if resp == b'end': break def queryRequest(gfServer, queryType, genome, genomeDataDir, seq, resultsFh): "make a query to the server" - query = "{}{} {} {} {}\n".format(gfSignature, queryType, genome, genomeDataDir, len(seq)) + query = "{}{} {} {} {}\n".format(gfSignature, queryType, genome, genomeDataDir, len(seq.data)) print(query, file=resultsFh) writeBytes(gfServer.stdin, query) resp = readBytes(gfServer.stdout, 1) if resp != b"Y": resp += gfServer.stdout.read() raise Exception("expected 'Y' from gfServer, got '{}'".format(resp)) - writeBytes(gfServer.stdin, seq) + writeBytes(gfServer.stdin, seq.data) + readResults(gfServer, resultsFh) + +def pcrRequest(gfServer, genome, genomeDataDir, seq, resultsFh): + "make a query to the server" + maxDistance = 4 * 1024 + query = "{}{} {} {} {} {} {}\n".format(gfSignature, "pcr", genome, genomeDataDir, seq.data, seq.revPrimer, maxDistance) + print(query, file=resultsFh) + writeBytes(gfServer.stdin, query) + readResults(gfServer, resultsFh) def infoRequest(gfServer, queryType, genome, genomeDataDir, resultsFh): "info request to the server" - infoType = "untransInfo" if queryType == "query" else "transInfo" + infoType = "untransInfo" if queryType in ("query", "pcr") else "transInfo" query = "{}{} {} {}\n".format(gfSignature, infoType, genome, genomeDataDir) print(query, file=resultsFh) writeBytes(gfServer.stdin, query) readResults(gfServer, resultsFh) def dynTester(rootDir, queryType, genome, genomeDataDir, queryFa, resultsFh): "run tests" seqs = readFastaSeqs(queryFa) if len(seqs) == 0: raise Exception("no sequences found in {}".format(queryFa)) gfServer = serverStart(rootDir) for seq in seqs: + if queryType == "pcr": + pcrRequest(gfServer, genome, genomeDataDir, seq, resultsFh) + else: queryRequest(gfServer, queryType, genome, genomeDataDir, seq, resultsFh) infoRequest(gfServer, queryType, genome, genomeDataDir, resultsFh) serverWait(gfServer) def main(): args = parseArgs() with open(args.resultsOut, "w") as resultsFh: dynTester(args.rootDir, args.queryType, args.genome, args.genomeDataDir, args.queryFa, resultsFh) main()