GloWGR

WGR functions

class glow.wgr.LogisticRegression(alphas=array([], dtype=float64))[source]

The LogisticRegression class fits logistic regression models against one or more labels, optimized over a provided list of ridge alpha parameters. The optimal ridge alpha value is chosen for each label by minimizing the average out-of-fold log_loss score.
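The selection criterion can be sketched in plain numpy (an illustration of the scoring rule only, not Glow's distributed implementation; the fold scores below are hypothetical):

```python
import numpy as np

def log_loss(y_true, p, eps=1e-15):
    """Binary cross-entropy averaged over samples."""
    p = np.clip(p, eps, 1 - eps)
    return float(-np.mean(y_true * np.log(p) + (1 - y_true) * np.log(1 - p)))

def pick_alpha(scores_per_alpha):
    """Return the alpha with the lowest mean out-of-fold log_loss."""
    return min(scores_per_alpha, key=lambda a: np.mean(scores_per_alpha[a]))

# Hypothetical out-of-fold log_loss scores for two candidate alphas
scores = {0.1: [0.69, 0.68], 10.0: [0.64, 0.66]}
best = pick_alpha(scores)  # 10.0, since its mean log_loss is lower
```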

fit(blockdf, labeldf, sample_blocks, covdf=pd.DataFrame())[source]

Fits a logistic regression model, represented by a Spark DataFrame containing coefficients for each ridge alpha parameter, for each block in the starting matrix and each target label, along with a second Spark DataFrame containing the optimal ridge alpha value for each label.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional). The covariates should not include an explicit intercept term, as one will be added automatically. If empty, the intercept will be used as the only covariate.

Return type

(<class 'pyspark.sql.dataframe.DataFrame'>, <class 'pyspark.sql.dataframe.DataFrame'>)

Returns

Two Spark DataFrames, one containing the model resulting from the fitting routine and one containing the results of the cross validation procedure.
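The sample_blocks mapping that fit (and the other methods below) expect can be built by evenly partitioning the sample IDs; a minimal sketch, assuming 1-based string block IDs (the helper name is hypothetical):

```python
import numpy as np

def make_sample_blocks(sample_ids, sample_block_count):
    """Evenly partition sample IDs into sample_block_count blocks, keyed by
    1-based string block IDs, matching the mapping that fit() expects."""
    blocks = np.array_split(np.array(sample_ids), sample_block_count)
    return {str(i + 1): list(b) for i, b in enumerate(blocks)}

sample_blocks = make_sample_blocks(
    ['HG00096', 'HG00097', 'HG00099', 'HG00100', 'HG00101'], 2)
# Block '1' holds the first three sample IDs, block '2' the remaining two
```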

fit_transform(blockdf, labeldf, sample_blocks, covdf=pd.DataFrame(), response='linear')[source]

Fits a logistic regression model with a block matrix, then transforms the matrix using the model.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional). The covariates should not include an explicit intercept term, as one will be added automatically. Covariates will be ignored during the transformation step for a linear response.

  • response (str) – String specifying the desired output. Can be ‘linear’ to specify the direct output of the linear WGR model (default) or ‘sigmoid’ to specify predicted label probabilities.

Return type

DataFrame

Returns

Pandas DataFrame containing prediction y_hat values. The shape and order match labeldf such that the rows are indexed by sample ID and the columns by label. The column types are float64.

reduce_block_matrix(blockdf, labeldf, sample_blocks, modeldf, cvdf, covdf, response)[source]

Transforms a starting block matrix by applying a linear model. The form of the output can either be a direct linear transformation (response = “linear”) or a linear transformation followed by a sigmoid transformation (response = “sigmoid”).

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • modeldf (DataFrame) – Spark DataFrame produced by the LogisticRegression fit method, representing the reducer model

  • cvdf (DataFrame) – Spark DataFrame produced by the LogisticRegression fit method, containing the results of the cross validation routine.

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble. The covariates should not include an explicit intercept term, as one will be added automatically. Covariates will be ignored for a linear response.

  • response (str) – String specifying what transformation to apply (“linear” or “sigmoid”)

Return type

DataFrame

Returns

Spark DataFrame containing the result of the transformation.

transform(blockdf, labeldf, sample_blocks, modeldf, cvdf, covdf=pd.DataFrame(), response='linear')[source]

Generates GWAS covariates for the target labels in the provided label DataFrame by applying the model resulting from the LogisticRegression fit method to the starting block matrix.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • modeldf (DataFrame) – Spark DataFrame produced by the LogisticRegression fit method, representing the reducer model

  • cvdf (DataFrame) – Spark DataFrame produced by the LogisticRegression fit method, containing the results of the cross validation routine.

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional). The covariates should not include an explicit intercept term, as one will be added automatically. Covariates will be ignored for a linear response.

  • response (str) – String specifying the desired output. Can be ‘linear’ to specify the direct output of the linear WGR model (default) or ‘sigmoid’ to specify predicted label probabilities.

Return type

DataFrame

Returns

Pandas DataFrame containing covariate values. The shape and order match labeldf such that the rows are indexed by sample ID and the columns by label. The column types are float64.

transform_loco(blockdf, labeldf, sample_blocks, modeldf, cvdf, covdf=pd.DataFrame(), response='linear', chromosomes=[])[source]

Generates predictions for the target labels in the provided label DataFrame by applying the model resulting from the LogisticRegression fit method to the starting block matrix using a leave-one-chromosome-out (LOCO) approach.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • modeldf (DataFrame) – Spark DataFrame produced by the LogisticRegression fit method, representing the reducer model

  • cvdf (DataFrame) – Spark DataFrame produced by the LogisticRegression fit method, containing the results of the cross validation routine.

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional). The covariates should not include an explicit intercept term, as one will be added automatically. Covariates will be ignored for a linear response.

  • response (str) – String specifying the desired output. Can be ‘linear’ to specify the direct output of the linear WGR model (default) or ‘sigmoid’ to specify predicted label probabilities.

  • chromosomes (List[str]) – List of chromosomes for which to generate a prediction (optional). If not provided, the chromosomes will be inferred from the block matrix.

Return type

DataFrame

Returns

Pandas DataFrame containing prediction y_hat values per chromosome. The rows are indexed by sample ID and chromosome; the columns are indexed by label. The column types are float64. The DataFrame is sorted using chromosome as the primary sort key, and sample ID as the secondary sort key.
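The LOCO scheme can be illustrated with a small numpy sketch, under the simplifying assumption that the genome-wide prediction decomposes into a sum of per-chromosome contributions (the numbers are made up):

```python
import numpy as np

# Hypothetical per-chromosome contributions to y_hat for three samples
contrib = {
    '1': np.array([0.5, 0.1, 0.2]),
    '2': np.array([0.3, 0.4, 0.1]),
    '3': np.array([0.2, 0.2, 0.3]),
}
total = sum(contrib.values())

# The prediction reported for chromosome c leaves out c's own contribution,
# so it can serve as an offset when testing variants on chromosome c
loco = {c: total - contrib[c] for c in contrib}
```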

class glow.wgr.RidgeReducer(alphas=array([], dtype=float64))[source]

The RidgeReducer class is intended to reduce the feature space of an N by M block matrix X to an N by P block matrix, where P << M. This is done by fitting K ridge models within each block of X against one or more target labels, so that a block that begins with L columns is reduced to a block with K columns, where each column is the prediction of one ridge model for one target label.
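The dimension arithmetic can be sketched with a closed-form ridge fit in numpy (an illustration of the reduction idea on simulated data, not Glow's implementation, which operates on distributed blocks):

```python
import numpy as np

rng = np.random.default_rng(0)
n, L = 20, 8                        # samples, columns in one block of X
X = rng.standard_normal((n, L))
Y = rng.standard_normal((n, 2))     # two target labels
alphas = [0.1, 1.0, 10.0]

# One closed-form ridge fit per (alpha, label); each model's prediction on
# the block becomes one column of the reduced block
cols = []
for a in alphas:
    B = np.linalg.solve(X.T @ X + a * np.eye(L), X.T @ Y)  # (L, 2)
    cols.append(X @ B)                                     # (n, 2)
reduced = np.hstack(cols)
# K = len(alphas) * n_labels = 6 columns, down from L = 8
```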

fit(blockdf, labeldf, sample_blocks, covdf=pd.DataFrame())[source]

Fits a ridge reducer model, represented by a Spark DataFrame containing coefficients for each ridge alpha parameter, for each block in the starting matrix and each target label.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional).

Return type

DataFrame

Returns

Spark DataFrame containing the model resulting from the fitting routine.

fit_transform(blockdf, labeldf, sample_blocks, covdf=pd.DataFrame())[source]

Fits a ridge reducer model with a block matrix, then transforms the matrix using the model.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional).

Return type

DataFrame

Returns

Spark DataFrame representing the reduced block matrix

transform(blockdf, labeldf, sample_blocks, modeldf, covdf=pd.DataFrame())[source]

Transforms a starting block matrix to the reduced block matrix, using a reducer model produced by the RidgeReducer fit method.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • modeldf (DataFrame) – Spark DataFrame produced by the RidgeReducer fit method, representing the reducer model

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional).

Return type

DataFrame

Returns

Spark DataFrame representing the reduced block matrix

class glow.wgr.RidgeRegression(alphas=array([], dtype=float64))[source]

The RidgeRegression class is used to fit ridge models against one or more labels optimized over a provided list of ridge alpha parameters. It is similar in function to RidgeReducer, except that RidgeReducer reduces a starting matrix X to a block matrix of smaller dimension, while RidgeRegression is intended to find an optimal model of the form Y_hat ~ XB, where Y_hat is a matrix of one or more predicted labels and B is a matrix of coefficients. The optimal ridge alpha value is chosen for each label by maximizing the average out-of-fold r2 score.
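The selection rule mirrors LogisticRegression's, with r2 maximized instead of log_loss minimized; a minimal numpy sketch (the fold scores are hypothetical):

```python
import numpy as np

def r2_score(y, y_hat):
    """Coefficient of determination."""
    ss_res = np.sum((y - y_hat) ** 2)
    ss_tot = np.sum((y - np.mean(y)) ** 2)
    return float(1 - ss_res / ss_tot)

def pick_alpha(scores_per_alpha):
    """Return the alpha with the highest mean out-of-fold r2."""
    return max(scores_per_alpha, key=lambda a: np.mean(scores_per_alpha[a]))

best = pick_alpha({0.1: [0.20, 0.30], 10.0: [0.50, 0.40]})  # 10.0
```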

fit(blockdf, labeldf, sample_blocks, covdf=pd.DataFrame())[source]

Fits a ridge regression model, represented by a Spark DataFrame containing coefficients for each ridge alpha parameter, for each block in the starting matrix and each target label, along with a second Spark DataFrame containing the optimal ridge alpha value for each label.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional).

Return type

(<class 'pyspark.sql.dataframe.DataFrame'>, <class 'pyspark.sql.dataframe.DataFrame'>)

Returns

Two Spark DataFrames, one containing the model resulting from the fitting routine and one containing the results of the cross validation procedure.

fit_transform(blockdf, labeldf, sample_blocks, covdf=pd.DataFrame())[source]

Fits a ridge regression model with a block matrix, then transforms the matrix using the model.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional).

Return type

DataFrame

Returns

Pandas DataFrame containing prediction y_hat values. The shape and order match labeldf such that the rows are indexed by sample ID and the columns by label. The column types are float64.

transform(blockdf, labeldf, sample_blocks, modeldf, cvdf, covdf=pd.DataFrame())[source]

Generates predictions for the target labels in the provided label DataFrame by applying the model resulting from the RidgeRegression fit method to the starting block matrix.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • modeldf (DataFrame) – Spark DataFrame produced by the RidgeRegression fit method, representing the reducer model

  • cvdf (DataFrame) – Spark DataFrame produced by the RidgeRegression fit method, containing the results of the cross validation routine.

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional).

Return type

DataFrame

Returns

Pandas DataFrame containing prediction y_hat values. The shape and order match labeldf such that the rows are indexed by sample ID and the columns by label. The column types are float64.

transform_loco(blockdf, labeldf, sample_blocks, modeldf, cvdf, covdf=pd.DataFrame(), chromosomes=[])[source]

Generates predictions for the target labels in the provided label DataFrame by applying the model resulting from the RidgeRegression fit method to the starting block matrix using a leave-one-chromosome-out (LOCO) approach.

Parameters
  • blockdf (DataFrame) – Spark DataFrame representing the beginning block matrix X

  • labeldf (DataFrame) – Pandas DataFrame containing the target labels used in fitting the ridge models

  • sample_blocks (Dict[str, List[str]]) – Dict containing a mapping of sample_block ID to a list of corresponding sample IDs

  • modeldf (DataFrame) – Spark DataFrame produced by the RidgeRegression fit method, representing the reducer model

  • cvdf (DataFrame) – Spark DataFrame produced by the RidgeRegression fit method, containing the results of the cross validation routine.

  • covdf (DataFrame) – Pandas DataFrame containing covariates to be included in every model in the stacking ensemble (optional).

  • chromosomes (List[str]) – List of chromosomes for which to generate a prediction (optional). If not provided, the chromosomes will be inferred from the block matrix.

Return type

DataFrame

Returns

Pandas DataFrame containing prediction y_hat values per chromosome. The rows are indexed by sample ID and chromosome; the columns are indexed by label. The column types are float64. The DataFrame is sorted using chromosome as the primary sort key, and sample ID as the secondary sort key.

glow.wgr.block_variants_and_samples(variant_df, sample_ids, variants_per_block, sample_block_count)[source]

Creates a blocked GT matrix and an index mapping from sample block IDs to the corresponding lists of sample IDs. Uses the same sample-blocking logic as the blocked GT matrix transformer.

Requires that:

  • Each variant row has the same number of values

  • The number of values per row matches the number of sample IDs

Parameters
  • variant_df (DataFrame) – The variant DataFrame

  • sample_ids (List[str]) – The list of sample ID strings

  • variants_per_block (int) – The number of variants per block

  • sample_block_count (int) – The number of sample blocks

Return type

(<class 'pyspark.sql.dataframe.DataFrame'>, typing.Dict[str, typing.List[str]])

Returns

tuple of (blocked GT matrix, index mapping)
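The blocking grid implied by the two count parameters can be sketched as follows (the helper is hypothetical; Glow's transformer performs the actual blocking):

```python
import numpy as np

def block_counts(n_variants, variants_per_block, n_samples, sample_block_count):
    """Number of variant blocks (the last block may be smaller) and the
    per-block sample counts under even partitioning."""
    n_variant_blocks = -(-n_variants // variants_per_block)  # ceiling division
    sample_sizes = [len(b) for b in
                    np.array_split(np.arange(n_samples), sample_block_count)]
    return n_variant_blocks, sample_sizes

# 1000 variants in blocks of 300 -> 4 variant blocks; 10 samples in 3 blocks
block_counts(1000, 300, 10, 3)
```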

glow.wgr.get_sample_ids(data)[source]

Extracts sample IDs from a variant DataFrame, such as one read from PLINK files.

Requires that the sample IDs:

  • Are in genotype.sampleId

  • Are the same across all the variant rows

  • Are a list of strings

  • Are non-empty

  • Are unique

Parameters

data (DataFrame) – The variant DataFrame containing sample IDs

Return type

List[str]

Returns

list of sample ID strings
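The requirements above can be expressed as a small validation helper (hypothetical, shown only for illustration; get_sample_ids performs its own checks):

```python
def validate_sample_ids(sample_ids):
    """Raise ValueError unless sample_ids is a non-empty list of unique strings."""
    if not sample_ids:
        raise ValueError('sample IDs must be non-empty')
    if not all(isinstance(s, str) for s in sample_ids):
        raise ValueError('sample IDs must be strings')
    if len(sample_ids) != len(set(sample_ids)):
        raise ValueError('sample IDs must be unique')
    return sample_ids
```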

glow.wgr.reshape_for_gwas(spark, label_df)[source]

Reshapes a Pandas DataFrame into a Spark DataFrame with a convenient format for Glow’s GWAS functions. This function can handle labels that are either per-sample or per-sample and per-contig, like those generated by GloWGR’s transform_loco function.

Examples

>>> label_df = pd.DataFrame({'label1': [1, 2], 'label2': [3, 4]}, index=['sample1', 'sample2'])
>>> reshaped = reshape_for_gwas(spark, label_df)
>>> reshaped.head()
Row(label='label1', values=[1, 2])
>>> loco_label_df = pd.DataFrame({'label1': [1, 2], 'label2': [3, 4]},
...     index=pd.MultiIndex.from_tuples([('sample1', 'chr1'), ('sample1', 'chr2')]))
>>> reshaped = reshape_for_gwas(spark, loco_label_df)
>>> reshaped.head()
Row(contigName='chr1', label='label1', values=[1])

Requires that:

  • The input label DataFrame is indexed by sample ID or by (sample ID, contig name)

Parameters
  • spark (SparkSession) – A Spark session

  • label_df (DataFrame) – A pandas DataFrame containing labels. The DataFrame should either be indexed by sample ID or multi-indexed by (sample ID, contig name). Each column is interpreted as a label.

Return type

DataFrame

Returns

A Spark DataFrame with a convenient format for Glow regression functions. Each row contains the label name, the contig name if provided in the input DataFrame, and an array containing the label value for each sample.

GWAS functions

glow.gwas.linear_regression(genotype_df, phenotype_df, covariate_df=pd.DataFrame(), offset_df=pd.DataFrame(), fit_intercept=True, values_column='values', dt=np.float64)[source]

Uses linear regression to test for association between genotypes and one or more phenotypes. The implementation is a distributed version of the method used in regenie: https://www.biorxiv.org/content/10.1101/2020.06.19.162354v2

Implementation details:

On the driver node, we decompose the covariate matrix into an orthonormal basis and use it to project the covariates out of the phenotype matrix. The orthonormal basis and the phenotype residuals are broadcast as part of a Pandas UDF. In each Spark task, we project the covariates out of a block of genotypes and then compute the regression statistics for each phenotype, taking into account the distinct missingness patterns of each phenotype.
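The projection step can be sketched in numpy for a single phenotype and a single genotype vector, ignoring the per-phenotype missingness handling (the data below is simulated; this illustrates the method, not the distributed implementation):

```python
import numpy as np

rng = np.random.default_rng(42)
n = 100
C = np.column_stack([np.ones(n), rng.standard_normal((n, 2))])  # covariates with intercept
g = rng.standard_normal(n)                                      # genotype values
y = 0.5 * g + C @ np.array([1.0, 0.2, -0.3]) + 0.1 * rng.standard_normal(n)

# Orthonormal basis for the covariate matrix; project the covariates out of
# both the phenotype (once, on the driver) and the genotypes (per task)
Q, _ = np.linalg.qr(C)
y_res = y - Q @ (Q.T @ y)
g_res = g - Q @ (Q.T @ g)

# Regression statistics computed on the residuals
dof = n - C.shape[1] - 1
effect = (g_res @ y_res) / (g_res @ g_res)
resid = y_res - effect * g_res
stderror = np.sqrt((resid @ resid) / dof / (g_res @ g_res))
tvalue = effect / stderror
```

With the simulated effect size of 0.5 and low noise, the recovered effect estimate lands close to 0.5 with a large T statistic.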

Examples

>>> np.random.seed(42)
>>> n_samples, n_phenotypes, n_covariates = (710, 3, 3)
>>> phenotype_df = pd.DataFrame(np.random.random((n_samples, n_phenotypes)), columns=['p1', 'p2', 'p3'])
>>> covariate_df = pd.DataFrame(np.random.random((n_samples, n_covariates)))
>>> genotype_df = (spark.read.format('vcf').load('test-data/1kg_sample.vcf')
... .select('contigName', 'start', 'genotypes'))
>>> results = glow.gwas.linear_regression(genotype_df, phenotype_df, covariate_df,
... values_column=glow.genotype_states('genotypes'))
>>> results.head() 
Row(contigName='1', start=904164, effect=0.0453..., stderror=0.0214..., tvalue=2.114..., pvalue=0.0348..., phenotype='p1')
>>> phenotype_df = pd.DataFrame(np.random.random((n_samples, n_phenotypes)), columns=['p1', 'p2', 'p3'])
>>> covariate_df = pd.DataFrame(np.random.random((n_samples, n_covariates)))
>>> genotype_df = (spark.read.format('vcf').load('test-data/1kg_sample.vcf')
... .select('contigName', 'start', 'genotypes'))
>>> contigs = ['1', '2', '3']
>>> offset_index = pd.MultiIndex.from_product([phenotype_df.index, contigs])
>>> offset_df = pd.DataFrame(np.random.random((n_samples * len(contigs), n_phenotypes)),
... index=offset_index, columns=phenotype_df.columns)
>>> results = glow.gwas.linear_regression(genotype_df, phenotype_df, covariate_df,
... offset_df=offset_df, values_column=glow.genotype_states('genotypes'))
Parameters
  • genotype_df (DataFrame) – Spark DataFrame containing genomic data

  • phenotype_df (DataFrame) – Pandas DataFrame containing phenotypic data

  • covariate_df (DataFrame) – An optional Pandas DataFrame containing covariates

  • offset_df (DataFrame) – An optional Pandas DataFrame containing the phenotype offset. The actual phenotype used for linear regression is phenotype_df minus the appropriate offset. The offset_df may have one or two levels of indexing. If one level, the index should be the same as the phenotype_df. If two levels, the level 0 index should be the same as the phenotype_df, and the level 1 index should be the contig name. The two level index scheme allows for per-contig offsets like LOCO predictions from GloWGR.

  • fit_intercept (bool) – Whether to add an intercept column to the covariate DataFrame

  • values_column (Union[str, Column]) – A column name or column expression to test with linear regression. If a column name is provided, genotype_df should have a column with this name and a numeric array type. If a column expression is provided, the expression should return a numeric array type.

  • dt (type) – The numpy datatype to use in the linear regression test. Must be np.float32 or np.float64.

Return type

DataFrame

Returns

A Spark DataFrame that contains

  • All columns from genotype_df except the values_column and the genotypes column if one exists

  • effect: The effect size estimate for the genotype

  • stderror: The estimated standard error of the effect

  • tvalue: The T statistic

  • pvalue: P value estimated from a two-sided T-test

  • phenotype: The phenotype name as determined by the column names of phenotype_df