cb31fb5a2ebbbb167b7f07c19f3af9e3e724068c
cvaske
Tue Jan 7 13:48:52 2025 -0800
civic: remove certifi package
hgwdev-new no longer needs the PyPI certificate bundle
diff --git src/hg/utils/otto/civic/civicToBed.py src/hg/utils/otto/civic/civicToBed.py
index b5cee02..40c1453 100644
--- src/hg/utils/otto/civic/civicToBed.py
+++ src/hg/utils/otto/civic/civicToBed.py
@@ -1,975 +1,969 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "pandas",
-# "certifi",
# ]
# [tool.uv]
# exclude-newer = "2024-11-20T00:00:00Z"
# ///
"""Download CIViC DB files and convert into bigBed12 tracks
This script is meant to be run with `uv run civicToBed.py`, which
will create a reproducible Python environment in which to run it.
EXTERNAL DEPENDENCIES: this script depends on several kent binaries,
as well as data files. See the sections below with comment titles
"External Shell Command Dependencies" and "Local Data File
Dependencies"
This script compares against the prior month of data and checks
that there is not a drop of more than 10% of records, or an
addition of more than 100%.
The primary database entry that has a genomic location is a Variant,
which is in the VariantSummaries table.
There are three types of "features" in CIViC in the VariantSummaries
table:
1. Gene features - point mutations, insertions, deletions,
amplification, overexpression, etc. Results in a single BED entry
2. Fusion features - a structural variant creating a gene
fusion. Results in two BED entries, one for each fusion
partner. Each BED entry will consist of a thick part for the fused
exon, and a thin part for the intron where the fusion takes place.
The website shows genomic locations for the exon of interest in
both hg38 and hg19, but these coordinates do not make it to the
downloadable TSVs. Instead, an ENST identifier plus an exon index
are used. There are a few transcript identifiers that are not
located in our GENCODE transcripts, unfortunately.
3. Factor features - (Ignored) complex cellular states without a
genomic locus, such as kataegis or a methylation signature.
Additionally, there are several other tables of interest that add
disease and therapy information to each variant:
- MolecularProfile - a summary of a set (possibly a singleton) of
various variants that go together; example 1: BCR-ABL gene fusions
with a point mutation in ABL1 p.T315I, example 2: KRAS G12 mutation
when there's not a methylation signature. This is the logical
structure that actually gets assigned evidence from
ClinicalEvidenceSummaries or AssertionSummaries
- ClinicalEvidenceSummaries - a statement made in
papers/abstracts/trials that connects a molecular profile to a
disease or prognosis or therapy.
- AssertionSummary - an assertion about a molecular profile that's an
editor's summary of multiple bits of clinical evidence.
"""
from collections import defaultdict
from contextlib import closing
from copy import deepcopy
import dataclasses
import datetime
import logging
import os
import ssl
import subprocess
from typing import Callable, Final, Generator, Sequence
import urllib.request
-import certifi
import numpy as np
import pandas as pd
##
## External Shell Command Dependencies
##
BED_TO_BIG_BED_CMD: Final = "bedToBigBed"
LIFT_OVER_CMD: Final = "liftOver"
BED_SORT_CMD: Final = "bedSort"
##
## Local Data File Dependencies
##
LIFTOVER_CHAINS: Final = [
["hg19", "hg38", "/hive/data/gbdb/hg19/liftOver/hg19ToHg38.over.chain.gz"],
["hg38", "hg19", "/hive/data/gbdb/hg38/liftOver/hg38ToHg19.over.chain.gz"],
]
GENCODE_UCSC_FN: Final = {
"hg38": "/hive/data/genomes/hg38/bed/gencodeV47/build/ucscGenes.bed",
"hg19": "/hive/data/genomes/hg19/bed/gencodeV47lift37/build/ucscGenes.bed",
}
CHROM_SIZES: Final = {
"hg38": "/hive/data/genomes/hg38/chrom.sizes",
"hg19": "/hive/data/genomes/hg19/chrom.sizes",
}
-## hgwdev has trouble with SSL without using certifi, so this ensures
-## that certificates from PyPI are used
-ssl_ctx = ssl.create_default_context(cafile=certifi.where())
-
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
DOWNLOAD_BASE_URL: Final = "https://civicdb.org/downloads"
DATA_TABLES: Final = [
"MolecularProfileSummaries",
"VariantSummaries",
"ClinicalEvidenceSummaries",
"AssertionSummaries",
]
## Maximum length of a string (e.g. insAGCATGACCAG...) before
## being truncated and appended with an ellipsis
MAX_VARIANT_LENGTH: Final = 20
MAX_LONG_BED_FIELD_LENGTH: Final = 5000
## Default color for items in output bed
DEFAULT_BED_VARIANT_COLOR: Final = "0"
DOWNLOAD_DIR: Final = "downloads"
# these are in percentages
MAX_ROW_INCREASE: Final = 100.0
MAX_ROW_DECREASE: Final = -10.0
# Minimum fraction of Ensembl transcripts that must be found for fusion processing to succeed
MIN_ENSEMBL_TX = 0.90
##
## Not currently used, but this is the start of detecting variants
## without genomic coordinates...
##
AMINO_ACIDS: Final = "ARNDCEQGHILKMFPSTWYVX"
HGVS_STOP: Final = "fs|*"
SIMPLE_HGVS_REGEX = f"[{AMINO_ACIDS}][0-9][0-9]*[{AMINO_ACIDS}]"
def expect(condition: bool, message: str):
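"""Log an error and assert when a sanity-check condition fails."""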
if not condition:
logging.error(f"Expectation failed: {message}")
assert condition
def transform_gene_variant_summaries_to_loci_2024_10(df: pd.DataFrame):
"""DEPRECATED---This transformation only works on
VariantSummary.txt up until Oct 2024 and is included only for
historical purposes
Stages of processing:
1. filter out flagged variants
2. filter out variants without a reference build
3. Independently for locus1 and locus2, if one of chromosome,
start, or stop is missing for the locus, make all three values missing
4. Convert each variant into up to 2 different VariantLocus entries,
using the variant id as the base, and adding `_1` and `_2` for each
locus used in the variant
"""
logging.info(f"Initial Variants dataframe shape: {df.shape}")
## filter 1
df = df.loc[(df.is_flagged == False), :]
logging.info(f"Dataframe shape of variants after filtering flagged: {df.shape}")
## filter 2
df = df.loc[~df.reference_build.isnull(), :]
logging.info(f"Dataframe shape of variants after missing ref genome: {df.shape}")
## filter 3
locus1_cols = ["chromosome", "start", "stop"]
locus2_cols = [x + "2" for x in locus1_cols]
print(df.keys())
locus1_null = df.loc[:, locus1_cols].isnull().sum(1) > 0
locus2_null = df.loc[:, locus2_cols].isnull().sum(1) > 0
# Clear out partially entered genomic locations
df.loc[locus1_null, locus1_cols] = np.nan
df.loc[locus2_null, locus2_cols] = np.nan
assert ((df[locus2_cols].isnull().sum(1) > 0) == locus2_null).all()
assert ((df[locus1_cols].isnull().sum(1) > 0) == locus1_null).all()
keep_cols = [
"variant_id",
"gene",
"variant",
"reference_bases",
"variant_bases",
"reference_build",
]
vldf1 = df.loc[~locus1_null, keep_cols + locus1_cols]
vldf2 = df.loc[~locus2_null, keep_cols + locus2_cols]
vldf2.rename(columns=dict(zip(vldf2.keys(), vldf1.keys())), inplace=True)
vldf1["chromosome"] = "chr" + vldf1.chromosome.astype("string")
vldf2["chromosome"] = "chr" + vldf2.chromosome.astype("string")
vldf1["variant_locus_id"] = vldf1.variant_id.astype("string") + "_1"
vldf2["variant_locus_id"] = vldf2.variant_id.astype("string") + "_2"
vldf = pd.concat((vldf1, vldf2))
logging.info(
f"Variants with a locus1 specified (1172 in Oct2024): {vldf1.shape[0]}"
)
logging.info(f"Variants with a locus2 specified (4 in Oct2024): {vldf2.shape[0]}")
logging.info(
f"Variants with both locus1 & locus2 (2 in Oct2024): {sum(~(locus1_null | locus2_null))}"
)
expect(
set(vldf.reference_build) == set(["GRCh37", "GRCh38"]),
message="Only two reference builds allowed, 'GRCh37', 'GRCh38'",
)
return vldf
def annotate_bed12(bed12df: pd.DataFrame, sourcedf: pd.DataFrame) -> pd.DataFrame:
"""Given a bed12 dataframe, and a fuller variant summary dataframe
where the rows match up, transfer annotations from the variant
summaries to make bed12+
This is used both on the gene variant bed12 and on fusion variant
bed12
"""
assert bed12df.shape[0] == sourcedf.shape[0]
cvids = sourcedf["clinvar_ids"].fillna("").replace("^NONE FOUND$", "", regex=True)
expect(
cvids.str.match("[^0-9]").sum() == 0,
"At least one clinvar_ids entry contains a non-numeric digit, other than 'NONE "
"FOUND'. The parsing code must be updated to handle a non-numeric value",
)
return (
bed12df.assign(
variant_link=(
sourcedf.variant_id.astype("str")
+ "|"
+ sourcedf.feature_name
+ " "
+ sourcedf.variant
)
)
.assign(reference_build=sourcedf["reference_build"])
.assign(allele_registry_id=sourcedf["allele_registry_id"].fillna(""))
.assign(clinvar_id=cvids)
.assign(last_review_date=sourcedf["last_review_date"].fillna(""))
.assign(disease_link=sourcedf["disease_links"])
.assign(therapies=sourcedf["therapies"])
.assign(
mouseOverHTML="Associated therapies: "
+ sourcedf["therapies"].str.replace(",", ", ")
+ "
Associated diseases: "
+ sourcedf["disease_html"]
)
)
## These are sometimes included for debugging purposes...
# "variant_id": gdf.variant_id,
# "reference_bases": gdf.reference_bases,
# "variant_bases": gdf.variant_bases,
def extract_variant_name(gdf: pd.DataFrame) -> pd.Series:
"""
Make a name for a variant to show in the browser, summarizing
the genomic change (sequence change, fusion partners, expression,
etc.)
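For example (illustrative values): a gene variant with reference_bases
"C" and variant_bases "T" becomes "<gene> C>T", and an insertion with
only variant_bases becomes "<gene> ins<bases>"; names longer than
MAX_VARIANT_LENGTH are truncated with a trailing "...".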
"""
# this got a bit ugly, but functional if statements be like that
full_variant_string = pd.Series(
np.where(
gdf.feature_type == "Fusion",
gdf.feature_name, # fusions have very descriptive names.
np.where( # these are the gene type variants
gdf.reference_bases.isnull(),
np.where( # empty variant bases --> variant description or an insertion
gdf.variant_bases.isnull(),
gdf.gene + " " + gdf.variant,
gdf.gene + " ins" + gdf.variant_bases,
),
np.where( # specified variant bases --> deletion or variant
gdf.variant_bases.isnull(),
gdf.gene + " " + "del" + gdf.reference_bases,
gdf.gene + " " + gdf.reference_bases + ">" + gdf.variant_bases,
),
),
)
)
return pd.Series(
np.where(
full_variant_string.str.len() > MAX_VARIANT_LENGTH,
full_variant_string.str.slice(start=0, stop=MAX_VARIANT_LENGTH - 3) + "...",
full_variant_string,
)
)
def transform_gene_variant_summaries(df: pd.DataFrame) -> pd.DataFrame:
"""Convert the gene variants from a cleaned/augmented
VariantSummaries into bed12+
With the November 2024 release, the second locus columns were removed.
Fusion genes instead refer to exon locations in Ensembl transcripts.
Stages of processing:
1. filter out variants without a reference build
2. filter out partially entered locus info (missing chrom, start, or end)
3. construct a bed12+ DataFrame
"""
## filter 1
df = df.loc[~df.reference_build.isnull(), :]
logging.info(f"Dataframe shape of variants after missing ref genome: {df.shape}")
## filter 2
locus1_cols = ["chromosome", "start", "stop"]
locus1_null = df.loc[:, locus1_cols].isnull().sum(1) > 0
# Clear out partially entered genomic locations
df.loc[locus1_null, locus1_cols] = np.nan
assert ((df[locus1_cols].isnull().sum(1) > 0) == locus1_null).all()
gdf = df.loc[~locus1_null, :].reset_index(drop=True)
bed_dict = {
"chrom": "chr" + gdf.chromosome.astype("string"),
"start": -1 + gdf.start.astype("Int64"),
"end": gdf.stop.astype("Int64"),
"name": gdf.short_variant_string,
"score": 1,
"strand": ".",
"thickStart": 0, # gdf.start.astype("Int64"),
"thickEnd": 0, # gdf.stop.astype("Int64"),
"itemRgb": DEFAULT_BED_VARIANT_COLOR,
"blockCount": 1,
"blockSizes": gdf.stop.astype("Int64") - gdf.start.astype("Int64") + 1,
"blockStarts": 0, # gdf.start.astype("Int64"),
}
vldf = annotate_bed12(pd.DataFrame(bed_dict), gdf)
col_has_null = vldf.isnull().sum() > 0
col_has_null = list(col_has_null[col_has_null].index.values)
expect(
not col_has_null,
f"Found null entries in variant locus bed columns: {col_has_null}",
)
logging.info(f"Variants with a locus specified (1172 in Oct2024): {vldf.shape[0]}")
expected_references = set(["GRCh37", "GRCh38"])
unexpected_references = set(vldf.reference_build) - expected_references
expect(
len(unexpected_references) == 0,
message=f"Only know about genome references {expected_references}, but found {unexpected_references}",
)
return vldf
def split_one_to_n_map(
df: pd.DataFrame, one_col: str, n_col: str, sep: str = ", ", value_transform=int
):
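"""Build a dict mapping each value of `one_col` to the list of values
parsed from the separator-joined string in `n_col`, e.g.
molecular_profile_id -> [variant_id, ...]. Rows where either column is
missing (NaN) are skipped."""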
d = {}
for k, val_list in zip(df[one_col], df[n_col]):
if k != k or val_list != val_list: ## skip missing values
continue
d[k] = [value_transform(x) for x in val_list.split(sep)]
return d
def parse_bed_int_list(l: str) -> list[int]:
return [int(x) for x in l.rstrip(",").split(",")]
@dataclasses.dataclass
class Bed:
chrom: str
chromStart: int
chromEnd: int
name: str
score: int
strand: str
thickStart: int
thickEnd: int
itemRgb: str
blockCount: int
blockSizes: list[int]
blockStarts: list[int]
def as_str_dict(self) -> dict[str, str]:
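"""Render the fields as strings for a tab-separated BED line; the
int-list fields use BED block syntax with a trailing comma."""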
def c(field, val) -> str:
if field.type == list[int]:
return ",".join([str(x) for x in val]) + ","
if field.type == int:
return str(val)
return val
return {
f.name: c(f, self.__getattribute__(f.name)) for f in dataclasses.fields(Bed)
}
@classmethod
def from_line(cls, line: str) -> "Bed":
fields = dataclasses.fields(Bed)
v = line.rstrip("\n").split("\t")
assert len(fields) == len(v)
return Bed(
v[0],
int(v[1]),
int(v[2]),
v[3],
int(v[4]),
v[5],
int(v[6]),
int(v[7]),
v[8],
int(v[9]),
parse_bed_int_list(v[10]),
parse_bed_int_list(v[11]),
)
def read_bed12(fn: str) -> Generator[Bed, None, None]:
with closing(open(fn)) as f:
for line in f:
yield Bed.from_line(line)
def gene_bed_to_exon(
bed: Bed, index1: int, upstream_intron: bool, downstream_intron: bool, name: str
) -> Bed:
"""Given a gene Bed, pull out a particular exon indexed from the
5' side with a 1-based index, taking into account
strand. Optionally include introns as thin parts.
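A small worked example with a hypothetical plus-strand transcript of
three 100 bp exons (coordinates invented for illustration), pulling
exon 2 together with its downstream intron:

>>> tx = Bed("chr1", 100, 1000, "tx", 0, "+", 100, 1000, "0",
...          3, [100, 100, 100], [0, 300, 600])
>>> exon = gene_bed_to_exon(tx, 2, False, True, "fusion exon")
>>> (exon.chromStart, exon.thickStart, exon.thickEnd, exon.chromEnd)
(400, 400, 500, 700)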
"""
index0 = int(index1) - 1
if bed.strand == "-":
upstream_intron, downstream_intron = downstream_intron, upstream_intron
index0 = int(bed.blockCount) - 1 - index0
out = deepcopy(bed)
## Get the exon boundaries into the thick part
out.name = name
out.thickStart = int(bed.chromStart) + bed.blockStarts[index0]
out.thickEnd = out.thickStart + bed.blockSizes[index0]
## Set the thin part, optionally including introns
up_idx = index0 - 1
if upstream_intron and up_idx >= 0:
out.chromStart += bed.blockStarts[up_idx] + bed.blockSizes[up_idx]
else:
out.chromStart = out.thickStart
down_idx = index0 + 1
if downstream_intron and down_idx < bed.blockCount:
out.chromEnd = bed.chromStart + bed.blockStarts[down_idx]
else:
out.chromEnd = out.thickEnd
out.blockCount = 1
out.blockSizes = [out.chromEnd - out.chromStart]
out.blockStarts = [0]
return out
def transform_fusion_variants(df: pd.DataFrame, ensembl_bed_fn: str) -> pd.DataFrame:
"""Convert the fusion type variants from VariantSummaries into a bed12+ data frame"""
def remove_ens_version(x: str) -> str:
return x.rpartition(".")[0] if x == x else ""
fdf = df.loc[df["feature_type"] == "Fusion"].reset_index(drop=True)
fdf = fdf.assign(
upstream_tx=[remove_ens_version(x) for x in fdf["5_prime_transcript"]],
upstream_exon=fdf["5_prime_end_exon"],
downstream_tx=[remove_ens_version(x) for x in fdf["3_prime_transcript"]],
downstream_exon=fdf["3_prime_start_exon"],
reference_build="GENCODE",
)
eset = set(fdf.upstream_tx) | set(fdf.downstream_tx)
ens_transcript_names = set([x for x in eset if x]) # remove "" from null transcript
logging.info(f"Reading BED `{ensembl_bed_fn}` to get Ensembl transcripts")
ens2bed = {}
for b in read_bed12(ensembl_bed_fn):
b.name = remove_ens_version(str(b.name))
if b.name in ens_transcript_names:
ens2bed[b.name] = b
logging.info(
f"Found {len(ens2bed)} transcripts from {len(ens_transcript_names)} specified in CIViC"
)
expect(
float(len(ens2bed)) / len(ens_transcript_names) > MIN_ENSEMBL_TX,
"Not enough matches in Ensembl transcripts, parser or input fix needed",
)
valid_upstream = fdf.upstream_tx.isin(ens2bed) & ~fdf.upstream_exon.isnull()
upstream_variants = fdf.loc[valid_upstream, :].reset_index(drop=True)
upstream_bed12 = []
for r in upstream_variants.itertuples():
upstream_bed12.append(
gene_bed_to_exon(
ens2bed[r.upstream_tx],
r.upstream_exon,
False,
True,
r.short_variant_string,
).as_str_dict()
)
upstream_vldf = annotate_bed12(pd.DataFrame(upstream_bed12), upstream_variants)
valid_downstream = fdf.downstream_tx.isin(ens2bed) & ~fdf.downstream_exon.isnull()
downstream_variants = fdf.loc[valid_downstream, :].reset_index(drop=True)
downstream_bed12 = []
for r in downstream_variants.itertuples():
downstream_bed12.append(
gene_bed_to_exon(
ens2bed[r.downstream_tx],
r.downstream_exon,
True,
False,
r.short_variant_string,
).as_str_dict()
)
downstream_vldf = annotate_bed12(
pd.DataFrame(downstream_bed12), downstream_variants
)
return pd.concat([upstream_vldf, downstream_vldf])
def write_df_to_bed(vldf: pd.DataFrame, outfn: str) -> None:
# Make sure no characters in a string can screw up the rest of the table...
tsv_trans = str.maketrans("\n\t", " ")
with closing(open(outfn, mode="w")) as o:
for _, v in vldf.iterrows():
o.write("\t".join([str(x).translate(tsv_trans) for x in v]) + "\n")
def write_fusion_beds(
fusion_dfs: dict[str, pd.DataFrame],
out_fn_template: str = "civic.fusion.{db}.bed",
) -> dict[str, str]:
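"""Write one bed file per reference genome from the fusion data frames,
returning a dict from reference slug to the written bed path."""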
beds = {}
for ref, df in fusion_dfs.items():
beds[ref] = out_fn_template.format(db=ref)
write_df_to_bed(df, beds[ref])
return beds
def transform_assertion_summaries(df: pd.DataFrame, mpdf: pd.DataFrame) -> pd.DataFrame:
# Check overlap with molecular profiles
assertions_with_mps = df["molecular_profile_id"].isin(mpdf["molecular_profile_id"])
assertions_with_mps_p = assertions_with_mps.mean() * 100
assertions_without_mps = str(list(df["assertion_id"][~assertions_with_mps]))
logging.info(
f"AssertionSummaries whose molecular_profile_id exists: {assertions_with_mps_p:.2f}%, assertion_ids with missing:{assertions_without_mps}"
)
expect(
assertions_with_mps.mean() > 0.95,
message="At least 95% of Assertions should have an existing MolecularProfile",
)
df["disease_html"] = "" + df["disease"] + ""
doid = df["doid"].astype("Int64").astype("str")
df["disease_link"] = doid.where(doid != "", df["disease"]).where(
doid == "", doid + "|" + df["disease"]
)
return df
def transform_clinical_evidence(df: pd.DataFrame, mpdf: pd.DataFrame):
evidence_with_mps = df["molecular_profile_id"].isin(mpdf["molecular_profile_id"])
evidence_with_mps_p = evidence_with_mps.mean() * 100
evidence_without_mps = str(list(df["evidence_id"][~evidence_with_mps]))
logging.info(
f"ClincialEvidenceSummaries whose molecular_profile_id exists: {evidence_with_mps_p:.2f}%, evidence_ids with missing:{evidence_without_mps}"
)
expect(
evidence_with_mps.mean() > 0.95,
message="At least 95% of Evidence should have an existing MolecularProfile",
)
df["disease_html"] = "" + df["disease"] + ""
doid = df["doid"].astype("Int64").astype("str")
df["disease_link"] = doid.where(doid != "", df["disease"]).where(
doid == "", doid + "|" + df["disease"]
)
return df
def load_dataframes(table_dict: dict[str, str]) -> dict[str, pd.DataFrame]:
"""Load several dataframes.
Input is a dict from name to the source path.
Output is a dict from name to a Pandas DataFrame"""
return {name: pd.read_csv(path, sep="\t") for name, path in table_dict.items()}
def urlretrieve(url, filename):
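"""Download `url` to `filename` using the default SSL context."""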
with closing(open(filename, "wb")) as outfile:
- with closing(urllib.request.urlopen(url, context=ssl_ctx)) as instream:
+ with closing(urllib.request.urlopen(url)) as instream:
outfile.write(instream.read())
def download_datadir(
basedir: str,
baseurl: str,
dateslug: str,
tablelist: list[str],
overwrite: bool = True,
) -> dict[str, str]:
dlpaths = {}
# make directory
dldir = os.path.join(basedir, dateslug)
os.makedirs(dldir, exist_ok=True)
# download files
for tbl in tablelist:
basefn = f"{dateslug}-{tbl}.tsv"
dlpaths[tbl] = os.path.join(dldir, basefn)
if overwrite or not os.path.exists(dlpaths[tbl]):
url = f"{baseurl}/{dateslug}/{basefn}"
logging.info(f"Downloading from {url} to {dlpaths[tbl]}")
urlretrieve(url, dlpaths[tbl])
return dlpaths
def first_of_month(day: datetime.date) -> datetime.date:
return datetime.date(year=day.year, month=day.month, day=1)
def get_prior_date_slug(day: datetime.date) -> str:
"""For a given day, get the prior date slug one month before"""
priorday = datetime.timedelta(days=-14) + first_of_month(day)
return get_date_slug(priorday)
def get_date_slug(day: datetime.date) -> str:
"""CIViC archive data slug, the first of the month
>>> get_date_slug(datetime.date.fromisoformat("2024-03-22"))
'01-Mar-2024'
"""
return first_of_month(day).strftime("%d-%b-%Y")
def diff_dfs(
dfs: dict[str, pd.DataFrame], prior_dfs: dict[str, pd.DataFrame]
) -> dict[str, dict]:
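"""For each table, report the row-count change versus the prior month,
both absolute and as a percentage of the current month's row count."""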
diffs = {}
for k in dfs:
diffs[k] = {
"row_delta": dfs[k].shape[0] - prior_dfs[k].shape[0],
"row_delta_percent": 100.0
* (dfs[k].shape[0] - prior_dfs[k].shape[0])
/ dfs[k].shape[0],
}
return diffs
def download_and_load_dataframes(
dldir: str,
baseurl: str,
day: datetime.date,
overwrite: bool,
skip_check_prior: bool,
) -> dict[str, pd.DataFrame]:
dl_files = download_datadir(
dldir, baseurl, get_date_slug(day), DATA_TABLES, overwrite=overwrite
)
dfs = load_dataframes(dl_files)
if not skip_check_prior:
prior_dl_files = download_datadir(
dldir, baseurl, get_prior_date_slug(day), DATA_TABLES, overwrite=False
)
prior_dfs = load_dataframes(prior_dl_files)
diffs = diff_dfs(dfs, prior_dfs)
for name, diff in diffs.items():
d = diff["row_delta_percent"]
logging.info(f"table {name} changed by {d:.2f}% rows versus prior")
expect(d > MAX_ROW_DECREASE, f"Rows in {name} decreased too much {d:.2f}%")
expect(d < MAX_ROW_INCREASE, f"Rows in {name} increased too much {d:.2f}%")
return dfs
def shell(cmd: str) -> bytes:
logging.info(f"Running shell code: {cmd}")
return subprocess.check_output(cmd, shell=True)
def cli1(argv: list[str]) -> bytes:
logging.info(f"Running subprocess command: {argv}")
return subprocess.check_output(argv)
def liftover_gene_variant_beds(
vldf: pd.DataFrame,
out_fn_template: str = "civic.gene.{db}.bed",
dir: str = "gene_features",
) -> dict[str, str]:
"""Take a data farme of gene variant loci in bed12+ format, and
split out by reference genome into separate bed files on
disk. Then use `liftOver` to transfer annotations between each
reference, and finally merge into a single bed per reference.
Returns a dictionary keyed on reference build (hg19/hg38) to
bed12+ file paths that include variants mapped from both genomes.
Steps:
1. Split variants into bed12 files based on the annotated genome
2. sort raw variants & liftover variants to other genomes
3. combine and sort variants for each genome
"""
os.makedirs(dir, exist_ok=True)
step1_tmpl = os.path.join(dir, "temp01.unsorted." + out_fn_template)
splitfiles = {
"GRCh37": open(step1_tmpl.format(db="hg19"), "w"),
"GRCh38": open(step1_tmpl.format(db="hg38"), "w"),
}
tsv_trans = str.maketrans("\n\t", " ")
for index, v in vldf.iterrows():
o = splitfiles[str(v["reference_build"])]
o.write("\t".join([str(x).translate(tsv_trans) for x in v]) + "\n")
for f in splitfiles.values():
f.close()
liftover_tmpl = os.path.join(dir, "temp02.from{from_db}." + out_fn_template)
for from_db, to_db, chain in LIFTOVER_CHAINS:
infile = step1_tmpl.format(db=from_db)
outfile = liftover_tmpl.format(from_db=from_db, db=to_db)
shell(
f"{BED_SORT_CMD} {infile} /dev/stdout| {LIFT_OVER_CMD} -bedPlus=12 -tab /dev/stdin {chain} {outfile} {outfile}.unmapped"
)
for from_db, to_db, _ in LIFTOVER_CHAINS:
outfile = out_fn_template.format(db=to_db)
if1 = step1_tmpl.format(db=to_db)
if2 = liftover_tmpl.format(from_db=from_db, db=to_db)
shell(f"cat {if1} {if2} | bedSort /dev/stdin {outfile}")
return {db: out_fn_template.format(db=db) for db in ["hg19", "hg38"]}
def filter_augment_variant_summaries(orig_vs_df: pd.DataFrame) -> pd.DataFrame:
logging.info(f"Initial Variants dataframe shape: {orig_vs_df.shape}")
orig_vs_df = orig_vs_df.loc[(orig_vs_df.is_flagged == False), :].reset_index(
drop=True
)
logging.info(
f"Dataframe shape of variants after filtering flagged: {orig_vs_df.shape}"
)
return orig_vs_df.assign(
short_variant_string=extract_variant_name(orig_vs_df),
molecular_profile_desc="",
assertion_desc="",
evidence_desc="",
)
def join_set(
sep: str,
values: set[str],
max_length: int = MAX_LONG_BED_FIELD_LENGTH,
ellipses: str = "...",
) -> str:
"""Join strings together, but only up to a maximum length. If the
maximum length is exceeded, add the ellipses string at the end.
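Single-element examples (kept to one element so the doctest is
deterministic despite set ordering):

>>> join_set(", ", {"melanoma"})
'melanoma'
>>> join_set(", ", {"abcdefgh"}, max_length=5)
'...'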
"""
ellipses_len = len(sep) + len(ellipses)
max_length -= ellipses_len
r = ""
loop_sep = "" # after first iteration switch to `sep`
for v in values:
if len(r) + len(v) > max_length:
return r + loop_sep + ellipses
r += loop_sep + v
loop_sep = sep
return r
def add_variant_diseases_therapies(
assertion_df: pd.DataFrame,
evidence_df: pd.DataFrame,
variant_df: pd.DataFrame,
mid2vids: dict[int, list[int]],
) -> pd.DataFrame:
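"""Aggregate disease and therapy strings per variant by following the
molecular-profile-to-variant map through both the assertion and
clinical evidence tables, then join the results onto the variant
dataframe."""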
vid2diseases: dict[int, set[str]] = defaultdict(set)
vid2dis_html: dict[int, set[str]] = defaultdict(set)
vid2therapies: dict[int, set[str]] = defaultdict(set)
missing_mps = []
columns = ["molecular_profile_id", "disease_link", "disease_html", "therapies"]
# Gather all the associations from the Assertion table
for _, mpid, disease, disease_html, therapies in assertion_df[columns].itertuples():
# tests for self-equality will remove NaNs
tset = set(therapies.split(",")) if therapies == therapies else set()
dset = set([disease]) if disease == disease else set()
dhset = set([disease_html]) if disease_html == disease_html else set()
if mpid in mid2vids:
for vid in mid2vids[mpid]:
vid2diseases[vid] |= dset
vid2dis_html[vid] |= dhset
vid2therapies[vid] |= tset
else:
missing_mps.append(mpid)
# Gather all the associations from the Evidence table
for _, mpid, disease, disease_html, therapies in evidence_df[columns].itertuples():
tset = set(therapies.split(",")) if therapies == therapies else set()
dset = set([disease]) if disease == disease else set()
dhset = set([disease_html]) if disease_html == disease_html else set()
if mpid in mid2vids:
for vid in mid2vids[mpid]:
vid2diseases[vid] |= dset
vid2dis_html[vid] |= dhset
vid2therapies[vid] |= tset
else:
missing_mps.append(mpid)
variant_id = []
dhtml_col = []
dlink_col = []
therapy_col = []
for vid in set(vid2diseases.keys()) | set(vid2therapies.keys()):
variant_id.append(vid)
dhtml_col.append(join_set(", ", vid2dis_html.get(vid, set())))
dlink_col.append(join_set(",", vid2diseases.get(vid, set()), ellipses="|..."))
therapy_col.append(join_set(", ", vid2therapies.get(vid, set())))
dt_df = pd.DataFrame(
{
"variant_id": variant_id,
"disease_html": dhtml_col,
"disease_links": dlink_col,
"therapies": therapy_col,
}
).set_index("variant_id")
return variant_df.set_index("variant_id").join(dt_df).reset_index()
def transform_dfs(dfs: dict[str, pd.DataFrame]) -> dict[str, str]:
"""From a dictionary of data frames for VariantSummaries,
MolecularProfileSummaries, AssertionSummaries, and ClinicalEvidenceSummaries,
create a unified bigBed file for each reference genome.
Returns a dictionary from reference genome slug (hg19/hg38) to the
file path of the bigBed files.
"""
adf = transform_assertion_summaries(
dfs["AssertionSummaries"], dfs["MolecularProfileSummaries"]
)
edf = transform_clinical_evidence(
dfs["ClinicalEvidenceSummaries"], dfs["MolecularProfileSummaries"]
)
mps = dfs["MolecularProfileSummaries"]
mid2vids = split_one_to_n_map(mps, "molecular_profile_id", "variant_ids")
filtered_variant = filter_augment_variant_summaries(dfs["VariantSummaries"])
filtered_variant = add_variant_diseases_therapies(
adf, edf, filtered_variant, mid2vids
)
gene_variant_df = transform_gene_variant_summaries(filtered_variant)
gene_beds = liftover_gene_variant_beds(gene_variant_df)
fusion_variant_dfs = {
ref: transform_fusion_variants(filtered_variant, fn)
for ref, fn in GENCODE_UCSC_FN.items()
}
fusion_beds = write_fusion_beds(fusion_variant_dfs)
finalBed_tmpl = "civic.{ref}.bed"
finalBigBed_tmpl = "civic.{ref}.bb"
outbb = {}
for ref in gene_beds:
gbed = gene_beds[ref]
fbed = fusion_beds[ref]
outbed = finalBed_tmpl.format(ref=ref)
outbb[ref] = finalBigBed_tmpl.format(ref=ref)
shell(f"cat {gbed} {fbed} | bedSort /dev/stdin {outbed}")
shell(
f"{BED_TO_BIG_BED_CMD} -tab -type=bed12+5 -as=civic.as {outbed} {CHROM_SIZES[ref]} {outbb[ref]}"
)
return outbb
def main(
day: datetime.date, dldir: str, overwrite: bool, skip_check_prior: bool
) -> None:
logging.info(f"Starting civicToBed for {get_date_slug(day)}")
dfs = download_and_load_dataframes(
dldir, DOWNLOAD_BASE_URL, day, overwrite, skip_check_prior
)
transform_dfs(dfs)
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument(
"--test",
help="do not run normally, instead execute unit tests",
action="store_true",
)
ap.add_argument(
"--day",
help="which archive to download, in ISO format like 2024-11-28. Default: today()",
default=datetime.date.today(),
type=datetime.date.fromisoformat,
)
ap.add_argument(
"--skip-check-prior",
action="store_true",
help="Don't look for the prior month to compare data sizes",
)
ap.add_argument(
"--download-dir",
default=DOWNLOAD_DIR,
help=f"base of downloads; each date gets a separate subdirectory. default: {DOWNLOAD_DIR}",
)
ap.add_argument(
"--overwrite",
action="store_true",
help=f"overwrite download of latest archive, if present",
)
argv = ap.parse_args()
import sys
print(sys.path)
if argv.test:
import doctest
doctest.testmod()
else:
main(argv.day, argv.download_dir, argv.overwrite, argv.skip_check_prior)