Source code for glow.wgr.logistic_ridge_regression

# Copyright 2019 The Glow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .logistic_udfs import *
from .ridge_reduction import RidgeReduction
from .model_functions import _prepare_labels_and_warn, _prepare_covariates, _check_model, _check_cv, _is_binary
from nptyping import Float, NDArray
import pandas as pd
from pyspark.sql import DataFrame
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as f
from typeguard import typechecked
from typing import Any, Dict, List
import warnings
from glow.logging import record_hls_event

__all__ = ['LogisticRidgeRegression']


@typechecked
class LogisticRidgeRegression:
    """
    The LogisticRidgeRegression class is used to fit logistic ridge regression models against one
    or more labels optimized over a provided list of ridge alpha parameters.  The optimal ridge
    alpha value is chosen for each label by minimizing the average out of fold log_loss scores.
    """
    def __init__(self,
                 reduced_block_df: DataFrame,
                 label_df: pd.DataFrame,
                 sample_blocks: Dict[str, List[str]],
                 cov_df: pd.DataFrame = pd.DataFrame({}),
                 add_intercept: bool = True,
                 alphas: List[float] = []) -> None:
        """
        Args:
            reduced_block_df : Spark DataFrame representing the reduced block matrix generated by
                RidgeReduction
            label_df : Pandas DataFrame containing the target labels used in fitting the ridge
                models
            sample_blocks : Dict containing a mapping of sample_block ID to a list of
                corresponding sample IDs
            cov_df : Pandas DataFrame containing covariates to be included in every model in the
                stacking ensemble (optional).
            add_intercept : If True, an intercept column (all ones) will be added to the
                covariates (as the first column)
            alphas : array_like of alpha values used in the ridge regression (optional).
        """
        self.reduced_block_df = reduced_block_df
        self.sample_blocks = sample_blocks
        # Order matters: set_cov_df reads self._label_df and set_alphas reads
        # self.reduced_block_df.
        self.set_label_df(label_df)
        self.set_cov_df(cov_df, add_intercept)
        self.set_alphas(alphas)
        self.model_df = None
        self.cv_df = None
        self.y_hat_df = None

    @classmethod
    def from_ridge_reduction(cls, ridge_reduced: RidgeReduction, alphas: List[float] = []):
        """
        Initializes an instance of LogisticRidgeRegression using a RidgeReduction object

        Args:
            ridge_reduced : A RidgeReduction instance based on which the LogisticRidgeRegression
                instance must be made
            alphas : array_like of alpha values used in the logistic ridge regression (optional).

        Returns:
            A new instance whose block matrix, sample blocks, labels and covariates are taken
            from ``ridge_reduced``.
        """
        # __init__ is bypassed on purpose: the label and covariate attributes are copied from
        # the RidgeReduction object instead of going through set_label_df / set_cov_df again.
        obj = cls.__new__(cls)
        obj.reduced_block_df = ridge_reduced.reduced_block_df
        obj.sample_blocks = ridge_reduced.sample_blocks
        obj._label_df = ridge_reduced.get_label_df()
        obj._cov_df = ridge_reduced.get_cov_df()
        obj._std_cov_df = ridge_reduced._std_cov_df
        obj.set_alphas(alphas)
        obj.model_df = None
        obj.cv_df = None
        obj.y_hat_df = None
        return obj

    def __getstate__(self):
        """Returns a copy of the instance state without the Spark DataFrame attributes."""
        # Copy the object's state from self.__dict__ so the live instance is left untouched.
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        for spark_attr in ('reduced_block_df', 'model_df', 'cv_df'):
            del state[spark_attr]
        return state

    def set_label_df(self, label_df: pd.DataFrame) -> None:
        """
        Runs _prepare_labels_and_warn on the labels (passing whether they are binary) and stores
        them.

        Args:
            label_df : Pandas DataFrame containing the target labels
        """
        _prepare_labels_and_warn(label_df, _is_binary(label_df), 'binary')
        self._label_df = label_df

    def get_label_df(self) -> pd.DataFrame:
        """Returns the stored label DataFrame."""
        return self._label_df

    def set_cov_df(self, cov_df: pd.DataFrame, add_intercept: bool) -> None:
        """
        Stores the covariates as given, together with the result of _prepare_covariates applied
        to them (kept in ``_std_cov_df``). Requires the labels to be set first.

        Args:
            cov_df : Pandas DataFrame containing covariates
            add_intercept : Forwarded to _prepare_covariates
        """
        self._cov_df = cov_df
        self._std_cov_df = _prepare_covariates(cov_df, self._label_df, add_intercept)

    def get_cov_df(self) -> pd.DataFrame:
        """Returns the covariate DataFrame as it was supplied."""
        return self._cov_df

    def set_alphas(self, alphas: List[float]) -> None:
        """
        Stores the alpha dictionary. When ``alphas`` is empty the values are generated from the
        reduced block matrix; otherwise they are built from the supplied list.

        Args:
            alphas : List of alpha values (may be empty)
        """
        if len(alphas) == 0:
            self._alphas = generate_alphas(self.reduced_block_df)
        else:
            self._alphas = create_alpha_dict(alphas)

    def get_alphas(self) -> Dict[str, Float]:
        """Returns the stored alpha dictionary."""
        return self._alphas

    def _cache_model_cv_df(self) -> None:
        """Checks the model and CV DataFrames, then caches both."""
        _check_model(self.model_df)
        _check_cv(self.cv_df)
        self.model_df.cache()
        self.cv_df.cache()

    def _unpersist_model_cv_df(self) -> None:
        """Checks the model and CV DataFrames, then unpersists both."""
        _check_model(self.model_df)
        _check_cv(self.cv_df)
        self.model_df.unpersist()
        self.cv_df.unpersist()

    def fit(self) -> (DataFrame, DataFrame):
        """
        Fits a logistic regression model, represented by a Spark DataFrame containing
        coefficients for each of the ridge alpha parameters, for each block in the reduced block
        matrix, for each label in the target labels, as well as a Spark DataFrame containing the
        optimal ridge alpha value for each label.

        Returns:
            Two Spark DataFrames, one containing the model resulting from the fitting routine and
            one containing the results of the cross validation procedure.
        """
        map_key_pattern = ['sample_block', 'label', 'alpha_name']
        reduce_key_pattern = ['header_block', 'header', 'label', 'alpha_name']
        model_key_pattern = ['sample_block', 'label', 'alpha_name']
        score_key_pattern = ['sample_block', 'label']
        metric = 'log_loss'

        # Boolean mask with the same shape as the labels: True where a label value is present,
        # False where it is NaN.
        maskdf = pd.DataFrame(data=np.where(np.isnan(self._label_df), False, True),
                              columns=self._label_df.columns,
                              index=self._label_df.index)

        # Per-label fit on the covariates alone; the resulting coefficients are handed to the
        # map step below. With no covariates an empty array is stored for every label.
        beta_cov_dict = {}
        for label in self._label_df:
            if self._std_cov_df.empty:
                beta_cov_dict[label] = np.array([])
            else:
                row_mask = slice_label_rows(maskdf, label, list(self._label_df.index),
                                            np.array([])).ravel()
                cov_mat = slice_label_rows(self._std_cov_df, 'all', list(self._label_df.index),
                                           row_mask)
                y = slice_label_rows(self._label_df, label, list(self._label_df.index),
                                     row_mask).ravel()
                fit_result = constrained_logistic_fit(cov_mat,
                                                      y,
                                                      np.zeros(cov_mat.shape[1]),
                                                      guess=np.array([]),
                                                      n_cov=0)
                beta_cov_dict[label] = fit_result.x

        map_udf = pandas_udf(
            lambda key, pdf: map_irls_eqn(key, map_key_pattern, pdf, self._label_df,
                                          self.sample_blocks, self._std_cov_df, beta_cov_dict,
                                          maskdf, self._alphas),
            irls_eqn_struct, PandasUDFType.GROUPED_MAP)
        reduce_udf = pandas_udf(lambda key, pdf: reduce_irls_eqn(key, reduce_key_pattern, pdf),
                                irls_eqn_struct, PandasUDFType.GROUPED_MAP)
        model_udf = pandas_udf(
            lambda key, pdf: solve_irls_eqn(key, model_key_pattern, pdf, self._label_df,
                                            self._alphas, self._std_cov_df),
            model_struct, PandasUDFType.GROUPED_MAP)
        score_udf = pandas_udf(
            lambda key, pdf: score_models(key, score_key_pattern, pdf, self._label_df,
                                          self.sample_blocks, self._alphas, self._std_cov_df,
                                          maskdf, metric),
            cv_struct, PandasUDFType.GROUPED_MAP)

        # One copy of every block row per alpha name, then map -> reduce -> solve. The trailing
        # steps regroup the per-alpha rows so that each output row carries sorted, parallel
        # arrays of alphas, labels and coefficients.
        alpha_names = f.array([f.lit(n) for n in self._alphas.keys()])
        self.model_df = (
            self.reduced_block_df.drop('alpha')
            .withColumn('alpha_name', f.explode(alpha_names))
            .groupBy(map_key_pattern)
            .apply(map_udf)
            .groupBy(reduce_key_pattern)
            .apply(reduce_udf)
            .groupBy(model_key_pattern)
            .apply(model_udf)
            .withColumn(
                'alpha_label_coef',
                f.expr('struct(alphas[0] AS alpha, labels[0] AS label, coefficients[0] AS coefficient)'))
            .withColumn('alpha_label_coef_label', f.expr('labels[0]'))
            .groupBy('header_block', 'sample_block', 'header', 'sort_key',
                     'alpha_label_coef_label')
            .agg(f.sort_array(f.collect_list('alpha_label_coef')).alias('alphas_labels_coefs'))
            .selectExpr('*', 'alphas_labels_coefs.alpha AS alphas',
                        'alphas_labels_coefs.label AS labels',
                        'alphas_labels_coefs.coefficient AS coefficients')
            .drop('alphas_labels_coefs', 'label'))

        self.cv_df = cross_validation(self.reduced_block_df, self.model_df, score_udf,
                                      score_key_pattern, self._alphas, metric)

        record_hls_event('wgrLogisticRegressionFit')

        return self.model_df, self.cv_df

    def reduce_block_matrix(self, response: str) -> DataFrame:
        """
        Transforms a starting reduced block matrix by applying a linear model.  The form of the
        output can either be a direct linear transformation (response = "linear") or a linear
        transformation followed by a sigmoid transformation (response = "sigmoid").

        Args:
            response : String specifying what transformation to apply ("linear" or "sigmoid")

        Returns:
            Spark DataFrame containing the result of the transformation.

        Raises:
            ValueError: If ``response`` is neither "linear" nor "sigmoid".
        """
        transform_key_pattern = ['sample_block', 'label']

        if response == 'linear':
            warnings.warn('Ignoring any covariates for linear response')
            # An empty covariate frame is passed instead of self._std_cov_df, matching the
            # warning above.
            transform_udf = pandas_udf(
                lambda key, pdf: apply_model(key, transform_key_pattern, pdf, self._label_df,
                                             self.sample_blocks, self._alphas, pd.DataFrame({})),
                reduced_matrix_struct, PandasUDFType.GROUPED_MAP)
            join_type = 'inner'
        elif response == 'sigmoid':
            transform_udf = pandas_udf(
                lambda key, pdf: apply_logistic_model(key, transform_key_pattern, pdf,
                                                      self._label_df, self.sample_blocks,
                                                      self._alphas, self._std_cov_df),
                logistic_reduced_matrix_struct, PandasUDFType.GROUPED_MAP)
            join_type = 'right'
        else:
            raise ValueError(f'response must be either "linear" or "sigmoid", received "{response}"')

        return apply_model_df(self.reduced_block_df, self.model_df, self.cv_df, transform_udf,
                              transform_key_pattern, join_type)

    def transform(self, response: str = 'linear') -> pd.DataFrame:
        """
        Generates GWAS covariates for the target labels in the provided label DataFrame by
        applying the model resulting from the LogisticRidgeRegression fit method to the starting
        reduced block matrix.

        Args:
            response : String specifying the desired output.  Can be 'linear' to specify the
                direct output of the linear WGR model (default) or 'sigmoid' to specify predicted
                label probabilities.

        Returns:
            Pandas DataFrame containing covariate values. The shape and order match label_df such
            that the rows are indexed by sample ID and the columns by label. The column types are
            float64.
        """
        _check_model(self.model_df)
        _check_cv(self.cv_df)

        block_prediction_df = self.reduce_block_matrix(response)
        self.y_hat_df = flatten_prediction_df(block_prediction_df, self.sample_blocks,
                                              self._label_df)

        record_hls_event('wgrLogisticRegressionTransform')

        return self.y_hat_df

    def transform_loco(self, response: str = 'linear', chromosomes: List[str] = []) -> pd.DataFrame:
        """
        Generates predictions for the target labels in the provided label DataFrame by applying
        the model resulting from the LogisticRidgeRegression fit method to the starting reduced
        block matrix using a leave-one-chromosome-out (LOCO) approach (this method caches the
        model and cross-validation DataFrames in the process for better performance).

        Args:
            response : String specifying the desired output.  Can be 'linear' to specify the
                direct output of the linear WGR model (default) or 'sigmoid' to specify predicted
                label probabilities.
            chromosomes : List of chromosomes for which to generate a prediction (optional). If
                not provided, the chromosomes will be inferred from the block matrix. The list is
                not modified.

        Returns:
            Pandas DataFrame containing prediction y_hat values per chromosome. The rows are
            indexed by sample ID and chromosome; the columns are indexed by label. The column
            types are float64. The DataFrame is sorted using chromosome as the primary sort key,
            and sample ID as the secondary sort key.
        """
        # sorted() makes a new list, so a caller-supplied `chromosomes` is left untouched.
        loco_chromosomes = sorted(
            chromosomes if chromosomes else infer_chromosomes(self.reduced_block_df))

        # Cache model and CV DataFrames to avoid re-computing for each chromosome
        self._cache_model_cv_df()

        orig_model_df = self.model_df
        per_chromosome_frames = []
        try:
            for chromosome in loco_chromosomes:
                print(f"Generating predictions for chromosome {chromosome}.")
                # transform() reads self.model_df, so temporarily swap in the model with this
                # chromosome's headers filtered out.
                self.model_df = orig_model_df.filter(
                    ~f.col('header').rlike(f'^chr_{chromosome}_(alpha|block)'))
                loco_y_hat_df = self.transform(response)
                loco_y_hat_df['contigName'] = chromosome
                per_chromosome_frames.append(loco_y_hat_df)
        finally:
            # Always put the full model back and release the cache, even when a per-chromosome
            # transform fails, so the instance is not left holding a filtered model.
            self.model_df = orig_model_df
            self._unpersist_model_cv_df()

        # A single concat replaces repeated DataFrame.append calls (removed in pandas 2.0).
        # NOTE(review): concat keeps the row index name when every per-chromosome frame agrees
        # on it, whereas appending onto an unnamed empty frame could drop it - confirm no caller
        # relies on an unnamed first index level.
        y_hat_df = pd.concat(per_chromosome_frames) if per_chromosome_frames else pd.DataFrame({})

        self.y_hat_df = y_hat_df.set_index('contigName', append=True)
        return self.y_hat_df

    def fit_transform(self, response: str = 'linear') -> pd.DataFrame:
        """
        Fits a logistic ridge regression model, then transforms the matrix using the model.

        Args:
            response : String specifying the desired output.  Can be 'linear' to specify the
                direct output of the linear WGR model (default) or 'sigmoid' to specify predicted
                label probabilities.

        Returns:
            Pandas DataFrame containing prediction y_hat values. The shape and order match
            labeldf such that the rows are indexed by sample ID and the columns by label. The
            column types are float64.
        """
        self.fit()
        return self.transform(response)

    def fit_transform_loco(self,
                           response: str = 'linear',
                           chromosomes: List[str] = []) -> pd.DataFrame:
        """
        Fits a logistic ridge regression model with a block matrix, then generates predictions
        for the target labels in the provided label DataFrame by applying the model resulting
        from the LogisticRidgeRegression fit method to the starting reduced block matrix using a
        leave-one-chromosome-out (LOCO) approach (this method caches the model and
        cross-validation DataFrames in the process for better performance).

        Args:
            response : String specifying the desired output.  Can be 'linear' to specify the
                direct output of the linear WGR model (default) or 'sigmoid' to specify predicted
                label probabilities.
            chromosomes : List of chromosomes for which to generate a prediction (optional). If
                not provided, the chromosomes will be inferred from the block matrix.

        Returns:
            Pandas DataFrame containing prediction y_hat values per chromosome. The rows are
            indexed by sample ID and chromosome; the columns are indexed by label. The column
            types are float64. The DataFrame is sorted using chromosome as the primary sort key,
            and sample ID as the secondary sort key.
        """
        self.fit()
        return self.transform_loco(response, chromosomes)