# Copyright 2019 The Glow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .logistic_udfs import *
from .ridge_reduction import RidgeReduction
from .model_functions import _prepare_labels_and_warn, _prepare_covariates, _check_model, _check_cv, _is_binary
from nptyping import Float, NDArray
import pandas as pd
from pyspark.sql import DataFrame
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as f
from typeguard import typechecked
from typing import Any, Dict, List
import warnings
from glow.logging import record_hls_event
# Names exported by `from <this module> import *`.
__all__ = ['LogisticRidgeRegression']
@typechecked
class LogisticRidgeRegression:
    """
    The LogisticRidgeRegression class is used to fit logistic ridge regression models against one or more labels
    optimized over a provided list of ridge alpha parameters.  The optimal ridge alpha value is chosen for each label
    by minimizing the average out of fold log_loss scores.
    """
    def __init__(self,
                 reduced_block_df: DataFrame,
                 label_df: pd.DataFrame,
                 sample_blocks: Dict[str, List[str]],
                 cov_df: pd.DataFrame = pd.DataFrame({}),
                 add_intercept: bool = True,
                 alphas: List[float] = []) -> None:
        """
        Args:
            reduced_block_df : Spark DataFrame representing the reduced block matrix generated by
                RidgeReduction
            label_df : Pandas DataFrame containing the target labels used in fitting the ridge models
            sample_blocks : Dict containing a mapping of sample_block ID to a list of corresponding sample IDs
            cov_df : Pandas DataFrame containing covariates to be included in every model in the stacking
                ensemble (optional).
            add_intercept: If True, an intercept column (all ones) will be added to the covariates
                (as the first column)
            alphas : array_like of alpha values used in the ridge regression (optional). When empty, the
                alphas are generated from reduced_block_df.
        """
        self.reduced_block_df = reduced_block_df
        self.sample_blocks = sample_blocks
        # Order matters: set_cov_df reads self._label_df and set_alphas reads
        # self.reduced_block_df, so both must already be assigned.
        self.set_label_df(label_df)
        self.set_cov_df(cov_df, add_intercept)
        self.set_alphas(alphas)
        # Populated by fit().
        self.model_df = None
        self.cv_df = None
        self.y_hat_df = None

    @classmethod
    def from_ridge_reduction(cls, ridge_reduced: RidgeReduction, alphas: List[float] = []):
        """
        Initializes an instance of LogisticRidgeRegression using a RidgeReduction object

        Args:
            ridge_reduced : A RidgeReduction instance based on which the LogisticRidgeRegression instance must be made
            alphas : array_like of alpha values used in the logistic ridge regression (optional).

        Returns:
            A new instance whose block matrix, sample blocks, labels and covariates are taken from ridge_reduced.
        """
        # Bypass __init__: the label and covariate frames are copied from the
        # reduction object as-is instead of going through set_label_df / set_cov_df.
        obj = cls.__new__(cls)
        obj.reduced_block_df = ridge_reduced.reduced_block_df
        obj.sample_blocks = ridge_reduced.sample_blocks
        obj._label_df = ridge_reduced.get_label_df()
        obj._cov_df = ridge_reduced.get_cov_df()
        obj._std_cov_df = ridge_reduced._std_cov_df
        obj.set_alphas(alphas)
        obj.model_df = None
        obj.cv_df = None
        obj.y_hat_df = None
        return obj

    def __getstate__(self):
        """
        Returns the picklable state of the object: a shallow copy of the instance dict without the
        reduced_block_df, model_df and cv_df entries.
        """
        # Copy the object's state from self.__dict__, which contains all instance attributes.
        state = self.__dict__.copy()
        # Remove the unpicklable entries. pop() with a default (rather than del) keeps this
        # from raising KeyError on an instance where one of them has not been assigned.
        for key in ('reduced_block_df', 'model_df', 'cv_df'):
            state.pop(key, None)
        return state

    def set_label_df(self, label_df: pd.DataFrame) -> None:
        """Runs the binary-label preparation/warning helper on label_df, then stores it."""
        _prepare_labels_and_warn(label_df, _is_binary(label_df), 'binary')
        self._label_df = label_df

    def get_label_df(self) -> pd.DataFrame:
        """Returns the stored label DataFrame."""
        return self._label_df

    def set_cov_df(self, cov_df: pd.DataFrame, add_intercept: bool) -> None:
        """
        Stores cov_df and the result of _prepare_covariates for it (kept in self._std_cov_df).
        Requires the label DataFrame to have been set already.
        """
        self._cov_df = cov_df
        self._std_cov_df = _prepare_covariates(cov_df, self._label_df, add_intercept)

    def get_cov_df(self) -> pd.DataFrame:
        """Returns the covariate DataFrame as it was provided (not the prepared one)."""
        return self._cov_df

    def set_alphas(self, alphas: List[float]) -> None:
        """
        Stores the alpha mapping: generated from reduced_block_df when alphas is empty, otherwise
        built from the provided values.
        """
        if len(alphas) == 0:
            self._alphas = generate_alphas(self.reduced_block_df)
        else:
            self._alphas = create_alpha_dict(alphas)

    def get_alphas(self) -> Dict[str, Float]:
        """Returns the stored alpha mapping."""
        return self._alphas

    def _cache_model_cv_df(self) -> None:
        """Checks model_df and cv_df with _check_model / _check_cv, then caches both."""
        _check_model(self.model_df)
        _check_cv(self.cv_df)
        self.model_df.cache()
        self.cv_df.cache()

    def _unpersist_model_cv_df(self) -> None:
        """Checks model_df and cv_df with _check_model / _check_cv, then unpersists both."""
        _check_model(self.model_df)
        _check_cv(self.cv_df)
        self.model_df.unpersist()
        self.cv_df.unpersist()

    def fit(self) -> (DataFrame, DataFrame):
        """
        Fits a logistic regression model, represented by a Spark DataFrame containing coefficients for each of the ridge
        alpha parameters, for each block in the reduced block matrix, for each label in the target labels, as well as a
        Spark DataFrame containing the optimal ridge alpha value for each label.

        Returns:
            Two Spark DataFrames, one containing the model resulting from the fitting routine and one containing the
            results of the cross validation procedure.
        """
        map_key_pattern = ['sample_block', 'label', 'alpha_name']
        reduce_key_pattern = ['header_block', 'header', 'label', 'alpha_name']
        model_key_pattern = ['sample_block', 'label', 'alpha_name']
        score_key_pattern = ['sample_block', 'label']
        metric = 'log_loss'

        # Boolean frame shaped like the labels: True where a label value is present, False where it is NaN.
        maskdf = pd.DataFrame(data=np.where(np.isnan(self._label_df), False, True),
                              columns=self._label_df.columns,
                              index=self._label_df.index)

        # Per-label coefficient vector from a logistic fit on the prepared covariates;
        # an empty array when there are no covariates.
        beta_cov_dict = {}
        for label in self._label_df:
            if self._std_cov_df.empty:
                beta_cov_dict[label] = np.array([])
            else:
                row_mask = slice_label_rows(maskdf, label, list(self._label_df.index),
                                            np.array([])).ravel()
                cov_mat = slice_label_rows(self._std_cov_df, 'all', list(self._label_df.index),
                                           row_mask)
                y = slice_label_rows(self._label_df, label, list(self._label_df.index),
                                     row_mask).ravel()
                fit_result = constrained_logistic_fit(cov_mat,
                                                      y,
                                                      np.zeros(cov_mat.shape[1]),
                                                      guess=np.array([]),
                                                      n_cov=0)
                beta_cov_dict[label] = fit_result.x

        # The lambdas close over self, so the instance is serialized along with each UDF;
        # __getstate__ strips the Spark DataFrames from that serialized state.
        map_udf = pandas_udf(
            lambda key, pdf: map_irls_eqn(key, map_key_pattern, pdf, self._label_df,
                                          self.sample_blocks, self._std_cov_df, beta_cov_dict,
                                          maskdf, self._alphas),
            irls_eqn_struct,
            PandasUDFType.GROUPED_MAP)
        reduce_udf = pandas_udf(
            lambda key, pdf: reduce_irls_eqn(key, reduce_key_pattern, pdf),
            irls_eqn_struct,
            PandasUDFType.GROUPED_MAP)
        model_udf = pandas_udf(
            lambda key, pdf: solve_irls_eqn(key, model_key_pattern, pdf, self._label_df,
                                            self._alphas, self._std_cov_df),
            model_struct,
            PandasUDFType.GROUPED_MAP)
        score_udf = pandas_udf(
            lambda key, pdf: score_models(key, score_key_pattern, pdf, self._label_df,
                                          self.sample_blocks, self._alphas, self._std_cov_df,
                                          maskdf, metric),
            cv_struct,
            PandasUDFType.GROUPED_MAP)

        # One copy of every row per alpha name, then map -> reduce -> solve per group.
        # Afterwards the single-element alphas/labels/coefficients arrays are re-collected
        # per (block, header, label) into arrays sorted by alpha.
        alpha_names = f.array([f.lit(n) for n in self._alphas.keys()])
        self.model_df = (
            self.reduced_block_df.drop('alpha')
            .withColumn('alpha_name', f.explode(alpha_names))
            .groupBy(map_key_pattern)
            .apply(map_udf)
            .groupBy(reduce_key_pattern)
            .apply(reduce_udf)
            .groupBy(model_key_pattern)
            .apply(model_udf)
            .withColumn('alpha_label_coef', f.expr('struct(alphas[0] AS alpha, labels[0] AS label, coefficients[0] AS coefficient)'))
            .withColumn('alpha_label_coef_label', f.expr('labels[0]'))
            .groupBy('header_block', 'sample_block', 'header', 'sort_key', 'alpha_label_coef_label')
            .agg(f.sort_array(f.collect_list('alpha_label_coef')).alias('alphas_labels_coefs'))
            .selectExpr('*', 'alphas_labels_coefs.alpha AS alphas', 'alphas_labels_coefs.label AS labels', 'alphas_labels_coefs.coefficient AS coefficients')
            .drop('alphas_labels_coefs', 'label'))

        self.cv_df = cross_validation(self.reduced_block_df, self.model_df, score_udf,
                                      score_key_pattern, self._alphas, metric)

        record_hls_event('wgrLogisticRegressionFit')

        return self.model_df, self.cv_df

    def reduce_block_matrix(self, response: str) -> DataFrame:
        """
        Transforms a starting reduced block matrix by applying a linear model.  The form of the output
        can either be a direct linear transformation (response = "linear") or a linear transformation followed by a
        sigmoid transformation (response = "sigmoid").

        Args:
            response : String specifying what transformation to apply ("linear" or "sigmoid")

        Returns:
            Spark DataFrame containing the result of the transformation.

        Raises:
            ValueError: If response is neither "linear" nor "sigmoid".
        """
        transform_key_pattern = ['sample_block', 'label']

        if response == 'linear':
            # The linear path deliberately passes an empty covariate frame to apply_model.
            warnings.warn('Ignoring any covariates for linear response')
            transform_udf = pandas_udf(
                lambda key, pdf: apply_model(key, transform_key_pattern, pdf, self._label_df,
                                             self.sample_blocks, self._alphas, pd.DataFrame({})),
                reduced_matrix_struct,
                PandasUDFType.GROUPED_MAP)
            join_type = 'inner'
        elif response == 'sigmoid':
            transform_udf = pandas_udf(
                lambda key, pdf: apply_logistic_model(key, transform_key_pattern, pdf,
                                                      self._label_df, self.sample_blocks,
                                                      self._alphas, self._std_cov_df),
                logistic_reduced_matrix_struct,
                PandasUDFType.GROUPED_MAP)
            join_type = 'right'
        else:
            raise ValueError(f'response must be either "linear" or "sigmoid", received "{response}"')

        return apply_model_df(self.reduced_block_df, self.model_df, self.cv_df, transform_udf,
                              transform_key_pattern, join_type)