from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable, Mapping, Sequence
import numpy as np
import pandas as pd
import anndata as ad
from ..logging import info, warn
try:
import gseapy as gp
except Exception: # pragma: no cover
gp = None
def _ensure_gseapy() -> None:
if gp is None:
raise ImportError("gsea_preranked requires gseapy. Install with: pip install gseapy")
[docs]
def list_enrichr_libraries() -> list[str]:
"""
Return available Enrichr libraries as known by gseapy.
Useful to discover exact names like 'MSigDB_Hallmark_2020'.
Notes
-----
- Requires internet access (Enrichr).
"""
_ensure_gseapy()
try:
libs = gp.get_library_name()
return list(libs)
except Exception as e:
raise RuntimeError(f"Failed to fetch Enrichr library names: {e}") from e
def _normalize_gene_sets_arg(
gene_sets: str | Sequence[str] | Mapping[str, Sequence[str]],
) -> str | list[str] | dict[str, list[str]]:
"""
Accept:
- dict of gene sets: {"SetA": [...], ...}
- path to .gmt
- enrichr library name: "MSigDB_Hallmark_2020"
- convenience aliases: "hallmark", "hallmarks", "c2", "curated"
- list of the above (mix allowed)
"""
# dict passthrough
if isinstance(gene_sets, Mapping):
return {str(k): [str(x) for x in v] for k, v in gene_sets.items()}
# turn comma-separated string into list
if isinstance(gene_sets, str) and ("," in gene_sets):
gene_sets_list = [x.strip() for x in gene_sets.split(",") if x.strip()]
return _normalize_gene_sets_arg(gene_sets_list)
def _alias_one(x: str) -> str:
xl = x.strip().lower()
# Convenience aliases → real Enrichr library names (common)
# If your Enrichr instance has different years, use list_enrichr_libraries()
if xl in {"hallmark", "hallmarks"}:
return "MSigDB_Hallmark_2020"
if xl in {"c2", "curated", "c2_curated"}:
return "MSigDB_C2_Curated_2021"
return x
# list/tuple
if isinstance(gene_sets, (list, tuple)):
out: list[str] = []
for x in gene_sets:
if not isinstance(x, str):
raise TypeError("gene_sets list must contain strings (library names or .gmt paths).")
out.append(_alias_one(x))
return out
# plain string
if isinstance(gene_sets, str):
return _alias_one(gene_sets)
raise TypeError("gene_sets must be a str, list[str], or dict[str, list[str]].")
def _gseapy_result_to_table(pre_res: Any) -> pd.DataFrame:
"""
Convert gseapy Prerank result object to a tidy pandas DataFrame.
"""
# gseapy stores results in pre_res.res2d (DataFrame)
if not hasattr(pre_res, "res2d"):
raise ValueError("Unexpected gseapy result object: missing attribute 'res2d'.")
df = pre_res.res2d.copy()
# make column names consistent and flat
df.columns = [str(c) for c in df.columns]
df.index = df.index.astype(str)
df = df.reset_index().rename(columns={"index": "term"})
# common columns you may want:
# term, es, nes, pval, fdr, ... (depends on gseapy version)
return df
[docs]
def gsea_preranked(
adata: ad.AnnData | None = None,
*,
res: pd.DataFrame,
gene_sets: str | Sequence[str] | Mapping[str, Sequence[str]] = "MSigDB_Hallmark_2020",
comparison: str | None = None,
store_key: str = "gsea",
results_key: str = "results",
gene_col: str = "gene",
score_col: str = "t",
ascending: bool = False,
min_size: int = 5,
max_size: int = 1000,
permutation_num: int = 1000,
seed: int = 6,
processes: int = 4,
outdir: str | Path = "gsea",
save_rnk: bool = True,
return_pre_res: bool = True,
add_comparison_column: bool = True,
) -> pd.DataFrame | tuple[pd.DataFrame, Any]:
"""
Run GSEApy prerank and return a tidy results table.
Key improvements
----------------
1) Writes to outdir/<comparison>/ (folder created with parents=True)
2) Adds a 'comparison' column to the returned results
3) gene_sets can be:
- Enrichr library name, e.g. "MSigDB_Hallmark_2020"
- .gmt path
- dict of gene sets
- convenience aliases: "hallmark(s)", "c2"/"curated"
- list of libraries/paths (mix allowed)
4) Optionally returns the raw gseapy 'pre_res' object so you can plot later.
Notes
-----
- The returned 'pre_res' is NOT stored in adata.uns by default, because it is
not h5ad-serializable. Store tables only.
"""
_ensure_gseapy()
if res is None or not isinstance(res, pd.DataFrame):
raise TypeError("res must be a pandas DataFrame (your DE table).")
if gene_col not in res.columns:
raise KeyError(f"gene_col='{gene_col}' not found in res.columns.")
if score_col not in res.columns:
raise KeyError(f"score_col='{score_col}' not found in res.columns.")
# infer comparison if missing (best-effort)
if comparison is None:
comparison = "comparison"
# build output folder: outdir/comparison/
outdir = Path(outdir) / str(comparison)
outdir.mkdir(parents=True, exist_ok=True)
# build rank table
rnk = res[[gene_col, score_col]].copy()
rnk = rnk.dropna(subset=[gene_col, score_col])
rnk[gene_col] = rnk[gene_col].astype(str).str.upper()
rnk[score_col] = pd.to_numeric(rnk[score_col], errors="coerce")
rnk = rnk.dropna(subset=[score_col])
# handle duplicates: keep best score by absolute value (common)
if rnk[gene_col].duplicated().any():
frac_dup = float(rnk[gene_col].duplicated().mean()) * 100.0
warn(f"Duplicated genes found in preranked stats: {frac_dup:.2f}% of genes. Keeping max |score| per gene.")
rnk = (
rnk.assign(_abs=np.abs(rnk[score_col].to_numpy()))
.sort_values("_abs", ascending=False)
.drop_duplicates(subset=[gene_col], keep="first")
.drop(columns="_abs")
)
# sort for prerank
rnk = rnk.sort_values(score_col, ascending=bool(ascending))
# optionally save .rnk
if save_rnk:
rnk_path = outdir / f"{comparison}.rnk.tsv"
rnk.to_csv(rnk_path, sep="\t", index=False)
# resolve gene_sets aliases / list handling
gs = _normalize_gene_sets_arg(gene_sets)
info(f"Running GSEApy prerank: comparison={comparison} n_genes={len(rnk)} outdir={outdir}")
# Run prerank (no_plot=True keeps it light; you can plot later from pre_res)
pre_res = gp.prerank(
rnk=rnk,
gene_sets=gs,
min_size=int(min_size),
max_size=int(max_size),
permutation_num=int(permutation_num),
seed=int(seed),
processes=int(processes),
outdir=str(outdir),
format="png",
no_plot=True,
)
df = _gseapy_result_to_table(pre_res)
if add_comparison_column:
df.insert(0, "comparison", str(comparison))
# store table in adata.uns (serializable)
if adata is not None:
adata.uns.setdefault(store_key, {})
adata.uns[store_key].setdefault(str(comparison), {})
adata.uns[store_key][str(comparison)][results_key] = df
# helpful provenance
adata.uns[store_key][str(comparison)]["params"] = {
"gene_sets": gs,
"gene_col": gene_col,
"score_col": score_col,
"ascending": bool(ascending),
"min_size": int(min_size),
"max_size": int(max_size),
"permutation_num": int(permutation_num),
"seed": int(seed),
"processes": int(processes),
"outdir": str(outdir),
"save_rnk": bool(save_rnk),
}
return (df, pre_res) if return_pre_res else df