Utility Functions
Glow includes a variety of utility functions for performing basic data manipulation.
Struct transformations
Glow’s struct transformation functions change the schema structure of the DataFrame. These transformations integrate with functions whose parameter structs require a certain schema.
subset_struct
: subset fields from a struct
from pyspark.sql import Row
row_one = Row(Row(str_col='foo', int_col=1, bool_col=True))
row_two = Row(Row(str_col='bar', int_col=2, bool_col=False))
base_df = spark.createDataFrame([row_one, row_two], schema=['base_col'])
subsetted_df = base_df.select(glow.subset_struct('base_col', 'str_col', 'bool_col').alias('subsetted_col'))
add_struct_fields
: append fields to a struct
from pyspark.sql.functions import lit, reverse
added_df = base_df.select(glow.add_struct_fields('base_col', lit('float_col'), lit(3.14), lit('rev_str_col'), reverse(base_df.base_col.str_col)).alias('added_col'))
expand_struct
: explode a struct into columns
expanded_df = base_df.select(glow.expand_struct('base_col'))
Spark ML transformations
Glow supports transformations between double arrays and Spark ML vectors for integration with machine learning libraries such as Spark’s machine learning library (MLlib).
array_to_dense_vector
: transform from an array to a dense vector
array_df = spark.createDataFrame([Row([1.0, 2.0, 3.0]), Row([4.1, 5.1, 6.1])], schema=['array_col'])
dense_df = array_df.select(glow.array_to_dense_vector('array_col').alias('dense_vector_col'))
array_to_sparse_vector
: transform from an array to a sparse vector
sparse_df = array_df.select(glow.array_to_sparse_vector('array_col').alias('sparse_vector_col'))
vector_to_array
: transform from a vector to a double array
from pyspark.ml.linalg import SparseVector
row_one = Row(vector_col=SparseVector(3, [0, 2], [1.0, 3.0]))
row_two = Row(vector_col=SparseVector(3, [1], [1.0]))
vector_df = spark.createDataFrame([row_one, row_two])
array_df = vector_df.select(glow.vector_to_array('vector_col').alias('array_col'))
explode_matrix
: explode a Spark ML matrix such that each row becomes an array of doubles
from pyspark.ml.linalg import DenseMatrix
matrix_df = spark.createDataFrame(Row([DenseMatrix(2, 3, range(6))]), schema=['matrix_col'])
array_df = matrix_df.select(glow.explode_matrix('matrix_col').alias('array_col'))
Variant data transformations
Glow supports numeric transformations on variant data for downstream calculations, such as GWAS.
genotype_states
: create a numeric representation for each sample’s genotype data. This calculates the sum of the calls (or-1
if any calls are missing); the sum is equivalent to the number of alternate alleles for biallelic variants.
from pyspark.sql.types import *
missing_and_hom_ref = Row([Row(calls=[-1,0]), Row(calls=[0,0])])
het_and_hom_alt = Row([Row(calls=[0,1]), Row(calls=[1,1])])
calls_schema = StructField('calls', ArrayType(IntegerType()))
genotypes_schema = StructField('genotypes_col', ArrayType(StructType([calls_schema])))
genotypes_df = spark.createDataFrame([missing_and_hom_ref, het_and_hom_alt], StructType([genotypes_schema]))
num_alt_alleles_df = genotypes_df.select(glow.genotype_states('genotypes_col').alias('num_alt_alleles_col'))
hard_calls
: get hard calls from genotype probabilities. These are determined based on the number of alternate alleles for the variant, whether the probabilities are phased (true for haplotypes and false for genotypes), and a call threshold (if not provided, this defaults to0.9
). If no calls have a probability above the threshold, the call is set to-1
.
unphased_above_threshold = Row(probabilities=[0.0, 0.0, 0.0, 1.0, 0.0, 0.0], num_alts=2, phased=False)
phased_below_threshold = Row(probabilities=[0.1, 0.9, 0.8, 0.2], num_alts=1, phased=True)
uncalled_df = spark.createDataFrame([unphased_above_threshold, phased_below_threshold])
hard_calls_df = uncalled_df.select(glow.hard_calls('probabilities', 'num_alts', 'phased', 0.95).alias('calls'))
mean_substitute
: substitutes the missing values of a numeric array using the mean of the non-missing values. Any values that are NaN, null or equal to the missing value parameter are considered missing. If all values are missing, they are substituted with the missing value. If the missing value is not provided, this defaults to-1
.
unsubstituted_row = Row(unsubstituted_values=[float('nan'), None, -1.0, 0.0, 1.0, 2.0, 3.0])
unsubstituted_df = spark.createDataFrame([unsubstituted_row])
substituted_df = unsubstituted_df.select(glow.mean_substitute('unsubstituted_values', lit(-1.0)).alias('substituted_values'))