Source code for bullkpy.tl.neighbors
from __future__ import annotations
import numpy as np
import scipy.sparse as sp
from scipy.spatial import cKDTree
import anndata as ad
from ..logging import info, warn
[docs]
def neighbors(
adata: ad.AnnData,
*,
n_neighbors: int = 15,
n_pcs: int = 20,
use_rep: str = "X_pca",
metric: str = "euclidean", # "euclidean" or "cosine"
key_added: str = "neighbors",
) -> None:
"""
Compute kNN graph on samples (obs) using PCA representation.
Stores:
- adata.obsp["distances"] (CSR sparse)
- adata.obsp["connectivities"] (CSR sparse; Gaussian kernel)
- adata.uns[key_added] with parameters
"""
if use_rep not in adata.obsm:
raise KeyError(f"adata.obsm['{use_rep}'] not found. Run bk.tl.pca(adata) first.")
X = np.asarray(adata.obsm[use_rep], dtype=float)
if X.ndim != 2:
raise ValueError(f"{use_rep} must be 2D; got shape {X.shape}")
if n_pcs is not None:
n_pcs = int(min(n_pcs, X.shape[1]))
X = X[:, :n_pcs]
n_obs = X.shape[0]
k = int(min(n_neighbors, n_obs - 1))
info(f"Computing neighbors: k={k}, rep={use_rep}, n_pcs={n_pcs}, metric={metric}")
X_use = X
if metric == "cosine":
# cosine distance = 1 - cosine similarity
# Use L2 normalization then euclidean in that space (monotonic with cosine).
denom = np.linalg.norm(X_use, axis=1, keepdims=True)
denom[denom == 0] = 1.0
X_use = X_use / denom
# For normalized vectors, euclidean distance relates to cosine similarity.
elif metric != "euclidean":
raise ValueError("metric must be 'euclidean' or 'cosine'")
tree = cKDTree(X_use)
# query k+1 because the closest neighbor is the point itself (dist=0)
dists, idxs = tree.query(X_use, k=k + 1)
# remove self
dists = dists[:, 1:]
idxs = idxs[:, 1:]
# Build sparse distance matrix
rows = np.repeat(np.arange(n_obs), k)
cols = idxs.reshape(-1)
data = dists.reshape(-1)
distances = sp.csr_matrix((data, (rows, cols)), shape=(n_obs, n_obs))
# Symmetrize by taking min distance (or keep directed; Scanpy effectively uses symmetric connectivities)
distances = _symmetrize_min(distances)
# Convert distances -> connectivities with a local scaling kernel
# Simple, robust: sigma per node = distance to kth neighbor (after sym)
# sigma per node = distance to kth neighbor (after sym)
mx = distances.max(axis=1) # may be sparse matrix on some SciPy versions
if sp.issparse(mx):
kth = mx.toarray().ravel()
else:
kth = np.asarray(mx).ravel()
kth = kth.astype(float, copy=False)
pos = kth > 0
kth[~pos] = np.median(kth[pos]) if np.any(pos) else 1.0
# Gaussian kernel: exp(-d^2 / (2 sigma_i sigma_j))
connectivities = _gaussian_connectivities(distances, kth)
adata.obsp["distances"] = distances
adata.obsp["connectivities"] = connectivities
adata.uns[key_added] = {
"params": {
"n_neighbors": int(n_neighbors),
"n_pcs": None if n_pcs is None else int(n_pcs),
"use_rep": use_rep,
"metric": metric,
}
}
info("Stored neighbors graph in adata.obsp['distances'] and adata.obsp['connectivities'].")
def _symmetrize_min(A: sp.csr_matrix) -> sp.csr_matrix:
A = A.tocsr()
AT = A.transpose().tocsr()
# elementwise min where both present; if only one present keep that
# Use: min(A, AT) + abs(A-AT) to keep existing edges. Safer: take minimum of nonzeros via sparse ops
# Simple approach: keep the smaller where both exist by taking (A + AT)/2 on union but that changes values.
# We'll do: union with minimum by constructing both and taking elementwise minimum on dense mask is too heavy.
# Practical: take symmetric by: A_sym = A.minimum(AT) + (A - A.minimum(AT)) + (AT - A.minimum(AT))
M = A.minimum(AT)
A_sym = M + (A - M) + (AT - M)
A_sym.eliminate_zeros()
return A_sym
def _gaussian_connectivities(distances: sp.csr_matrix, sigma: np.ndarray) -> sp.csr_matrix:
D = distances.tocsr()
rows, cols = D.nonzero()
d = np.asarray(D[rows, cols]).ravel()
sig_i = sigma[rows]
sig_j = sigma[cols]
denom = 2.0 * sig_i * sig_j
denom[denom == 0] = np.median(denom[denom > 0]) if np.any(denom > 0) else 1.0
w = np.exp(-(d ** 2) / denom)
W = sp.csr_matrix((w, (rows, cols)), shape=D.shape)
# ensure symmetry
W = 0.5 * (W + W.T)
W.eliminate_zeros()
return W