import pandas as pd
import numpy as np
from nptyping import Float, NDArray, Int32
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from pyspark.sql import Column, DataFrame
from pyspark.sql.types import StringType, StructField, IntegerType
from scipy import stats
from typeguard import typechecked
from . import functions as gwas_fx
from .functions import _VALUES_COLUMN_NAME, _get_indices_to_drop
from ..wgr.wgr_functions import _get_contigs_from_loco_df

__all__ = ['linear_regression']

[docs]@typechecked def linear_regression(genotype_df: DataFrame, phenotype_df: pd.DataFrame, covariate_df: pd.DataFrame = pd.DataFrame({}), offset_df: pd.DataFrame = pd.DataFrame({}), contigs: Optional[List[str]] = None, add_intercept: bool = True, values_column: Union[str, Column] = 'values', dt: type = np.float64, verbose_output: bool = False, intersect_samples: bool = False, genotype_sample_ids: Optional[List[str]] = None) -> DataFrame: ''' Uses linear regression to test for association between genotypes and one or more phenotypes. The implementation is a distributed version of the method used in regenie: Implementation details: On the driver node, we decompose the covariate matrix into an orthonormal basis and use it to project the covariates out of the phenotype matrix. The orthonormal basis and the phenotype residuals are broadcast as part of a Pandas UDF. In each Spark task, we project the covariates out of a block of genotypes and then compute the regression statistics for each phenotype, taking into account the distinct missingness patterns of each phenotype. Examples: >>> np.random.seed(42) >>> n_samples, n_phenotypes, n_covariates = (710, 3, 3) >>> phenotype_df = pd.DataFrame(np.random.random((n_samples, n_phenotypes)), columns=['p1', 'p2', 'p3']) >>> covariate_df = pd.DataFrame(np.random.random((n_samples, n_phenotypes))) >>> genotype_df = ('vcf').load('test-data/1kg_sample.vcf') ... .select('contigName', 'start', 'genotypes')) >>> results = glow.gwas.linear_regression(genotype_df, phenotype_df, covariate_df, ... values_column=glow.genotype_states('genotypes')) >>> results.head() # doctest: +ELLIPSIS Row(contigName='1', start=904164, effect=0.0453..., stderror=0.0214..., tvalue=2.114..., pvalue=0.0348..., phenotype='p1') >>> phenotype_df = pd.DataFrame(np.random.random((n_samples, n_phenotypes)), columns=['p1', 'p2', 'p3']) >>> covariate_df = pd.DataFrame(np.random.random((n_samples, n_phenotypes))) >>> genotype_df = ('vcf').load('test-data/1kg_sample.vcf') ... .select('contigName', 'start', 'genotypes')) >>> contigs = ['1', '2', '3'] >>> offset_index = pd.MultiIndex.from_product([phenotype_df.index, contigs]) >>> offset_df = pd.DataFrame(np.random.random((n_samples * len(contigs), n_phenotypes)), ... index=offset_index, columns=phenotype_df.columns) >>> results = glow.gwas.linear_regression(genotype_df, phenotype_df, covariate_df, ... offset_df=offset_df, values_column=glow.genotype_states('genotypes')) Args: genotype_df : Spark DataFrame containing genomic data phenotype_df : Pandas DataFrame containing phenotypic data covariate_df : An optional Pandas DataFrame containing covariates offset_df : An optional Pandas DataFrame containing the phenotype offset, as output by GloWGR's RidgeRegression or Regenie step 1. The actual phenotype used for linear regression is the mean-centered, residualized and scaled ``phenotype_df`` minus the appropriate offset. The ``offset_df`` may have one or two levels of indexing. If one level, the index should be the same as the ``phenotype_df``. If two levels, the level 0 index should be the same as the ``phenotype_df``, and the level 1 index should be the contig name. The two level index scheme allows for per-contig offsets like LOCO predictions from GloWGR. contigs : When using LOCO offsets, this parameter indicates the contigs to analyze. You can use this parameter to limit the size of the broadcasted data, which may be necessary with large sample sizes. If this parameter is omitted, the contigs are inferred from the ``offset_df``. add_intercept : Whether or not to add an intercept column to the covariate DataFrame values_column : A column name or column expression to test with linear regression. If a column name is provided, ``genotype_df`` should have a column with this name and a numeric array type. If a column expression is provided, the expression should return a numeric array type. dt : The numpy datatype to use in the linear regression test. Must be ``np.float32`` or ``np.float64``. verbose_output: Whether or not to generate additional test statistics (n, sum_x, y_transpose_x) to the output DataFrame. These values are derived directly from phenotype_df and genotype_df, and does not reflect any standardization performed as part of the implementation of linear_regression. intersect_samples: The current implementation of linear regression is optimized for speed, but is not robust to high levels missing phenotype values. Without handling missingness appropriately, pvalues may become inflated due to imputation. When intersect_samples is enabled, samples that do no exist in the phenotype dataframe will be dropped from genotypes, offsets, and covariates prior to regression analysis. Note that if phenotypes in phenotypes_df contain missing values, these samples will not be automatically dropped. The user is responsible for determining their desired levels of missingness and imputation. Drop any rows with missing values from phenotype_df prior to linear_regression to prevent any imputation. If covariates are provided, covariate and phenotype samples will automatically be intersected. genotype_sample_ids: Sample ids from genotype_df. i.e. from applying glow.wgr.functions.get_sample_ids(genotype_df) or if include_sample_ids=False was used during the generation genotype_df, then using an externally managed list of sample_ids that correspond to the array of genotype calls. Returns: A Spark DataFrame that contains - All columns from ``genotype_df`` except the ``values_column`` and the ``genotypes`` column if one exists - ``effect``: The effect size estimate for the genotype - ``stderror``: The estimated standard error of the effect - ``tvalue``: The T statistic - ``pvalue``: P value estimated from a two sided T-test - ``phenotype``: The phenotype name as determined by the column names of ``phenotype_df`` - ``n``(int): (verbose_output only) number of samples with non-null phenotype - ``sum_x``(float): (verbose_output only) sum of genotype inputs - ``y_transpose_x``(float): (verbose_output only) dot product of phenotype response (missing values encoded as zeros) and genotype input, i.e. phenotype value * number of alternate alleles ''' gwas_fx._check_spark_version(genotype_df.sql_ctx.sparkSession) gwas_fx._validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary=False, intersect_samples=intersect_samples) sql_type = gwas_fx._regression_sql_type(dt) genotype_df = gwas_fx._prepare_genotype_df(genotype_df, values_column, sql_type) gt_indices_to_drop = None _covs = covariate_df if intersect_samples: if not genotype_sample_ids: raise ValueError("genotype_sample_ids required when intersect_samples enabled") _covs = covariate_df.loc[phenotype_df.index, :] gt_indices_to_drop = _get_indices_to_drop(phenotype_df, genotype_sample_ids) if not offset_df.empty: if offset_df.index.nlevels == 1: # Indexed by sample id offset_df = offset_df.reindex(phenotype_df.index) elif offset_df.index.nlevels == 2: # Indexed by sample id and contig offset_df = offset_df[offset_df.index.get_level_values(0).isin(phenotype_df.index)] C = _covs.to_numpy(dt, copy=True) if add_intercept: C = gwas_fx._add_intercept(C, phenotype_df.shape[0]) # Prepare covariate basis and phenotype residuals Q = np.linalg.qr(C)[0] Y = phenotype_df.to_numpy(dt, copy=True) Y_mask = (~np.isnan(Y)).astype(dt) Y = np.nan_to_num(Y, copy=False) Y_for_verbose_output = np.copy(Y) if verbose_output else None Y -= Y.mean(axis=0) # Mean-center Y = gwas_fx._residualize_in_place(Y, Q) * Y_mask # Residualize Y_scale = np.sqrt(np.sum(Y**2, axis=0) / (Y_mask.sum(axis=0) - Q.shape[1])) Y /= Y_scale[None, :] # Scale Y_state = _create_YState(Y, phenotype_df, offset_df, Y_mask, dt, contigs) dof = C.shape[0] - C.shape[1] - 1 return _generate_linreg_output(genotype_df, sql_type, Y_state, Y_mask, Y_scale, Q, dof, phenotype_df, Y_for_verbose_output, verbose_output, gt_indices_to_drop)
@dataclass class YState: ''' Keeps track of state that varies per contig ''' Y: NDArray[(Any, Any), Float] YdotY: NDArray[(Any), Float] def _create_YState(Y: NDArray[(Any, Any), Float], phenotype_df: pd.DataFrame, offset_df: pd.DataFrame, Y_mask: NDArray[(Any, Any), Float], dt, contigs: Optional[List[str]]) -> Union[YState, Dict[str, YState]]: offset_type = gwas_fx._validate_offset(phenotype_df, offset_df) if offset_type != gwas_fx._OffsetType.LOCO_OFFSET: return _create_one_YState(Y, phenotype_df, offset_df, Y_mask, dt) if contigs is None: contigs = _get_contigs_from_loco_df(offset_df) return { contig: _create_one_YState(Y, phenotype_df, offset_df.xs(contig, level=1), Y_mask, dt) for contig in contigs } def _create_one_YState(Y: NDArray[(Any, Any), Float], phenotype_df: pd.DataFrame, offset_df: pd.DataFrame, Y_mask: NDArray[(Any, Any), Float], dt) -> YState: if not offset_df.empty: base_Y = pd.DataFrame(Y, phenotype_df.index, phenotype_df.columns) # Reindex so the numpy array maintains ordering after subtracting offset Y = (base_Y - offset_df).reindex(phenotype_df.index).to_numpy(dt) Y *= Y_mask return YState(Y, np.sum(Y * Y, axis=0)) # @typechecked -- typeguard does not support numpy array def _linear_regression_inner(genotype_pdf: pd.DataFrame, Y_state: YState, Y_mask: NDArray[(Any, Any), Float], Y_scale: NDArray[(Any, ), Float], Q: NDArray[(Any, Any), Float], dof: int, phenotype_names: pd.Series, Y_for_verbose_output: Optional[NDArray[(Any, Any), Float]], verbose_output: Optional[bool], gt_indices_to_drop: Optional[NDArray[(Any, ), Int32]]) -> pd.DataFrame: ''' Applies a linear regression model to a block of genotypes. We first project the covariates out of the genotype block and then perform single variate linear regression for each site. To account for samples with missing traits, we additionally accept a mask indicating which samples are missing for each phenotype. This mask is used to ensure that missing samples are not included when summing across individuals. Rather than use traditional matrix indices in the einsum expressions, we use semantic indices. s: sample g: genotype p: phenotype So, if a matrix's indices are `sg` (like the X matrix), it has one row per sample and one column per genotype. ''' genotype_values = genotype_pdf[_VALUES_COLUMN_NAME].array X = np.column_stack(genotype_values) if gt_indices_to_drop is not None and gt_indices_to_drop.size: X = np.delete(X, gt_indices_to_drop, axis=0) del genotype_pdf[_VALUES_COLUMN_NAME] num_genotypes = genotype_pdf.shape[0] out_df = pd.concat([genotype_pdf] * Y_state.Y.shape[1]) if verbose_output: out_df["n"] = list(np.ravel(Y_mask.T @ np.ones(X.shape))) out_df["sum_x"] = list(np.ravel(Y_mask.T @ X)) out_df["y_transpose_x"] = list(np.ravel(Y_for_verbose_output.T @ X)) X = gwas_fx._residualize_in_place(X, Q) XdotY = Y_state.Y.T @ X XdotX_reciprocal = 1 / gwas_fx._einsum('sp,sg,sg->pg', Y_mask, X, X) betas = XdotY * XdotX_reciprocal standard_error = np.sqrt((Y_state.YdotY[:, None] * XdotX_reciprocal - betas * betas) / dof) T = betas / standard_error pvalues = 2 * stats.distributions.t.sf(np.abs(T), dof) Y_scale_mat = Y_scale[:, None] out_df['effect'] = list(np.ravel(betas * Y_scale_mat)) out_df['stderror'] = list(np.ravel(standard_error * Y_scale_mat)) out_df['tvalue'] = list(np.ravel(T)) out_df['pvalue'] = list(np.ravel(pvalues)) out_df['phenotype'] = phenotype_names.repeat(num_genotypes).tolist() return out_df def _generate_linreg_output(genotype_df, sql_type, Y_state, Y_mask, Y_scale, Q, dof, phenotype_df, Y_for_verbose_output, verbose_output, gt_indices_to_drop) -> DataFrame: # Construct output schema result_fields = [ StructField('effect', sql_type), StructField('stderror', sql_type), StructField('tvalue', sql_type), StructField('pvalue', sql_type), StructField('phenotype', StringType()) ] if verbose_output: result_fields += ([ StructField('n', IntegerType()), StructField('sum_x', sql_type), StructField('y_transpose_x', sql_type) ]) result_struct = gwas_fx._output_schema(genotype_df.schema.fields, result_fields) def map_func(pdf_iterator): for pdf in pdf_iterator: yield gwas_fx._loco_dispatch(pdf, Y_state, _linear_regression_inner, Y_mask, Y_scale, Q, dof, phenotype_df.columns.to_series().astype('str'), Y_for_verbose_output, verbose_output, gt_indices_to_drop) return genotype_df.mapInPandas(map_func, result_struct)