Source code for esda.moran_local_mv

import numpy as np
from .moran import Moran_Local
from .significance import calculate_significance
from libpysal.weights import lag_spatial
from libpysal.graph import Graph

try:
    from tqdm.auto import tqdm
except ImportError:

    def tqdm(x, **kwargs):
        return x


def _calc_quad(x,y):
    """
    This is a simpler solution to calculate a cartesian quadrant. 

    To explain graphically, let the tuple below be (off_sign[i], neg_y[i]*2). 

    If sign(x[i]) != sign(y[i]), we are on the negative diagonal. 
    If y is negative, we are on the bottom of the plot. 

    Therefore, the sum (off_sign + neg_y*2 + 1) gives you the cartesian quadrant. 

     II  |   I
     1,0 | 0,0
    -----+-----
     0,2 | 1,2
     III |  IV

    """
    off_sign = np.sign(x) != np.sign(y)
    neg_y = (y<0)
    return off_sign + neg_y*2 + 1

[docs] class MoranLocalPartial(object):
[docs] def __init__( self, permutations=999, unit_scale=True, partial_labels=True, alternative='two-sided' ): """ Compute the Multivariable Local Moran statistics under partial dependence :cite:`wolf2024confounded` Parameters ---------- permutations : int the number of permutations to run for the inference, driven by conditional randomization. unit_scale : bool whether to enforce unit variance in the local statistics. This normalizes the variance of the data at inupt, ensuring that the covariance statistics are not overwhelmed by any single covariate's large variance. partial_labels : bool, default=True whether to calculate the classification based on the part-regressive quadrant classification or the univariate quadrant classification, like a classical Moran's I. When mvquads is True, the variables are labelled as: - label 1: observations with large y - rho * x that also have large Wy values. - label 2: observations with small y - rho * x values that also have large Wy values. - label 3: observations with small y - rho * x values that also have small Wy values. - label 4: observations with large y - rho * x values that have small Wy values. alternative : str (default: 'two-sided') the alternative hypothesis for the inference. One of 'two-sided', 'greater', 'lesser', 'directed', or 'folded'. See the esda.significance.calculate_significance() documentation for more information. Attributes ---------- connectivity : W The weights matrix inputted, but row standardized D : array The "design" matrix used in computation. If X is not None, this will be [1 y X] R : array The "response" matrix used in computation. Will always be the same shape as D and contain [1, Wy, Wy, ....] DtDi : array empirical parameter covariance matrix the P x P matrix describing the variance and covariance of y and X. P : int the number of parameters. 1 if X is not provided. ``association_`` : array the N,P matrix of multivariable LISA statistics. the first column, lmos[:,1] is the LISAs corresponding to the relationship between Wy and y conditioning on X. ``reference_distribution_`` : array the (N, permutations, P+1) realizations from the conditional randomization to generate reference distributions for each Local Moran statistic. rlmos_[:,:,1] pertain to the reference distribution of y and Wy. ``significance_`` : array the (N, P) matrix of quadrant classifications for the part-regressive relationships. quads[:,0] pertains to the relationship between y and Wy. The mean is not classified, since it's just binary above/below mean usually. ``partials_`` : array the (N,2,P+1) matrix of part-regressive contributions. The ith slice of ``partials_[:,:,i]`` contains the partial regressive contribution of that covariate, with the first column indicating the part-regressive outcome and the second indicating the part-regressive design. The partial regression matrix starts at zero, so ``partials_``[:,:,0] corresponds to the partial regression describing the relationship between y and Wy. ``labels_`` : array the (N,) array of quadrant classifications for the part-regressive relationships. See the partial_labels argument for more information. """ self.permutations = permutations self.unit_scale = unit_scale self.partial_labels = partial_labels self.alternative = alternative
[docs] def fit(self, X, y, W): """ Fit the partial local Moran statistic on input data Parameters ---------- X : (N,p) array array of data that is used as confounding factors to account for their covariance with Y. y : (N,1) array array of data that is the targeted outcome covariate to compute the multivariable Moran's I W : (N,N) weights object spatial weights instance as W or Graph aligned with y. Immediately row-standardized. Returns ------- self : object this MoranLocalPartial() statistic after fitting to data """ y = np.asarray(y).reshape(-1, 1) if isinstance(W, Graph): W = W.transform("R") else: W.transform = "r" y = y - y.mean() if self.unit_scale: y /= y.std() X = X - X.mean(axis=0) if self.unit_scale: X = X / X.std(axis=0) self.y = y self.X = X D, R = self._make_data(y, X, W) self.D, self.R = D, R self.P = D.shape[1] - 1 self.N = W.n self.DtDi = np.linalg.inv( self.D.T @ self.D ) # this is only PxP, so not too bad... self._left_component_ = (self.D @ self.DtDi) * (self.N - 1) self._lmos_ = self._left_component_ * self.R self.connectivity = W self.permutations = self.permutations if self.permutations is not None: # NOQA necessary to avoid None > 0 if self.permutations > 0: self._crand(y, X, W) self._rlmos_ *= self.N - 1 self._p_sim_ = np.zeros((self.N, self.P + 1)) for i in range(self.P + 1): self._p_sim_[:,i] = calculate_significance( self._lmos_[:,i], self._rlmos_[:,:,i], alternative=self.alternative ) component_quads = [] for i, left in enumerate(self._left_component_.T): right = self.R[:, i] quads = _calc_quad(left - left.mean(), right) component_quads.append(quads) self._partials_ = np.asarray( [ np.vstack((left, right)).T for left, right in zip(self._left_component_.T, self.R.T) ] ) uvquads = [] negative_lag = R[:,1] < 0 for i, x_ in enumerate(self.D.T): if i == 0: continue off_sign = np.sign(x_) != np.sign(R[:,1]) quads = negative_lag.astype(int).flatten() * 2 + off_sign.astype(int) + 1 uvquads.append(quads.flatten()) self._uvquads_ = np.row_stack(uvquads).T self._mvquads_ = np.row_stack(component_quads).T return self
def _make_data(self, z, X, W): if isinstance(W, Graph): # NOQA because ternary is confusing Wz = W.lag(z) else: Wz = lag_spatial(W, z) if X is not None: D = np.hstack((np.ones(z.shape), z, X)) P = X.shape[1] + 1 else: D = np.hstack((np.ones(z.shape), z)) P = 1 R = np.tile(Wz, P + 1) return D, R # self.D, self.R = D, R def _crand(self, y, X, W): N = W.n N_permutations = self.permutations prange = range(N_permutations) if isinstance(W, Graph): max_neighbs = W.cardinalities.max() + 1 else: max_neighbs = W.max_neighbors + 1 pre_permutations = np.array( [np.random.permutation(N - 1)[0:max_neighbs] for i in prange] ) straight_ids = np.arange(N) if isinstance(W, Graph): # NOQA id_order = W.unique_ids else: id_order = W.id_order DtDi = self.DtDi ordered_weights = [W.weights[id_order[i]] for i in straight_ids] ordered_cardinalities = [W.cardinalities[id_order[i]] for i in straight_ids] lmos = np.empty((N, N_permutations, self.P + 1)) for i in tqdm(range(N), desc="Simulating by site"): ids_noti = straight_ids[straight_ids != i] np.random.shuffle(ids_noti) these_permutations = pre_permutations[:, 0 : ordered_cardinalities[i]] randomized_permutations = ids_noti[these_permutations] shuffled_ys = y[randomized_permutations] these_weights = np.asarray(ordered_weights[i]).reshape(-1, 1) shuffled_Wyi = (shuffled_ys * these_weights).sum( axis=1 ) # these are N-permutations by 1 now # shuffled_X = X[randomized_permutations, :] # #these are still N-permutations, N-neighbs, N-covariates if X is None: local_data = np.array((1, y[i].item())).reshape(1, -1) shuffled_D = np.tile( local_data, N_permutations ).T # now N permutations by P else: local_data = np.array((1, y[i].item(), *X[i])).reshape(-1, 1) shuffled_D = np.tile( local_data, N_permutations ).T # now N permutations by P shuffled_R = np.tile(shuffled_Wyi, self.P + 1) lmos[i] = (shuffled_R * shuffled_D) @ DtDi self._rlmos_ = lmos # nobs, nperm, nvars @property def association_(self): """ The association between y and the local average of y, removing the correlation due to x and the local average of y """ return self._lmos_[:, 1] @property def significance_(self): """ The pseudo-p-value built using map randomization for the structural relationship between y and its local average, removing the correlation due to the relationship between x and the local average of y. """ return self._p_sim_[:, 1] @property def partials_(self): """ The components of the local statistic. The first column is the structural exogenous component of the data, and the second is the local average of y. """ return self._partials_[1] @property def reference_distribution_(self): """ Simulated distribution of ``association_``, assuming that there is - no structural relationship between y and its local average; - the same observed structural relationship between y and x. """ return self._rlmos_[:, :, 1] @property def labels_(self): """ The classifications (in terms of cluster-type and outlier-type) for the ``association_`` statistics. If the quads requested are *mvquads*, then the classification is done with respect to the left and right components (first and second columns of ``partials_``). If the quads requested are *uvquads*, then this will only be computed with respect to the outcome and the local average. The cluster typology is: - 1: above-average left and right components - 2: below-average left, above-average right component - 3: below-average left and right components - 4: above-average left, below-average right component """ if self.partial_labels: return self._mvquads_[:, 1] else: return self._uvquads_[:, 1]
[docs] class MoranLocalConditional(Moran_Local): """ Fit a local moran statistic for y after regressing out the effects of confounding X on y. A "stronger" version of the MoranLocalPartial statistic, as defined by :cite:`wolf2024confounded`. """
[docs] def __init__( self, permutations=999, unit_scale=True, transformer=None, alternative='two-sided' ): """ Initialize a local Moran statistic on the regression residuals Parameters --------- permutations : int (default: 999) the number of permutations to run for the inference, driven by conditional randomization. unit_scale : bool (default: True) whether or not to convert the input data to a unit normal scale. transformer : callable (default: scikit regression) should transform X into a predicted y. If not provided, will use the standard scikit OLS regression of y on X. alternative : str (default: 'two-sided') the alternative hypothesis for the inference. One of 'two-sided', 'greater', 'lesser', 'directed', or 'folded'. See the esda.significance.calculate_significance() documentation for more information. Attributes ---------- connectivity : W The weights matrix inputted, but row standardized y_filtered_ : array the (N,1) array of y after removing the effect of X ``association_`` : array the N,P matrix of multivariable LISA statistics. ``reference_distribution_`` : array the (N, permutations, P+1) realizations from the conditional randomization to generate reference distributions. ``significance_`` : array the (N, P) matrix of quadrant classifications. ``partials_`` : array the (N,2,P+1) matrix of part-regressive contributions. The ith slice of ``partials_[:,:,i]`` contains the partial regressive contribution of that covariate. ``labels_`` : array the (N,) array of quadrant classifications. """ self.permutations = permutations self.unit_scale = unit_scale self.transformer = transformer self.alternative = alternative
[docs] def fit(self, X, y, W): """ Parameters ---------- y : (N,1) array array of data that is the targeted "outcome" covariate to compute the multivariable Moran's I X : (N,3) array array of data that is used as "confounding factors" to account for their covariance with Y. W : (N,N) weights object spatial weights instance as W or Graph aligned with y. Immediately row-standardized. Returns ------- A fitted MoranLocalConditional() estimator """ y = y - y.mean() X = X - X.mean(axis=0) if self.unit_scale: y /= y.std() X /= X.std(axis=0) self.y = y self.X = X y_filtered_ = self.y_filtered_ = self._part_regress_transform(y, X) if isinstance(W, Graph): W = W.transform("R") Wyf = W.lag(y_filtered_) else: W.transform = "r" Wyf = lag_spatial(W, y_filtered_) # TODO: graph self.connectivity = W self.partials_ = np.column_stack((y_filtered_, Wyf)) y_out = self.y_filtered_ self.association_ = ((y_out * Wyf) / (y_out.T @ y_out) * (W.n - 1)).flatten() if self.permutations > 0: self._crand() self.significance_ = calculate_significance(self.association_, self.reference_distribution_, alternative=self.alternative) quads = np.array([[3,2,4,1]]).reshape(2,2) left_component_cluster = (y_filtered_ > 0).astype(int) right_component_cluster = (Wyf > 0).astype(int) quads = quads[left_component_cluster, right_component_cluster] self.labels_ = quads.squeeze() return self
def _part_regress_transform(self, y, X): """If the object has a _transformer, use it; otherwise, fit it.""" if hasattr(self, "_transformer"): ypart = y - self._transformer(X) else: from sklearn.linear_model import LinearRegression self._transformer = LinearRegression().fit(X, y).predict ypart = self._part_regress_transform(y, X) return ypart def _crand(self): """ Cribbed from esda.Moran_Local conditional randomization for observation i with ni neighbors, the candidate set cannot include i (we don't want i being a neighbor of i). we have to sample without replacement from a set of ids that doesn't include i. numpy doesn't directly support sampling wo replacement and it is expensive to implement this. instead we omit i from the original ids, permute the ids and take the first ni elements of the permuted ids as the neighbors to i in each randomization. """ _, z = self.partials_.T lisas = np.zeros((self.connectivity.n, self.permutations)) n_1 = self.connectivity.n - 1 prange = list(range(self.permutations)) if isinstance(self.connectivity, Graph): k = self.connectivity.cardinalities.max() + 1 else: k = self.connectivity.max_neighbors + 1 nn = self.connectivity.n - 1 rids = np.array([np.random.permutation(nn)[0:k] for i in prange]) ids = np.arange(self.connectivity.n) if hasattr(self.connectivity, "id_order"): ido = self.connectivity.id_order else: ido = self.connectivity.unique_ids.values w = [self.connectivity.weights[ido[i]] for i in ids] wc = [self.connectivity.cardinalities[ido[i]] for i in ids] for i in tqdm(range(self.connectivity.n), desc="Simulating by site"): idsi = ids[ids != i] np.random.shuffle(idsi) tmp = z[idsi[rids[:, 0 : wc[i]]]] lisas[i] = z[i] * (w[i] * tmp).sum(1) self.reference_distribution_ = (n_1 / (z * z).sum()) * lisas
MoranLocalConditional.__init__.__doc__ = MoranLocalPartial.__init__.__doc__.replace( "Partial", "Conditional" )