PySpark Functions
Glow includes a number of functions that operate on PySpark columns. These functions are interoperable with functions provided by PySpark or other libraries.
- glow.add_struct_fields(struct, *fields)
Adds fields to a struct.
Added in version 0.3.0.
Examples
>>> df = spark.createDataFrame([Row(struct=Row(a=1))])
>>> df.select(glow.add_struct_fields('struct', lit('b'), lit(2)).alias('struct')).collect()
[Row(struct=Row(a=1, b=2))]
- Parameters
- Return type
Column
- Returns
A struct consisting of the input struct and the added fields
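The example above passes the new field as a name followed by a value. As a rough plain-Python analogy (a sketch only; `add_fields` is a hypothetical helper operating on dicts, not a Glow function), the alternating name/value pattern can be pictured like this:

```python
def add_fields(struct, *fields):
    """Hypothetical dict-based analogue of adding fields to a struct."""
    if len(fields) % 2 != 0:
        raise ValueError("fields must alternate between names and values")
    result = dict(struct)  # copy so the input struct is left untouched
    # Pair up arguments: even positions are names, odd positions are values.
    for name, value in zip(fields[0::2], fields[1::2]):
        result[name] = value
    return result


print(add_fields({"a": 1}, "b", 2))  # {'a': 1, 'b': 2}
```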
- glow.array_summary_stats(arr)
Computes the minimum, maximum, mean, and standard deviation for an array of numerics.
Added in version 0.3.0.
Examples
>>> df = spark.createDataFrame([Row(arr=[1, 2, 3])])
>>> df.select(glow.expand_struct(glow.array_summary_stats('arr'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]
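For intuition, the same four statistics can be reproduced in plain Python. This is a minimal sketch, assuming the standard deviation is the sample (n - 1) form, which is what the output above suggests for `[1, 2, 3]`; `summary_stats` is a hypothetical helper, not part of Glow.

```python
import statistics


def summary_stats(values):
    """Hypothetical helper mirroring the fields shown in the example output."""
    xs = [float(v) for v in values]
    return {
        "mean": statistics.fmean(xs),
        # Sample standard deviation; undefined for fewer than two values.
        "stdDev": statistics.stdev(xs) if len(xs) > 1 else float("nan"),
        "min": min(xs),
        "max": max(xs),
    }


print(summary_stats([1, 2, 3]))
# {'mean': 2.0, 'stdDev': 1.0, 'min': 1.0, 'max': 3.0}
```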
- glow.array_to_dense_vector(arr)
Converts an array of numerics into a spark.ml DenseVector.
Added in version 0.3.0.
Examples
>>> from pyspark.ml.linalg import DenseVector
>>> df = spark.createDataFrame([Row(arr=[1, 2, 3])])
>>> df.select(glow.array_to_dense_vector('arr').alias('v')).collect()
[Row(v=DenseVector([1.0, 2.0, 3.0]))]
- glow.array_to_sparse_vector(arr)
Converts an array of numerics into a spark.ml SparseVector.
Added in version 0.3.0.
Examples
>>> from pyspark.ml.linalg import SparseVector
>>> df = spark.createDataFrame([Row(arr=[1, 0, 2, 0, 3, 0])])
>>> df.select(glow.array_to_sparse_vector('arr').alias('v')).collect()
[Row(v=SparseVector(6, {0: 1.0, 2: 2.0, 4: 3.0}))]
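The sparse form in the output keeps the vector size plus a map from index to nonzero value. A small plain-Python sketch of that representation (`to_sparse` is a hypothetical helper for illustration, not a Glow or PySpark function):

```python
def to_sparse(values):
    """Return (size, {index: value}) keeping only the nonzero entries."""
    nonzero = {i: float(v) for i, v in enumerate(values) if v != 0}
    return len(values), nonzero


print(to_sparse([1, 0, 2, 0, 3, 0]))  # (6, {0: 1.0, 2: 2.0, 4: 3.0})
```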
- glow.call_summary_stats(genotypes)
Computes call summary statistics for an array of genotype structs. See Variant Quality Control for more details.
Added in version 0.3.0.
Examples
>>> schema = 'genotypes: array<struct<calls: array<int>>>'
>>> df = spark.createDataFrame([Row(genotypes=[Row(calls=[0, 0]), Row(calls=[1, 0]), Row(calls=[1, 1])])], schema)
>>> df.select(glow.expand_struct(glow.call_summary_stats('genotypes'))).collect()
[Row(callRate=1.0, nCalled=3, nUncalled=0, nHet=1, nHomozygous=[1, 1], nNonRef=2, nAllelesCalled=6, alleleCounts=[3, 3], alleleFrequencies=[0.5, 0.5])]
- Parameters
genotypes (Union[Column, str]) – The array of genotype structs with a calls field
- Return type
Column
- Returns
A struct containing callRate, nCalled, nUncalled, nHet, nHomozygous, nNonRef, nAllelesCalled, alleleCounts, alleleFrequencies fields. See Variant Quality Control.
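To make the returned fields concrete, here is a plain-Python sketch that reproduces the numbers in the example above. It rests on assumptions not stated in this page: a genotype counts as called only if none of its calls is `-1`, and the number of alleles is supplied explicitly through a hypothetical `num_alleles` argument. `call_summary` is an illustrative helper, and Glow's handling of edge cases may differ.

```python
def call_summary(genotypes, num_alleles=2):
    """genotypes: list of call lists, e.g. [[0, 0], [1, 0]]; -1 marks a no-call."""
    # Assumption: a genotype with any -1 is treated as uncalled.
    called = [calls for calls in genotypes if calls and all(c >= 0 for c in calls)]
    allele_counts = [0] * num_alleles
    n_homozygous = [0] * num_alleles
    n_het = 0
    n_non_ref = 0
    for calls in called:
        for allele in calls:
            allele_counts[allele] += 1
        if len(set(calls)) == 1:
            n_homozygous[calls[0]] += 1  # homozygous for this allele
        else:
            n_het += 1
        if any(allele > 0 for allele in calls):
            n_non_ref += 1
    n_alleles_called = sum(allele_counts)
    return {
        "callRate": len(called) / len(genotypes) if genotypes else float("nan"),
        "nCalled": len(called),
        "nUncalled": len(genotypes) - len(called),
        "nHet": n_het,
        "nHomozygous": n_homozygous,
        "nNonRef": n_non_ref,
        "nAllelesCalled": n_alleles_called,
        "alleleCounts": allele_counts,
        "alleleFrequencies": (
            [c / n_alleles_called for c in allele_counts] if n_alleles_called else []
        ),
    }


stats = call_summary([[0, 0], [1, 0], [1, 1]])
print(stats["nHet"], stats["nHomozygous"], stats["alleleFrequencies"])
# 1 [1, 1] [0.5, 0.5]
```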
- glow.dp_summary_stats(genotypes)
Computes summary statistics for the depth field from an array of genotype structs. See Variant Quality Control.
Added in version 0.3.0.
Examples
>>> df = spark.createDataFrame([Row(genotypes=[Row(depth=1), Row(depth=2), Row(depth=3)])], 'genotypes: array<struct<depth: int>>')
>>> df.select(glow.expand_struct(glow.dp_summary_stats('genotypes'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]
- glow.expand_struct(struct)
Promotes fields of a nested struct to top-level columns, similar to using struct.* from SQL, but can be used in more contexts.
Added in version 0.3.0.
Examples
>>> df = spark.createDataFrame([Row(struct=Row(a=1, b=2))])
>>> df.select(glow.expand_struct(col('struct'))).collect()
[Row(a=1, b=2)]
- glow.explode_matrix(matrix)
Explodes a spark.ml Matrix (sparse or dense) into multiple arrays, one per row of the matrix.
Added in version 0.3.0.
Examples
>>> from pyspark.ml.linalg import DenseMatrix
>>> m = DenseMatrix(numRows=3, numCols=2, values=[1, 2, 3, 4, 5, 6])
>>> df = spark.createDataFrame([Row(matrix=m)])
>>> df.select(glow.explode_matrix('matrix').alias('row')).collect()
[Row(row=[1.0, 4.0]), Row(row=[2.0, 5.0]), Row(row=[3.0, 6.0])]
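The row contents in the output follow from the column-major layout that `DenseMatrix` uses for its `values` by default. A plain-Python sketch of the indexing (`explode_rows` is a hypothetical helper for illustration only):

```python
def explode_rows(num_rows, num_cols, values):
    """Split column-major values into one list per matrix row."""
    # In column-major order, element (r, c) lives at index c * num_rows + r.
    return [
        [float(values[c * num_rows + r]) for c in range(num_cols)]
        for r in range(num_rows)
    ]


print(explode_rows(3, 2, [1, 2, 3, 4, 5, 6]))
# [[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]]
```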
- glow.genotype_states(genotypes)
Gets the number of alternate alleles for an array of genotype structs. Returns -1 if there are any -1s (no-calls) in the calls array.
Added in version 0.3.0.
Examples
>>> genotypes = [
...     Row(calls=[1, 1]),
...     Row(calls=[1, 0]),
...     Row(calls=[0, 0]),
...     Row(calls=[-1, -1])]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<calls: array<int>>>')
>>> df.select(glow.genotype_states('genotypes').alias('states')).collect()
[Row(states=[2, 1, 0, -1])]
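The rule described above is simple enough to sketch in plain Python. `genotype_state` is a hypothetical helper that follows the description literally by counting nonzero calls; how Glow treats multiallelic calls is not covered here.

```python
def genotype_state(calls):
    """Count alternate alleles in one genotype, or return -1 on any no-call."""
    if any(c == -1 for c in calls):
        return -1
    return sum(1 for c in calls if c > 0)


genotypes = [[1, 1], [1, 0], [0, 0], [-1, -1]]
print([genotype_state(calls) for calls in genotypes])  # [2, 1, 0, -1]
```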
- glow.gq_summary_stats(genotypes)
Computes summary statistics about the genotype quality field for an array of genotype structs. See Variant Quality Control.
Added in version 0.3.0.
Examples
>>> genotypes = [
...     Row(conditionalQuality=1),
...     Row(conditionalQuality=2),
...     Row(conditionalQuality=3)]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<conditionalQuality: int>>')
>>> df.select(glow.expand_struct(glow.gq_summary_stats('genotypes'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]
- glow.hard_calls(probabilities, numAlts, phased, threshold=None)
Converts an array of probabilities to hard calls. The probabilities are assumed to be diploid. See Variant data transformations for more details.
Added in version 0.3.0.
Examples
>>> df = spark.createDataFrame([Row(probs=[0.95, 0.05, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False)).alias('calls')).collect()
[Row(calls=[0, 0])]
>>> df = spark.createDataFrame([Row(probs=[0.05, 0.95, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False)).alias('calls')).collect()
[Row(calls=[0, 1])]
>>> # Use the threshold parameter to change the minimum probability required for a call
>>> df = spark.createDataFrame([Row(probs=[0.05, 0.95, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False), threshold=0.99).alias('calls')).collect()
[Row(calls=[-1, -1])]
- Parameters
probabilities (Union[Column, str]) – The array of probabilities to convert
numAlts (Union[Column, str]) – The number of alternate alleles
phased (Union[Column, str]) – Whether the probabilities are phased. If phased, we expect 2 * numAlts values in the probabilities array. If unphased, we expect one probability per possible genotype.
threshold (Optional[float]) – The minimum probability to make a call. If no probability falls into the range of [0, 1 - threshold] or [threshold, 1], a no-call (represented by -1s) will be emitted. If not provided, this parameter defaults to 0.9.
- Return type
Column
- Returns
An array of hard calls
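A simplified plain-Python sketch of the unphased case may help make the threshold behavior concrete. It assumes genotypes are ordered by the VCF convention ((0, 0), (0, 1), (1, 1), ...) and that a call is made only when the largest probability reaches the threshold. `unphased_hard_call` is a hypothetical helper, not part of Glow, and Glow's exact rule may differ in detail.

```python
def unphased_hard_call(probabilities, num_alts, threshold=0.9):
    """Pick the most probable diploid genotype, or [-1, -1] below threshold."""
    # Assumed VCF-style ordering of unphased diploid genotypes.
    genotypes = [(j, k) for k in range(num_alts + 1) for j in range(k + 1)]
    if len(probabilities) != len(genotypes):
        raise ValueError("expected one probability per possible genotype")
    best = max(range(len(genotypes)), key=lambda i: probabilities[i])
    if probabilities[best] < threshold:
        return [-1, -1]  # no-call
    return list(genotypes[best])


print(unphased_hard_call([0.95, 0.05, 0.0], num_alts=1))  # [0, 0]
print(unphased_hard_call([0.05, 0.95, 0.0], num_alts=1))  # [0, 1]
print(unphased_hard_call([0.05, 0.95, 0.0], num_alts=1, threshold=0.99))  # [-1, -1]
```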
- glow.hardy_weinberg(genotypes)
Computes statistics relating to the Hardy-Weinberg equilibrium. See Variant Quality Control for more details.
Added in version 0.3.0.
Examples
>>> genotypes = [
...     Row(calls=[1, 1]),
...     Row(calls=[1, 0]),
...     Row(calls=[0, 0])]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<calls: array<int>>>')
>>> df.select(glow.expand_struct(glow.hardy_weinberg('genotypes'))).collect()
[Row(hetFreqHwe=0.6, pValueHwe=0.7)]
- glow.lift_over_coordinates(contigName, start, end, chainFile, minMatchRatio=None)
Performs liftover for the coordinates of a variant. To perform liftover of alleles and add additional metadata, see Liftover.
Added in version 0.3.0.
Examples
>>> df = spark.read.format('vcf').load('test-data/liftover/unlifted.test.vcf').where('start = 18210071')
>>> chain_file = 'test-data/liftover/hg38ToHg19.over.chain.gz'
>>> reference_file = 'test-data/liftover/hg19.chr20.fa.gz'
>>> df.select('contigName', 'start', 'end').head()
Row(contigName='chr20', start=18210071, end=18210072)
>>> lifted_df = df.select(glow.expand_struct(glow.lift_over_coordinates('contigName', 'start', 'end', chain_file)))
>>> lifted_df.head()
Row(contigName='chr20', start=18190715, end=18190716)
- Parameters
- Return type
Column
- Returns
A struct containing contigName, start, and end fields after liftover
- glow.linear_regression_gwas(genotypes, phenotypes, covariates)
Performs a linear regression association test optimized for performance in a GWAS setting. See Linear regression for details.
Added in version 0.3.0.
Examples
>>> from pyspark.ml.linalg import DenseMatrix
>>> phenotypes = [2, 3, 4]
>>> genotypes = [0, 1, 2]
>>> covariates = DenseMatrix(numRows=3, numCols=1, values=[1, 1, 1])
>>> df = spark.createDataFrame([Row(genotypes=genotypes, phenotypes=phenotypes, covariates=covariates)])
>>> df.select(glow.expand_struct(glow.linear_regression_gwas('genotypes', 'phenotypes', 'covariates'))).collect()
[Row(beta=0.9999999999999998, standardError=1.4901161193847656e-08, pValue=9.486373847239922e-09)]
- Parameters
- Return type
Column
- Returns
A struct containing beta, standardError, and pValue fields. See Linear regression.
- glow.logistic_regression_gwas(genotypes, phenotypes, covariates, test, offset=None)
Performs a logistic regression association test optimized for performance in a GWAS setting. See Logistic regression for more details.
Added in version 0.3.0.
Examples
>>> from pyspark.ml.linalg import DenseMatrix
>>> phenotypes = [1, 0, 0, 1, 1]
>>> genotypes = [0, 0, 1, 2, 2]
>>> covariates = DenseMatrix(numRows=5, numCols=1, values=[1, 1, 1, 1, 1])
>>> offset = [1, 0, 1, 0, 1]
>>> df = spark.createDataFrame([Row(genotypes=genotypes, phenotypes=phenotypes, covariates=covariates, offset=offset)])
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'Firth'))).collect()
[Row(beta=0.7418937644793101, oddsRatio=2.09990848346903, waldConfidenceInterval=[0.2509874689201784, 17.569066925598555], pValue=0.3952193664793294)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'LRT'))).collect()
[Row(beta=1.1658962684583645, oddsRatio=3.208797538802116, waldConfidenceInterval=[0.29709600522888285, 34.65674887513274], pValue=0.2943946848756769)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'Firth', 'offset'))).collect()
[Row(beta=0.8024832156793392, oddsRatio=2.231074294047771, waldConfidenceInterval=[0.2540891981649045, 19.590334974925725], pValue=0.3754070658316332)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'LRT', 'offset'))).collect()
[Row(beta=1.1996041727573317, oddsRatio=3.3188029900720117, waldConfidenceInterval=[0.3071189078535928, 35.863807161497334], pValue=0.2857137988674153)]
- Parameters
genotypes (Union[Column, str]) – A numeric array of genotypes
phenotypes (Union[Column, str]) – A double array of phenotype values
covariates (Union[Column, str]) – A spark.ml Matrix of covariates
test (str) – Which logistic regression test to use. Can be LRT or Firth
offset (Union[Column, str, None]) – An optional double array of offset values. The offset vector is added with coefficient 1 to the linear predictor term X*b.
- Return type
Column
- Returns
A struct containing beta, oddsRatio, waldConfidenceInterval, and pValue fields. See Logistic regression.
- glow.mean_substitute(array, missingValue=None)
Substitutes the missing values of a numeric array using the mean of the non-missing values. Any values that are NaN, null or equal to the missing value parameter are considered missing. See Variant data transformations for more details.
Added in version 0.4.0.
Examples
>>> df = spark.createDataFrame([Row(unsubstituted_values=[float('nan'), None, 0.0, 1.0, 2.0, 3.0, 4.0])])
>>> df.select(glow.mean_substitute('unsubstituted_values', lit(0.0)).alias('substituted_values')).collect()
[Row(substituted_values=[2.5, 2.5, 2.5, 1.0, 2.0, 3.0, 4.0])]
>>> df = spark.createDataFrame([Row(unsubstituted_values=[0, 1, 2, 3, -1, None])])
>>> df.select(glow.mean_substitute('unsubstituted_values').alias('substituted_values')).collect()
[Row(substituted_values=[0.0, 1.0, 2.0, 3.0, 1.5, 1.5])]
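The substitution rule can be sketched in plain Python. This is an illustration only: `mean_substitute_list` is a hypothetical helper, the default missing value of `-1` is inferred from the second example above, and returning NaN when every value is missing is an assumption rather than documented Glow behavior.

```python
import math


def mean_substitute_list(values, missing_value=-1):
    """Replace NaN, None, and missing_value entries with the mean of the rest."""

    def is_missing(v):
        if v is None:
            return True
        if isinstance(v, float) and math.isnan(v):
            return True
        return v == missing_value

    present = [float(v) for v in values if not is_missing(v)]
    # Assumption: with no usable values there is no mean, so fall back to NaN.
    mean = sum(present) / len(present) if present else float("nan")
    return [mean if is_missing(v) else float(v) for v in values]


print(mean_substitute_list([float("nan"), None, 0.0, 1.0, 2.0, 3.0, 4.0], 0.0))
# [2.5, 2.5, 2.5, 1.0, 2.0, 3.0, 4.0]
print(mean_substitute_list([0, 1, 2, 3, -1, None]))
# [0.0, 1.0, 2.0, 3.0, 1.5, 1.5]
```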
- glow.normalize_variant(contigName, start, end, refAllele, altAlleles, refGenomePathString)
Normalizes the variant with a behavior similar to vt normalize or bcftools norm. Creates a StructType column including the normalized start, end, referenceAllele and alternateAlleles fields (whether they are changed or unchanged as the result of normalization) as well as a StructType field called normalizationStatus that contains the following fields:
- changed: A boolean field indicating whether the variant data was changed as a result of normalization
- errorMessage: An error message in case the attempt at normalizing the row hit an error. In this case, the changed field will be set to false. If no errors occur, this field will be null.
In case of an error, the start, end, referenceAllele and alternateAlleles fields in the generated struct will be null.
Added in version 0.3.0.
Examples
>>> df = spark.read.format('vcf').load('test-data/variantsplitternormalizer-test/test_left_align_hg38_altered.vcf')
>>> ref_genome = 'test-data/variantsplitternormalizer-test/Homo_sapiens_assembly38.20.21_altered.fasta'
>>> df.select('contigName', 'start', 'end', 'referenceAllele', 'alternateAlleles').head()
Row(contigName='chr20', start=400, end=401, referenceAllele='G', alternateAlleles=['GATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGGTTTGAG'])
>>> normalized_df = df.select('contigName', glow.expand_struct(glow.normalize_variant('contigName', 'start', 'end', 'referenceAllele', 'alternateAlleles', ref_genome)))
>>> normalized_df.head()
Row(contigName='chr20', start=268, end=269, referenceAllele='A', alternateAlleles=['ATTTGAGATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGG'], normalizationStatus=Row(changed=True, errorMessage=None))
- Parameters
- Return type
Column
- Returns
A struct as explained above
- glow.sample_call_summary_stats(genotypes, refAllele, alternateAlleles)
Computes per-sample call summary statistics. See Sample Quality Control for more details.
Added in version 0.3.0.
Examples
>>> sites = [
...     {'refAllele': 'C', 'alternateAlleles': ['G'], 'genotypes': [{'sampleId': 'NA12878', 'calls': [0, 0]}]},
...     {'refAllele': 'A', 'alternateAlleles': ['G'], 'genotypes': [{'sampleId': 'NA12878', 'calls': [1, 1]}]},
...     {'refAllele': 'AG', 'alternateAlleles': ['A'], 'genotypes': [{'sampleId': 'NA12878', 'calls': [1, 0]}]}]
>>> df = spark.createDataFrame(sites, 'refAllele: string, alternateAlleles: array<string>, genotypes: array<struct<sampleId: string, calls: array<int>>>')
>>> df.select(glow.sample_call_summary_stats('genotypes', 'refAllele', 'alternateAlleles').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', callRate=1.0, nCalled=3, nUncalled=0, nHomRef=1, nHet=1, nHomVar=1, nSnp=2, nInsertion=0, nDeletion=1, nTransition=2, nTransversion=0, nSpanningDeletion=0, rTiTv=inf, rInsertionDeletion=0.0, rHetHomVar=1.0)])]
- Parameters
- Return type
Column
- Returns
A struct containing sampleId, callRate, nCalled, nUncalled, nHomRef, nHet, nHomVar, nSnp, nInsertion, nDeletion, nTransition, nTransversion, nSpanningDeletion, rTiTv, rInsertionDeletion, rHetHomVar fields. See Sample Quality Control.
- glow.sample_dp_summary_stats(genotypes)
Computes per-sample summary statistics about the depth field in an array of genotype structs.
Added in version 0.3.0.
Examples
>>> sites = [
...     {'genotypes': [{'sampleId': 'NA12878', 'depth': 1}]},
...     {'genotypes': [{'sampleId': 'NA12878', 'depth': 2}]},
...     {'genotypes': [{'sampleId': 'NA12878', 'depth': 3}]}]
>>> df = spark.createDataFrame(sites, 'genotypes: array<struct<depth: int, sampleId: string>>')
>>> df.select(glow.sample_dp_summary_stats('genotypes').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', mean=2.0, stdDev=1.0, min=1.0, max=3.0)])]
- Parameters
genotypes (Union[Column, str]) – An array of genotype structs with a depth field
- Return type
Column
- Returns
An array of structs where each struct contains mean, stdDev, min, and max of the genotype depths for a sample. If sampleId is present in a genotype, it will be propagated to the resulting struct as an extra field.
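Unlike the per-row functions, this one aggregates across rows: each sample's depths are collected over all sites before the statistics are computed. A plain-Python sketch of that idea follows. Grouping by `sampleId` is an assumption made for illustration (Glow may instead align genotypes by their position in the array), and `sample_depth_stats` is a hypothetical helper.

```python
import statistics
from collections import defaultdict


def sample_depth_stats(sites):
    """sites: list of dicts, each with a 'genotypes' list of sampleId/depth dicts."""
    depths = defaultdict(list)
    for site in sites:
        for genotype in site["genotypes"]:
            depths[genotype["sampleId"]].append(float(genotype["depth"]))
    return [
        {
            "sampleId": sample_id,
            "mean": statistics.fmean(values),
            # Sample standard deviation needs at least two observations.
            "stdDev": statistics.stdev(values) if len(values) > 1 else float("nan"),
            "min": min(values),
            "max": max(values),
        }
        for sample_id, values in depths.items()
    ]


sites = [
    {"genotypes": [{"sampleId": "NA12878", "depth": 1}]},
    {"genotypes": [{"sampleId": "NA12878", "depth": 2}]},
    {"genotypes": [{"sampleId": "NA12878", "depth": 3}]},
]
print(sample_depth_stats(sites))
# [{'sampleId': 'NA12878', 'mean': 2.0, 'stdDev': 1.0, 'min': 1.0, 'max': 3.0}]
```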
- glow.sample_gq_summary_stats(genotypes)
Computes per-sample summary statistics about the genotype quality field in an array of genotype structs.
Added in version 0.3.0.
Examples
>>> sites = [
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=1)]),
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=2)]),
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=3)])]
>>> df = spark.createDataFrame(sites, 'genotypes: array<struct<sampleId: string, conditionalQuality: int>>')
>>> df.select(glow.sample_gq_summary_stats('genotypes').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', mean=2.0, stdDev=1.0, min=1.0, max=3.0)])]
- Parameters
genotypes (Union[Column, str]) – An array of genotype structs with a conditionalQuality field
- Return type
Column
- Returns
An array of structs where each struct contains mean, stdDev, min, and max of the genotype qualities for a sample. If sampleId is present in a genotype, it will be propagated to the resulting struct as an extra field.
- glow.subset_struct(struct, *fields)
Selects fields from a struct.
Added in version 0.3.0.
Examples
>>> df = spark.createDataFrame([Row(struct=Row(a=1, b=2, c=3))])
>>> df.select(glow.subset_struct('struct', 'a', 'c').alias('struct')).collect()
[Row(struct=Row(a=1, c=3))]
- glow.vector_to_array(vector)
Converts a spark.ml Vector (sparse or dense) to an array of doubles.
Added in version 0.3.0.
Examples
>>> from pyspark.ml.linalg import DenseVector, SparseVector
>>> df = spark.createDataFrame([Row(v=SparseVector(3, {0: 1.0, 2: 2.0})), Row(v=DenseVector([3.0, 4.0]))])
>>> df.select(glow.vector_to_array('v').alias('arr')).collect()
[Row(arr=[1.0, 0.0, 2.0]), Row(arr=[3.0, 4.0])]
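For the sparse case, the conversion fills every index that is absent from the sparse map with zero. A plain-Python sketch (`to_dense` is a hypothetical helper, shown only to illustrate the first row of the output above):

```python
def to_dense(size, nonzero):
    """Expand a (size, {index: value}) sparse description into a full list."""
    dense = [0.0] * size
    for index, value in nonzero.items():
        dense[index] = float(value)
    return dense


print(to_dense(3, {0: 1.0, 2: 2.0}))  # [1.0, 0.0, 2.0]
```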