cb31fb5a2ebbbb167b7f07c19f3af9e3e724068c cvaske Tue Jan 7 13:48:52 2025 -0800

civic: remove certifi package

hgwdev-new no longer needs the PyPI certificate bundle

diff --git src/hg/utils/otto/civic/civicToBed.py src/hg/utils/otto/civic/civicToBed.py
index b5cee02..40c1453 100644
--- src/hg/utils/otto/civic/civicToBed.py
+++ src/hg/utils/otto/civic/civicToBed.py
@@ -1,975 +1,969 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "pandas",
-#     "certifi",
# ]
# [tool.uv]
# exclude-newer = "2024-11-20T00:00:00Z"
# ///
"""Download CIViC DB files and convert into bigBed12 tracks

This script is meant to be run with `uv run civicToBed.py`, which creates a
reproducible Python environment to run in.

EXTERNAL DEPENDENCIES: this script depends on several kent binaries, as well
as data files. See the sections below with the comment titles "External Shell
Command Dependencies" and "Local Data File Dependencies".

This script compares against the prior month of data and checks that there is
not a drop of more than 10% of records, or an addition of more than 100%.

The primary database entry that has a genomic location is a Variant, which is
in the VariantSummaries table. There are three types of "features" in CIViC
in the VariantSummaries table:

1. Gene features - point mutations, insertions, deletions, amplification,
   overexpression, etc. Results in a single BED entry.

2. Fusion features - a structural variant creating a gene fusion. Results in
   two BED entries, one for each fusion partner. Each BED entry consists of
   a thick part for the fused exon, and a thin part for the intron where the
   fusion takes place. The website shows genomic locations for the exon of
   interest in both hg38 and hg19, but these coordinates do not make it to
   the downloadable TSVs. Instead, an ENST identifier plus an exon index are
   used. There are a few transcript identifiers that are not located in our
   GENCODE transcripts, unfortunately.

3. Factor features - (Ignored) complex cellular states without a genomic
   locus, such as kataegis or a methylation signature.

Additionally, there are several other tables of interest that add disease and
therapy information to each variant:

- MolecularProfile - a summary of a set (possibly a singleton) of various
  variants that go together; example 1: BCR-ABL gene fusions with a point
  mutation in ABL1 p.T315I; example 2: KRAS G12 mutation when there's not a
  methylation signature. This is the logical structure that actually gets
  assigned evidence from ClinicalEvidenceSummaries or AssertionSummaries.

- ClinicalEvidenceSummaries - a statement made in papers/abstracts/trials
  that connects a molecular profile to a disease or prognosis or therapy.

- AssertionSummary - an assertion about a molecular profile that's an
  editor's summary of multiple bits of clinical evidence.
"""
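
# A worked micro-example of the molecular-profile-to-variant fan-out
# described above (hypothetical IDs, not real CIViC data): a
# MolecularProfileSummaries row with molecular_profile_id=12 and
# variant_ids="100, 101" becomes the mapping {12: [100, 101]} via
# split_one_to_n_map() below, and any disease or therapy attached to
# evidence or assertions for profile 12 is then propagated to variants
# 100 and 101 by add_variant_diseases_therapies().
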
""" from collections import defaultdict from contextlib import closing from copy import deepcopy import dataclasses import datetime import logging import os import ssl import subprocess from typing import Callable, Final, Generator, Sequence import urllib.request -import certifi import numpy as np import pandas as pd ## ## External Shell Command Dependencies ## BED_TO_BIG_BED_CMD: Final = "bedToBigBed" LIFT_OVER_CMD: Final = "liftOver" BED_SORT_CMD: Final = "bedSort" ## ## Local Data File Dependencies ## LIFTOVER_CHAINS: Final = [ ["hg19", "hg38", "/hive/data/gbdb/hg19/liftOver/hg19ToHg38.over.chain.gz"], ["hg38", "hg19", "/hive/data/gbdb/hg38/liftOver/hg38ToHg19.over.chain.gz"], ] GENCODE_UCSC_FN: Final = { "hg38": "/hive/data/genomes/hg38/bed/gencodeV47/build/ucscGenes.bed", "hg19": "/hive/data/genomes/hg19/bed/gencodeV47lift37/build/ucscGenes.bed", } CHROM_SIZES: Final = { "hg38": "/hive/data/genomes/hg38/chrom.sizes", "hg19": "/hive/data/genomes/hg19/chrom.sizes", } -## hgwdev has trouble with SSL without using certifi, so this ensures -## that certificates from PyPI are used -ssl_ctx = ssl.create_default_context(cafile=certifi.where()) - logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) DOWNLOAD_BASE_URL: Final = "https://civicdb.org/downloads" DATA_TABLES: Final = [ "MolecularProfileSummaries", "VariantSummaries", "ClinicalEvidenceSummaries", "AssertionSummaries", ] ## Maximum lengeth of a string (e.g. insAGCATGACCAG...) before ## being truncated and appended with an ellipsis MAX_VARIANT_LENGTH: Final = 20 MAX_LONG_BED_FIELD_LENGTH: Final = 5000 ## Default color for items in output bed DEFAULT_BED_VARIANT_COLOR: Final = "0" DOWNLOAD_DIR: Final = "downloads" # these are in percentages MAX_ROW_INCREASE: Final = 100.0 MAX_ROW_DECREASE: Final = -10.0 # Minimum fraction of ENSEMBL transcripts found for fusions for success MIN_ENSEMBL_TX = 0.90 ## ## Not currently using this, but is the start of detecting variants ## without genomic coordinates... ## AMINO_ACIDS: Final = "ARNDCEQGHILKMFPSTWYVX" HGVS_STOP: Final = "fs|*" SIMPLE_HGVS_REGEX = f"[{AMINO_ACIDS}][0-9][0-9]*[{AMINO_ACIDS}]" def expect(condition: bool, message: str): if not condition: logging.error(f"Expectation failed: {message}") assert condition def transform_gene_variant_summaries_to_loci_2024_10(df: pd.DataFrame): """DEPRECATED---This transformation only works on VariantSummary.txt up until Oct 2024 and is included only for historical purposes Stages of processing: 1. filter out flagged variants 2. filter out variants without a reference build 3. Independently for locus1 and locus2, if one of chromosome, start, or stop is missing for the locus, make all three values missing 4. 

def transform_gene_variant_summaries_to_loci_2024_10(df: pd.DataFrame):
    """DEPRECATED---This transformation only works on VariantSummary.txt up
    until Oct 2024 and is included only for historical purposes

    Stages of processing:

    1. filter out flagged variants
    2. filter out variants without a reference build
    3. Independently for locus1 and locus2, if one of chromosome, start, or
       stop is missing for the locus, make all three values missing
    4. Convert each variant into up to 2 different VariantLocus entries,
       using the variant id as the base, and adding `_1` and `_2` for each
       locus used in the variant
    """
    logging.info(f"Initial Variants dataframe shape: {df.shape}")

    ## filter 1
    df = df.loc[(df.is_flagged == False), :]
    logging.info(f"Dataframe shape of variants after filtering flagged: {df.shape}")

    ## filter 2
    df = df.loc[~df.reference_build.isnull(), :]
    logging.info(f"Dataframe shape of variants after missing ref genome: {df.shape}")

    ## filter 3
    locus1_cols = ["chromosome", "start", "stop"]
    locus2_cols = [x + "2" for x in locus1_cols]
    print(df.keys())
    locus1_null = df.loc[:, locus1_cols].isnull().sum(1) > 0
    locus2_null = df.loc[:, locus2_cols].isnull().sum(1) > 0
    # Clear out partially entered genomic locations
    df.loc[locus1_null, locus1_cols] = np.nan
    df.loc[locus2_null, locus2_cols] = np.nan
    assert ((df[locus2_cols].isnull().sum(1) > 0) == locus2_null).all()
    assert ((df[locus1_cols].isnull().sum(1) > 0) == locus1_null).all()

    keep_cols = [
        "variant_id",
        "gene",
        "variant",
        "reference_bases",
        "variant_bases",
        "reference_build",
    ]
    vldf1 = df.loc[~locus1_null, keep_cols + locus1_cols]
    vldf2 = df.loc[~locus2_null, keep_cols + locus2_cols]
    vldf2.rename(columns=dict(zip(vldf2.keys(), vldf1.keys())), inplace=True)
    vldf1["chromosome"] = "chr" + vldf1.chromosome.astype("string")
    vldf2["chromosome"] = "chr" + vldf2.chromosome.astype("string")
    vldf1["variant_locus_id"] = vldf1.variant_id.astype("string") + "_1"
    vldf2["variant_locus_id"] = vldf2.variant_id.astype("string") + "_2"
    vldf = pd.concat((vldf1, vldf2))

    logging.info(
        f"Variants with a locus1 specified (1172 in Oct2024): {vldf1.shape[0]}"
    )
    logging.info(f"Variants with a locus2 specified (4 in Oct2024): {vldf2.shape[0]}")
    logging.info(
        f"Variants with both locus1 & locus2 (2 in Oct2024): {sum(~(locus1_null | locus2_null))}"
    )

    expect(
        set(vldf.reference_build) == set(["GRCh37", "GRCh38"]),
        message="Only two reference builds allowed, 'GRCh37', 'GRCh38'",
    )
    return vldf
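
# Worked example of filter 3 above (hypothetical row): a variant with
# chromosome=7, start=NaN, stop=55242466 counts as locus1_null, so all
# three locus1 columns are blanked to NaN instead of emitting a
# half-specified BED interval; the same rule applies independently to the
# chromosome2/start2/stop2 columns.
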
Associated diseases: " + sourcedf["disease_html"] ) ) ## These are includde sometimes for debugging purposes... # "variant_id": gdf.variant_id, # "reference_bases": gdf.reference_bases, # "variant_bases": gdf.variant_bases, def extract_variant_name(gdf: pd.DataFrame) -> pd.Series: """ Make a name for a variant to show in the browser, summarizing the genomic change (sequence change, fusion partners, expression, etc.) """ # this got a bit ugly, but functional if statements be like that full_variant_string = pd.Series( np.where( gdf.feature_type == "Fusion", gdf.feature_name, # fusions have very descriptive names. np.where( # these are the gene type variants gdf.reference_bases.isnull(), np.where( # empty variant bases --> variant descrption or an insertion gdf.variant_bases.isnull(), gdf.gene + " " + gdf.variant, gdf.gene + " ins" + gdf.variant_bases, ), np.where( # specified variant bases --> deleteion or variant gdf.variant_bases.isnull(), gdf.gene + " " + "del" + gdf.reference_bases, gdf.gene + " " + gdf.reference_bases + ">" + gdf.variant_bases, ), ), ) ) return pd.Series( np.where( full_variant_string.str.len() > MAX_VARIANT_LENGTH, full_variant_string.str.slice(start=0, stop=MAX_VARIANT_LENGTH - 3) + "...", full_variant_string, ) ) def transform_gene_variant_summaries(df: pd.DataFrame) -> pd.DataFrame: """Convert the gene variants from a cleaned/augmented VariantSummaries into bed12+ With the November 2024 release, the second locus columns were removed. Fusion genes instead refer to exon locations in Ensembl transcripts. Stages of processing: 1. filter out variants without a reference build 2. filter out partially entered locus info (missing chrom, start, or end) 3. construct a bed12+ DataFrame """ ## filter 1 df = df.loc[~df.reference_build.isnull(), :] logging.info(f"Dataframe shape of variants after missing ref genome: {df.shape}") ## filter 2 locus1_cols = ["chromosome", "start", "stop"] locus1_null = df.loc[:, locus1_cols].isnull().sum(1) > 0 # Clear out partially entered genomic locations df.loc[locus1_null, locus1_cols] = np.nan assert ((df[locus1_cols].isnull().sum(1) > 0) == locus1_null).all() gdf = df.loc[~locus1_null, :].reset_index(drop=True) bed_dict = { "chrom": "chr" + gdf.chromosome.astype("string"), "start": -1 + gdf.start.astype("Int64"), "end": gdf.stop.astype("Int64"), "name": gdf.short_variant_string, "score": 1, "strand": ".", "thickStart": 0, # gdf.start.astype("Int64"), "thickEnd": 0, # gdf.stop.astype("Int64"), "itemRgb": DEFAULT_BED_VARIANT_COLOR, "blockCount": 1, "blockSizes": gdf.stop.astype("Int64") - gdf.start.astype("Int64") + 1, "blockStarts": 0, # gdf.start.astype("Int64"), } vldf = annotate_bed12(pd.DataFrame(bed_dict), gdf) col_has_null = vldf.isnull().sum() > 0 col_has_null = list(col_has_null[col_has_null].index.values) expect( not col_has_null, f"Found null entries in variant locus bed columns: {col_has_null}", ) logging.info(f"Variants with a locus specified (1172 in Oct2024): {vldf.shape[0]}") expected_references = set(["GRCh37", "GRCh38"]) unexpected_references = set(vldf.reference_build) - expected_references expect( len(unexpected_references) == 0, message=f"Only know about genome references {expected_references}, but found {unexpected_references}", ) return vldf def split_one_to_n_map( df: pd.DataFrame, one_col: str, n_col: str, sep: str = ", ", value_transform=int ): d = {} for k, val_list in zip(df[one_col], df[n_col]): if k != k or val_list != val_list: ## skip missing values continue d[k] = [value_transform(x) for x in 

def split_one_to_n_map(
    df: pd.DataFrame, one_col: str, n_col: str, sep: str = ", ", value_transform=int
):
    d = {}
    for k, val_list in zip(df[one_col], df[n_col]):
        if k != k or val_list != val_list:  ## skip missing values
            continue
        d[k] = [value_transform(x) for x in val_list.split(sep)]
    return d


def parse_bed_int_list(l: str) -> list[int]:
    return [int(x) for x in l.rstrip(",").split(",")]


@dataclasses.dataclass
class Bed:
    chrom: str
    chromStart: int
    chromEnd: int
    name: str
    score: int
    strand: str
    thickStart: int
    thickEnd: int
    itemRgb: str
    blockCount: int
    blockSizes: list[int]
    blockStarts: list[int]

    def as_str_dict(self) -> dict[str, str]:
        def c(field, val) -> str:
            if field.type == list[int]:
                return ",".join([str(x) for x in val]) + ","
            if field.type == int:
                return str(val)
            return val

        return {
            f.name: c(f, self.__getattribute__(f.name))
            for f in dataclasses.fields(Bed)
        }

    @classmethod
    def from_line(cls, line: str) -> "Bed":
        fields = dataclasses.fields(Bed)
        v = line.rstrip("\n").split("\t")
        assert len(fields) == len(v)
        return Bed(
            v[0],
            int(v[1]),
            int(v[2]),
            v[3],
            int(v[4]),
            v[5],
            int(v[6]),
            int(v[7]),
            v[8],
            int(v[9]),
            parse_bed_int_list(v[10]),
            parse_bed_int_list(v[11]),
        )


def read_bed12(fn: str) -> Generator[Bed, None, None]:
    with closing(open(fn)) as f:
        for line in f:
            yield Bed.from_line(line)


def gene_bed_to_exon(
    bed: Bed, index1: int, upstream_intron: bool, downstream_intron: bool, name: str
) -> Bed:
    """Given a gene Bed, pull out a particular exon indexed from the 5' side
    with a 1-based index, taking into account strand. Optionally include
    introns as thin parts.
    """
    index0 = int(index1) - 1
    if bed.strand == "-":
        upstream_intron, downstream_intron = downstream_intron, upstream_intron
        index0 = int(bed.blockCount) - 1 - index0
    out = deepcopy(bed)

    ## Get the exon boundaries into the thick part
    out.name = name
    out.thickStart = int(bed.chromStart) + bed.blockStarts[index0]
    out.thickEnd = out.thickStart + bed.blockSizes[index0]

    ## Set the thin part, optionally including introns
    up_idx = index0 - 1
    if upstream_intron and up_idx >= 0:
        out.chromStart += bed.blockStarts[up_idx] + bed.blockSizes[up_idx]
    else:
        out.chromStart = out.thickStart
    down_idx = index0 + 1
    if downstream_intron and down_idx < bed.blockCount:
        out.chromEnd = bed.chromStart + bed.blockStarts[down_idx]
    else:
        out.chromEnd = out.thickEnd

    out.blockCount = 1
    out.blockSizes = [out.chromEnd - out.chromStart]
    out.blockStarts = [0]
    return out
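
# Worked example for gene_bed_to_exon() above (hypothetical "+" strand
# transcript): given a 3-exon Bed with chromStart=1000,
# blockSizes=[100, 100, 100], and blockStarts=[0, 400, 900], requesting
# index1=2 with upstream_intron=False and downstream_intron=True yields
# thickStart=1400, thickEnd=1500 (the exon) and chromStart=1400,
# chromEnd=1900 (the thin part runs through the following intron),
# collapsed to a single block.
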

def transform_fusion_variants(df: pd.DataFrame, ensembl_bed_fn: str) -> pd.DataFrame:
    """Convert the fusion type variants from VariantSummaries into a bed12+
    data frame"""

    def remove_ens_version(x: str) -> str:
        return x.rpartition(".")[0] if x == x else ""

    fdf = df.loc[df["feature_type"] == "Fusion"].reset_index(drop=True)
    fdf = fdf.assign(
        upstream_tx=[remove_ens_version(x) for x in fdf["5_prime_transcript"]],
        upstream_exon=fdf["5_prime_end_exon"],
        downstream_tx=[remove_ens_version(x) for x in fdf["3_prime_transcript"]],
        downstream_exon=fdf["3_prime_start_exon"],
        reference_build="GENCODE",
    )
    eset = set(fdf.upstream_tx) | set(fdf.downstream_tx)
    ens_transcript_names = set([x for x in eset if x])  # remove "" from null transcript

    logging.info(f"Reading BED `{ensembl_bed_fn}` to get Ensembl transcripts")
    ens2bed = {}
    for b in read_bed12(ensembl_bed_fn):
        b.name = remove_ens_version(str(b.name))
        if b.name in ens_transcript_names:
            ens2bed[b.name] = b
    logging.info(
        f"Found {len(ens2bed)} transcripts from {len(ens_transcript_names)} specified in CIViC"
    )
    expect(
        float(len(ens2bed)) / len(ens_transcript_names) > MIN_ENSEMBL_TX,
        "Not enough matches in Ensembl transcripts, parser or input fix needed",
    )

    valid_upstream = fdf.upstream_tx.isin(ens2bed) & ~fdf.upstream_exon.isnull()
    upstream_variants = fdf.loc[valid_upstream, :].reset_index(drop=True)
    upstream_bed12 = []
    for r in upstream_variants.itertuples():
        upstream_bed12.append(
            gene_bed_to_exon(
                ens2bed[r.upstream_tx],
                r.upstream_exon,
                False,
                True,
                r.short_variant_string,
            ).as_str_dict()
        )
    upstream_vldf = annotate_bed12(pd.DataFrame(upstream_bed12), upstream_variants)

    valid_downstream = fdf.downstream_tx.isin(ens2bed) & ~fdf.downstream_exon.isnull()
    downstream_variants = fdf.loc[valid_downstream, :].reset_index(drop=True)
    downstream_bed12 = []
    for r in downstream_variants.itertuples():
        downstream_bed12.append(
            gene_bed_to_exon(
                ens2bed[r.downstream_tx],
                r.downstream_exon,
                True,
                False,
                r.short_variant_string,
            ).as_str_dict()
        )
    downstream_vldf = annotate_bed12(
        pd.DataFrame(downstream_bed12), downstream_variants
    )

    return pd.concat([upstream_vldf, downstream_vldf])


def write_df_to_bed(vldf: pd.DataFrame, outfn: str) -> None:
    # Make sure no characters in a string can screw up the rest of the table...
    tsv_trans = str.maketrans("\n\t", "  ")
    with closing(open(outfn, mode="w")) as o:
        for _, v in vldf.iterrows():
            o.write("\t".join([str(x).translate(tsv_trans) for x in v]) + "\n")


def write_fusion_beds(
    fusion_dfs: dict[str, pd.DataFrame],
    out_fn_template: str = "civic.fusion.{db}.bed",
) -> dict[str, str]:
    beds = {}
    for ref, df in fusion_dfs.items():
        beds[ref] = out_fn_template.format(db=ref)
        write_df_to_bed(df, beds[ref])
    return beds


def transform_assertion_summaries(df: pd.DataFrame, mpdf: pd.DataFrame) -> pd.DataFrame:
    # Check overlap with molecular profiles
    assertions_with_mps = df["molecular_profile_id"].isin(mpdf["molecular_profile_id"])
    assertions_with_mps_p = assertions_with_mps.mean() * 100
    assertions_without_mps = str(list(df["assertion_id"][~assertions_with_mps]))
    logging.info(
        f"AssertionSummaries whose molecular_profile_id exists: "
        f"{assertions_with_mps_p:.2f}%, assertion_ids with missing: {assertions_without_mps}"
    )
    expect(
        assertions_with_mps.mean() > 0.95,
        message="At least 95% of Assertions should have an existing MolecularProfile",
    )
    df["disease_html"] = "" + df["disease"] + ""
    doid = df["doid"].astype("Int64").astype("str")
    df["disease_link"] = doid.where(doid != "", df["disease"]).where(
        doid == "", doid + "|" + df["disease"]
    )
    return df


def transform_clinical_evidence(df: pd.DataFrame, mpdf: pd.DataFrame):
    evidence_with_mps = df["molecular_profile_id"].isin(mpdf["molecular_profile_id"])
    evidence_with_mps_p = evidence_with_mps.mean() * 100
    evidence_without_mps = str(list(df["evidence_id"][~evidence_with_mps]))
    logging.info(
        f"ClinicalEvidenceSummaries whose molecular_profile_id exists: "
        f"{evidence_with_mps_p:.2f}%, evidence_ids with missing: {evidence_without_mps}"
    )
    expect(
        evidence_with_mps.mean() > 0.95,
        message="At least 95% of Evidence should have an existing MolecularProfile",
    )
    df["disease_html"] = "" + df["disease"] + ""
    doid = df["doid"].astype("Int64").astype("str")
    df["disease_link"] = doid.where(doid != "", df["disease"]).where(
        doid == "", doid + "|" + df["disease"]
    )
    return df
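
# Worked example of the disease_link encoding above (hypothetical row):
# doid="1909", disease="Melanoma" -> "1909|Melanoma"; the two chained
# .where() calls are meant to fall back to the bare disease name when no
# DOID is present. The "id|label" form parallels the variant_link field
# built in annotate_bed12().
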

def load_dataframes(table_dict: dict[str, str]) -> dict[str, pd.DataFrame]:
    """Load several dataframes. Input is a dict from name to the source path.
    Output is a dict from name to a Pandas DataFrame"""
    return {name: pd.read_csv(path, sep="\t") for name, path in table_dict.items()}


def urlretrieve(url, filename):
    with closing(open(filename, "wb")) as outfile:
-        with closing(urllib.request.urlopen(url, context=ssl_ctx)) as instream:
+        with closing(urllib.request.urlopen(url)) as instream:
            outfile.write(instream.read())


def download_datadir(
    basedir: str,
    baseurl: str,
    dateslug: str,
    tablelist: list[str],
    overwrite: bool = True,
) -> dict[str, str]:
    dlpaths = {}
    # make directory
    dldir = os.path.join(basedir, dateslug)
    os.makedirs(dldir, exist_ok=True)
    # download files
    for tbl in tablelist:
        basefn = f"{dateslug}-{tbl}.tsv"
        dlpaths[tbl] = os.path.join(dldir, basefn)
        if overwrite or not os.path.exists(dlpaths[tbl]):
            url = f"{baseurl}/{dateslug}/{basefn}"
            logging.info(f"Downloading from {url} to {dlpaths[tbl]}")
            urlretrieve(url, dlpaths[tbl])
    return dlpaths


def first_of_month(day: datetime.date) -> datetime.date:
    return datetime.date(year=day.year, month=day.month, day=1)


def get_prior_date_slug(day: datetime.date) -> str:
    """For a given day, get the prior date slug one month before"""
    priorday = datetime.timedelta(days=-14) + first_of_month(day)
    return get_date_slug(priorday)


def get_date_slug(day: datetime.date) -> str:
    """CIViC archive date slug, the first of the month

    >>> get_date_slug(datetime.date.fromisoformat("2024-03-22"))
    '01-Mar-2024'
    """
    return first_of_month(day).strftime("%d-%b-%Y")


def diff_dfs(
    dfs: dict[str, pd.DataFrame], prior_dfs: dict[str, pd.DataFrame]
) -> dict[str, dict]:
    diffs = {}
    for k in dfs:
        diffs[k] = {
            "row_delta": dfs[k].shape[0] - prior_dfs[k].shape[0],
            "row_delta_percent": 100.0
            * (dfs[k].shape[0] - prior_dfs[k].shape[0])
            / dfs[k].shape[0],
        }
    return diffs


def download_and_load_dataframes(
    dldir: str,
    baseurl: str,
    day: datetime.date,
    overwrite: bool,
    skip_check_prior: bool,
) -> dict[str, pd.DataFrame]:
    dl_files = download_datadir(
        dldir, baseurl, get_date_slug(day), DATA_TABLES, overwrite=overwrite
    )
    dfs = load_dataframes(dl_files)
    if not skip_check_prior:
        prior_dl_files = download_datadir(
            dldir, baseurl, get_prior_date_slug(day), DATA_TABLES, overwrite=False
        )
        prior_dfs = load_dataframes(prior_dl_files)
        diffs = diff_dfs(dfs, prior_dfs)
        for name, diff in diffs.items():
            d = diff["row_delta_percent"]
            logging.info(f"table {name} changed by {d:.2f}% rows versus prior")
            expect(d > MAX_ROW_DECREASE, f"Rows in {name} decreased too much {d:.2f}%")
            expect(d < MAX_ROW_INCREASE, f"Rows in {name} increased too much {d:.2f}%")
    return dfs


def shell(cmd: str) -> bytes:
    logging.info(f"Running shell code: {cmd}")
    return subprocess.check_output(cmd, shell=True)


def cli1(argv: list[str]) -> bytes:
    logging.info(f"Running subprocess command: {argv}")
    return subprocess.check_output(argv)
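
# Example of the pipeline constructed in liftover_gene_variant_beds() below,
# with the hg19->hg38 chain substituted in (file names follow the
# temp01/temp02 templates in that function):
#
#   bedSort gene_features/temp01.unsorted.civic.gene.hg19.bed /dev/stdout \
#     | liftOver -bedPlus=12 -tab /dev/stdin \
#         /hive/data/gbdb/hg19/liftOver/hg19ToHg38.over.chain.gz \
#         gene_features/temp02.fromhg19.civic.gene.hg38.bed \
#         gene_features/temp02.fromhg19.civic.gene.hg38.bed.unmapped
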

def liftover_gene_variant_beds(
    vldf: pd.DataFrame,
    out_fn_template: str = "civic.gene.{db}.bed",
    dir: str = "gene_features",
) -> dict[str, str]:
    """Take a data frame of gene variant loci in bed12+ format, and split out
    by reference genome into separate bed files on disk. Then use `liftOver`
    to transfer annotations between each reference, and finally merge into a
    single bed per reference.

    Returns a dictionary keyed on reference build (hg19/hg38) to bed12+ file
    paths that include variants mapped from both genomes.

    Steps:
    1. Split variants into bed12 files based on the annotated genome
    2. sort raw variants & liftover variants to other genomes
    3. combine and sort variants for each genome
    """
    os.makedirs(dir, exist_ok=True)

    step1_tmpl = os.path.join(dir, "temp01.unsorted." + out_fn_template)
    splitfiles = {
        "GRCh37": open(step1_tmpl.format(db="hg19"), "w"),
        "GRCh38": open(step1_tmpl.format(db="hg38"), "w"),
    }
    tsv_trans = str.maketrans("\n\t", "  ")
    for index, v in vldf.iterrows():
        o = splitfiles[str(v["reference_build"])]
        o.write("\t".join([str(x).translate(tsv_trans) for x in v]) + "\n")
    for f in splitfiles.values():
        f.close()

    liftover_tmpl = os.path.join(dir, "temp02.from{from_db}." + out_fn_template)
    for from_db, to_db, chain in LIFTOVER_CHAINS:
        infile = step1_tmpl.format(db=from_db)
        outfile = liftover_tmpl.format(from_db=from_db, db=to_db)
        shell(
            f"{BED_SORT_CMD} {infile} /dev/stdout | {LIFT_OVER_CMD} -bedPlus=12 -tab "
            f"/dev/stdin {chain} {outfile} {outfile}.unmapped"
        )

    for from_db, to_db, _ in LIFTOVER_CHAINS:
        outfile = out_fn_template.format(db=to_db)
        if1 = step1_tmpl.format(db=to_db)
        if2 = liftover_tmpl.format(from_db=from_db, db=to_db)
        shell(f"cat {if1} {if2} | {BED_SORT_CMD} /dev/stdin {outfile}")

    return {db: out_fn_template.format(db=db) for db in ["hg19", "hg38"]}


def filter_augment_variant_summaries(orig_vs_df: pd.DataFrame) -> pd.DataFrame:
    logging.info(f"Initial Variants dataframe shape: {orig_vs_df.shape}")
    orig_vs_df = orig_vs_df.loc[(orig_vs_df.is_flagged == False), :].reset_index(
        drop=True
    )
    logging.info(
        f"Dataframe shape of variants after filtering flagged: {orig_vs_df.shape}"
    )
    return orig_vs_df.assign(
        short_variant_string=extract_variant_name(orig_vs_df),
        molecular_profile_desc="",
        assertion_desc="",
        evidence_desc="",
    )
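
# Worked examples for join_set() below: join_set(", ", {"imatinib"})
# returns "imatinib", while join_set(", ", {"dasatinib"}, max_length=5)
# returns "..." because the reserved ellipsis budget
# (len(sep) + len(ellipses) = 5) leaves no room for the value itself.
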
""" ellipses_len = len(sep) + len(ellipses) max_length -= ellipses_len r = "" loop_sep = "" # after first iteration switch to `sep` for v in values: if len(r) + len(v) > max_length: return r + loop_sep + ellipses r += loop_sep + v loop_sep = sep return r def add_variant_diseases_therapies( assertion_df: pd.DataFrame, evidence_df: pd.DataFrame, variant_df: pd.DataFrame, mid2vids: dict[int, list[int]], ) -> pd.DataFrame: vid2diseases: dict[int, set[str]] = defaultdict(set) vid2dis_html: dict[int, set[str]] = defaultdict(set) vid2therapies: dict[int, set[str]] = defaultdict(set) missing_mps = [] columns = ["molecular_profile_id", "disease_link", "disease_html", "therapies"] # Gather all the associations from the Assertion table for _, mpid, disease, disease_html, therapies in assertion_df[columns].itertuples(): # tests for self-equality will remove NaNs tset = set(therapies.split(",")) if therapies == therapies else set() dset = set([disease]) if disease == disease else set() dhset = set([disease_html]) if disease_html == disease_html else set() if mpid in mid2vids: for vid in mid2vids[mpid]: vid2diseases[vid] |= dset vid2dis_html[vid] |= dhset vid2therapies[vid] |= tset else: missing_mps.append(mpid) # Gather all the associations from the Evidence table for _, mpid, disease, disease_html, therapies in evidence_df[columns].itertuples(): tset = set(therapies.split(",")) if therapies == therapies else set() dset = set([disease]) if disease == disease else set() dhset = set([disease_html]) if disease_html == disease_html else set() if mpid in mid2vids: for vid in mid2vids[mpid]: vid2diseases[vid] |= dset vid2dis_html[vid] |= dhset vid2therapies[vid] |= tset else: missing_mps.append(mpid) variant_id = [] dhtml_col = [] dlink_col = [] therapy_col = [] for vid in set(vid2diseases.keys()) | set(vid2therapies.keys()): variant_id.append(vid) dhtml_col.append(join_set(", ", vid2dis_html.get(vid, set()))) dlink_col.append(join_set(",", vid2diseases.get(vid, set()), ellipses="|...")) therapy_col.append(join_set(", ", vid2therapies.get(vid, set()))) dt_df = pd.DataFrame( { "variant_id": variant_id, "disease_html": dhtml_col, "disease_links": dlink_col, "therapies": therapy_col, } ).set_index("variant_id") return variant_df.set_index("variant_id").join(dt_df).reset_index() def transform_dfs(dfs: dict[str, pd.DataFrame]) -> dict[str, str]: """From a dictionary of data frames for VariantSummaries, MolecularProfileSummaries, AssertionSummaries, and ClinicalEvidenceSummaries, create a unifed bigBed file for each reference genome. Returns a dictionary from reference genome slug (hg19/hg38) to the file path of the bidBed files. 
""" adf = transform_assertion_summaries( dfs["AssertionSummaries"], dfs["MolecularProfileSummaries"] ) edf = transform_clinical_evidence( dfs["ClinicalEvidenceSummaries"], dfs["MolecularProfileSummaries"] ) mps = dfs["MolecularProfileSummaries"] mid2vids = split_one_to_n_map(mps, "molecular_profile_id", "variant_ids") filtered_variant = filter_augment_variant_summaries(dfs["VariantSummaries"]) filtered_variant = add_variant_diseases_therapies( adf, edf, filtered_variant, mid2vids ) gene_variant_df = transform_gene_variant_summaries(filtered_variant) gene_beds = liftover_gene_variant_beds(gene_variant_df) fusion_variant_dfs = { ref: transform_fusion_variants(filtered_variant, fn) for ref, fn in GENCODE_UCSC_FN.items() } fusion_beds = write_fusion_beds(fusion_variant_dfs) finalBed_tmpl = "civic.{ref}.bed" finalBigBed_tmpl = "civic.{ref}.bb" outbb = {} for ref in gene_beds: gbed = gene_beds[ref] fbed = fusion_beds[ref] outbed = finalBed_tmpl.format(ref=ref) outbb[ref] = finalBigBed_tmpl.format(ref=ref) shell(f"cat {gbed} {fbed} | bedSort /dev/stdin {outbed}") shell( f"{BED_TO_BIG_BED_CMD} -tab -type=bed12+5 -as=civic.as {outbed} {CHROM_SIZES[ref]} {outbb[ref]}" ) return outbb def main( day: datetime.date, dldir: str, overwrite: bool, skip_check_prior: bool ) -> None: logging.info(f"Starting civicToBed for {get_date_slug(day)}") dfs = download_and_load_dataframes( dldir, DOWNLOAD_BASE_URL, day, overwrite, skip_check_prior ) transform_dfs(dfs) if __name__ == "__main__": import argparse ap = argparse.ArgumentParser(description=__doc__) ap.add_argument( "--test", help="do not run normally, instead execute unit tests", action="store_true", ) ap.add_argument( "--day", help="which archive to download, in ISO format like 2024-11-28. Default: today()", default=datetime.date.today(), type=datetime.date.fromisoformat, ) ap.add_argument( "--skip-check-prior", action="store_true", help="Don't look for the prior month to compare data sizes", ) ap.add_argument( "--download-dir", default=DOWNLOAD_DIR, help=f"base of downloads; each date gets a separate subdirectory. default: {DOWNLOAD_DIR}", ) ap.add_argument( "--overwrite", action="store_true", help=f"overwrite download of latest archive, if present", ) argv = ap.parse_args() import sys print(sys.path) if argv.test: import doctest doctest.testmod() else: main(argv.day, argv.download_dir, argv.overwrite, argv.skip_check_prior)