.. _variantnormalization: ===================== Variant Normalization ===================== .. invisible-code-block: python import glow test_dir = 'test-data/variantsplitternormalizer-test/' df_original = spark.read.format('vcf').load(test_dir + 'test_left_align_hg38_altered.vcf') ref_genome_path = test_dir + 'Homo_sapiens_assembly38.20.21_altered.fasta' Different genomic analysis tools often represent the same genomic variant in different ways, making it non-trivial to integrate and compare variants across call sets. Therefore, **variant normalization** is an essential step to be applied on variants before further downstream analysis to make sure the same variant is represented identically in different call sets. Normalization is achieved by making sure the variant is parsimonious and left-aligned (see `Variant Normalization `_ for more details). Glow provides variant normalization capabilities as a DataFrame transformer as well as a SQL expression function with a Python API, bringing unprecedented scalability to this operation. .. note:: Glow's variant normalization algorithm follows the same logic as those used in normalization tools such as `bcftools norm `_ and `vt normalize `_. This normalization logic is different from the one used by GATK's `LeftAlignAndTrimVariants `_, which sometimes yields incorrect normalization (see `Variant Normalization `_ for more details). ``normalize_variants`` Transformer ================================== The ``normalize_variants`` transformer can be applied to normalize a variant DataFrame, such as one generated by loading VCF or BGEN files. The output of the transformer is described under the ``replace_columns`` option below. Usage ===== Assuming ``df_original`` is a variable of type DataFrame which contains the genomic variant records, and ``ref_genome_path`` is a variable of type String containing the path to the reference genome file, a minimal example of using this transformer for normalization is as follows: .. tabs:: .. tab:: Python .. code-block:: python df_normalized = glow.transform("normalize_variants", df_original, reference_genome_path=ref_genome_path) .. invisible-code-block: python from pyspark.sql import Row expected_normalized_variant = Row(contigName='chr20', start=268, end=269, names=None, referenceAllele='A', alternateAlleles=['ATTTGAGATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGG'], normalizationStatus=Row(changed=True, errorMessage=None), qual=30.0, filters=None, splitFromMultiAllelic=False, INFO_AN=4, INFO_AF=[1.0], INFO_AC=[1], genotypes=[Row(sampleId='CHMI_CHMI3_WGS2', alleleDepths=None, phased=False, calls=[1, 1]), Row(sampleId='CHMI_CHMI3_WGS3', alleleDepths=None, phased=False, calls=[1, 1])]) assert_rows_equal(df_normalized.head(), expected_normalized_variant) .. tab:: Scala .. code-block:: scala df_normalized = Glow.transform("normalize_variants", df_original, Map("reference_genome_path" -> ref_genome_path)) Options ======= The ``normalize_variants`` transformer has the following options: .. list-table:: :header-rows: 1 * - Option - Type - Possible values and description * - ``reference_genome_path`` - string - Path to the reference genome ``.fasta`` or ``.fa`` file. This file must be accompanied with a ``.fai`` index file in the same folder. * - ``replace_columns`` - boolean - | | ``False``: The transformer does not modify the original ``start``, ``end``, ``referenceAllele`` and ``alternateAlleles`` columns. Instead, a StructType column called ``normalizationResult`` is added to the DataFrame. This column contains the normalized ``start``, ``end``, ``referenceAllele``, and ``alternateAlleles`` columns as well as the ``normalizationStatus`` StructType as the fifth field, which contains the following subfields: | ``changed``: Indicates whether the variant data was changed as a result of normalization | ``errorMessage``: An error message in case the attempt at normalizing the row hit an error. In this case, the ``changed`` field will be set to ``False``. If no errors occur this field will be ``null``. In case of error, the first four fields in ``normalizationResult`` will be ``null``. | | ``True`` (default): The original ``start``, ``end``, ``referenceAllele``, and ``alternateAlleles`` columns are replaced with the normalized values in case they have changed. Otherwise (in case of no change or an error), the original ``start``, ``end``, ``referenceAllele``, and ``alternateAlleles`` are not modified. A StructType ``normalizationStatus`` column is added to the DataFrame with the same subfields explained above. * - ``mode`` (deprecated) - string - | | ``normalize``: Only normalizes the variants (if user does not pass the option, ``normalize`` is assumed as default) | ``split_and_normalize``: Split multiallelic variants to biallelic variants and then normalize the variants. This usage is deprecated. Instead, use :ref:`split_multiallelics transformer` followed by normalize_variants transformer. | ``split``: Only split the multiallelic variants to biallelic without normalizing. This usage is deprecated. Instead, use :ref:`split_multiallelics transformer`. ``normalize_variant`` Function ============================== The normalizer can also be used as a SQL expression function. See :ref:`Glow PySpark Functions` for details on how to use it in the Python API. An example of an expression using the ``normalize_variant`` function is as follows: .. code-block:: python from pyspark.sql.functions import lit normalization_expr = glow.normalize_variant('contigName', 'start', 'end', 'referenceAllele', 'alternateAlleles', ref_genome_path) df_normalized = df_original.withColumn('normalizationResult', normalization_expr) .. invisible-code-block: python expected_normalized_variant = Row(contigName='chr20', start=400, end=401, names=None, referenceAllele='G', alternateAlleles=['GATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGGTTTGAG'], qual=30.0, filters=None, splitFromMultiAllelic=False, INFO_AN=4, INFO_AF=[1.0], INFO_AC=[1], genotypes=[Row(sampleId='CHMI_CHMI3_WGS2', alleleDepths=None, phased=False, calls=[1, 1]), Row(sampleId='CHMI_CHMI3_WGS3', alleleDepths=None, phased=False, calls=[1, 1])], normalizationResult=Row(start=268, end=269, referenceAllele='A', alternateAlleles=['ATTTGAGATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGG'], normalizationStatus=Row(changed=True, errorMessage=None))) assert_rows_equal(df_normalized.head(), expected_normalized_variant) .. notebook:: .. etl/normalizevariants.html :title: Variant normalization notebook