Data Cleaning Utilities

The fisseq_data_pipeline.filter module provides functions to clean and filter feature tables prior to normalization. These utilities are invoked automatically in the pipeline, but can also be used independently.

Overview

  • clean_data: Applies a configurable sequence of filtering stages to a LazyFrame. The default pipeline first removes columns that are entirely non-finite, then removes rows that still contain any non-finite value.
  • drop_cols_all_nonfinite: Drops columns where every value is NaN, +inf, or -inf.
  • drop_rows_any_nonfinite: Drops rows containing any non-finite feature value.

Example Usage

import polars as pl
from fisseq_data_pipeline.filter import clean_data

# Example combined data LazyFrame (features + metadata)
data_lf = pl.DataFrame({
    "_meta_batch": ["A", "A", "B"],
    "_meta_label": ["X", "X", "Y"],
    "f1": [1.0, float("nan"), 3.0],
    "f2": [5.0, 6.0, float("inf")],
    "f3": [float("nan"), float("nan"), float("nan")],
}).lazy()

# Run default pipeline: drop all-nonfinite columns, then rows with any nonfinite
cleaned_lf = clean_data(data_lf)

# Run only one stage
cleaned_lf = clean_data(data_lf, stages=["drop_cols_all_nonfinite"])

# Insert a custom filtering stage
def my_filter(lf: pl.LazyFrame) -> pl.LazyFrame:
    return lf.filter(pl.col("f1") > 0)

cleaned_lf = clean_data(data_lf, stages=["drop_cols_all_nonfinite", my_filter])

Notes

  • Only columns not prefixed with _meta are treated as feature columns and considered during non-finite checks. Metadata columns are carried through unchanged.
  • Unknown string stage names are skipped with a WARNING log message.
  • Custom stages must accept and return a pl.LazyFrame.
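
The `_meta` prefix convention from the first note can be illustrated with plain column names. This is a minimal sketch only: `split_columns` and `META_PREFIX` are hypothetical names introduced here for illustration, not part of the library (the module itself relies on its own `get_feature_cols` helper, whose implementation is not shown on this page).

```python
META_PREFIX = "_meta"  # prefix taken from the note above


def split_columns(columns: list[str]) -> tuple[list[str], list[str]]:
    """Hypothetical helper: partition column names into (features, metadata)."""
    features = [c for c in columns if not c.startswith(META_PREFIX)]
    metadata = [c for c in columns if c.startswith(META_PREFIX)]
    return features, metadata


features, metadata = split_columns(
    ["_meta_batch", "_meta_label", "f1", "f2", "f3"]
)
print(features)  # ['f1', 'f2', 'f3']
print(metadata)  # ['_meta_batch', '_meta_label']
```

Only the names in `features` would take part in the non-finite checks; the names in `metadata` pass through untouched.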

API reference


fisseq_data_pipeline.filter.clean_data(feature_lf, stages=['drop_cols_all_nonfinite', 'drop_rows_any_nonfinite'])

Apply a sequence of filtering stages to a LazyFrame.

Each stage may be specified by:
  • A string matching a known stage name; or
  • A Callable of type FilterFun receiving a LazyFrame and returning one.

Stages are executed in order, and each stage returns a transformed LazyFrame that becomes the input for the next stage.
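
The dispatch behavior described above (names resolved through a lookup table, callables applied directly, unknown names skipped with a warning) can be sketched independently of Polars. The toy version below operates on a plain list of floats; `run_stages`, `drop_negative`, and `halve` are illustrative names assumed for this sketch, not library functions.

```python
import logging
from typing import Callable, Iterable, Union

Rows = list[float]
Stage = Callable[[Rows], Rows]


def run_stages(
    data: Rows,
    stages: Iterable[Union[str, Stage]],
    lookup: dict[str, Stage],
) -> Rows:
    """Apply each stage in order, feeding each output into the next stage."""
    for stage in stages:
        if isinstance(stage, str):
            if stage not in lookup:
                # Unknown names are skipped rather than raising an error.
                logging.warning("Skipping invalid filtering stage: %s", stage)
                continue
            stage = lookup[stage]
        data = stage(data)
    return data


def drop_negative(xs: Rows) -> Rows:
    return [x for x in xs if x >= 0]


def halve(xs: Rows) -> Rows:
    return [x / 2 for x in xs]


lookup = {"drop_negative": drop_negative}
result = run_stages(
    [-2.0, 4.0, 6.0],
    ["drop_negative", "no_such_stage", halve],  # name, unknown name, callable
    lookup,
)
print(result)  # [2.0, 3.0]
```

Because an unknown name is only logged and skipped, a typo in a stage name silently shortens the pipeline; checking the log output for warnings is worthwhile when passing custom stage lists.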

Parameters:
  • feature_lf (LazyFrame) –

    LazyFrame containing feature columns to clean.

  • stages (Iterable[str | FilterFun], default: ['drop_cols_all_nonfinite', 'drop_rows_any_nonfinite'] ) –

    Ordered list of filter stages. The default pipeline is:
      • "drop_cols_all_nonfinite": remove columns that are entirely NaN/inf/-inf.
      • "drop_rows_any_nonfinite": remove rows containing any non-finite values.

Returns:
  • LazyFrame

    A LazyFrame representing the cleaned feature set.

Notes
  • Invalid stage names are skipped with a warning.
  • Custom filtering functions can be inserted as callables.
Source code in src/fisseq_data_pipeline/filter.py
def clean_data(
    feature_lf: pl.LazyFrame,
    stages: Iterable[str | FilterFun] = [
        "drop_cols_all_nonfinite",
        "drop_rows_any_nonfinite",
    ],
) -> pl.LazyFrame:
    """
    Apply a sequence of filtering stages to a LazyFrame.

    Each stage may be specified by:
      - A string matching a known stage name; or
      - A Callable of type ``FilterFun`` receiving a LazyFrame and returning one.

    Stages are executed in order, and each stage returns a transformed
    LazyFrame that becomes the input for the next stage.

    Parameters
    ----------
    feature_lf : pl.LazyFrame
        LazyFrame containing feature columns to clean.
    stages : Iterable[str | FilterFun], optional
        Ordered list of filter stages. The default pipeline is:
          - ``"drop_cols_all_nonfinite"``: remove columns that are entirely
            NaN/inf/-inf.
          - ``"drop_rows_any_nonfinite"``: remove rows containing any
            non-finite values.

    Returns
    -------
    pl.LazyFrame
        A LazyFrame representing the cleaned feature set.

    Notes
    -----
    - Invalid stage names are skipped with a warning.
    - Custom filtering functions can be inserted as callables.
    """
    stage_lookup: dict[str, FilterFun] = {
        "drop_cols_all_nonfinite": drop_cols_all_nonfinite,
        "drop_rows_any_nonfinite": drop_rows_any_nonfinite,
    }

    for stage in stages:
        if isinstance(stage, str):
            if stage not in stage_lookup:
                logging.warning("Skipping invalid filtering stage: %s", stage)
                continue
            stage = stage_lookup[stage]
        feature_lf = stage(feature_lf)

    return feature_lf

fisseq_data_pipeline.filter.drop_cols_all_nonfinite(data_lf)

Remove columns where every value is non-finite.

A column is dropped if every one of its elements is NaN, +inf, or -inf.

To decide which columns to drop, the function eagerly collects a one-row summary recording whether each feature column contains at least one finite value. The returned LazyFrame excludes the all-non-finite columns.

Parameters:
  • data_lf (LazyFrame) –

    LazyFrame containing feature columns to be checked.

Returns:
  • LazyFrame

    A LazyFrame with all-non-finite columns removed.

Source code in src/fisseq_data_pipeline/filter.py
def drop_cols_all_nonfinite(data_lf: pl.LazyFrame) -> pl.LazyFrame:
    """
    Remove columns where *every* value is non-finite.

    A column is dropped if all of its elements are one of:
       - NaN
       - +inf
       - -inf

    This scan is performed eagerly for the minimal number of rows needed
    to determine which columns contain at least one finite value. The
    final returned LazyFrame excludes all-non-finite columns.

    Parameters
    ----------
    data_lf : pl.LazyFrame
        LazyFrame containing feature columns to be checked.

    Returns
    -------
    pl.LazyFrame
        A LazyFrame with all-non-finite columns removed.
    """
    # TODO: Can this also be a lazy expression?
    logging.info("Scanning for all non-finite columns")
    finite_summary = data_lf.select(
        [
            pl.col(c).is_finite().any().alias(c)
            for c in get_feature_cols(data_lf, as_string=True)
        ]
    ).collect()

    logging.info("Adding query to remove columns containing all non-finite values")
    non_finite_cols = [
        c for c in get_feature_cols(data_lf, as_string=True) if not finite_summary[c][0]
    ]
    data_lf = data_lf.select(pl.exclude(non_finite_cols))
    logging.info(
        "Removing %d columns containing only non-finite values",
        len(non_finite_cols),
    )

    return data_lf

fisseq_data_pipeline.filter.drop_rows_any_nonfinite(data_lf)

Remove rows containing any non-finite value in the feature columns.

A row is dropped if any feature column contains NaN, +inf, or -inf.

Parameters:
  • data_lf (LazyFrame) –

    LazyFrame containing numerical feature columns.

Returns:
  • LazyFrame

    A LazyFrame where all rows with any non-finite feature value have been filtered out.

Source code in src/fisseq_data_pipeline/filter.py
def drop_rows_any_nonfinite(data_lf: pl.LazyFrame) -> pl.LazyFrame:
    """
    Remove rows containing any non-finite value in the feature columns.

    A row is dropped if *any* feature column contains:
       - NaN
       - +inf
       - -inf

    Parameters
    ----------
    data_lf : pl.LazyFrame
        LazyFrame containing numerical feature columns.

    Returns
    -------
    pl.LazyFrame
        A LazyFrame where all rows with any non-finite feature value have
        been filtered out.
    """
    logging.info(
        "Adding query to remove any remaining rows that contain non-finite"
        " feature values"
    )

    return data_lf.filter(
        pl.all_horizontal([c.is_finite() for c in get_feature_cols(data_lf)])
    )