Aggregate

The fisseq_data_pipeline.aggregate module provides aggregators that summarize a normalized feature matrix into per-(batch, label) statistics. It also exposes a two-subcommand CLI: one subcommand computes aggregations, and the other normalizes aggregate DataFrames against synonymous-variant rows.


Overview

Class hierarchy

BaseAggregator (abstract)
├── NativeAggregator (abstract) — Polars group-by expressions, no reference
│   ├── MeanAggregator
│   ├── MedianAggregator
│   ├── MADAggregator
│   └── StdAggregator
└── ReferenceBaseAggregator (abstract) — requires control reference distribution
    ├── EMDAggregator
    ├── KSAggregator
    ├── QQCorrelationAggregator
    └── AUROCAggregator

Native aggregators use Polars group_by().agg() and require no reference distribution. Their aggregate() method takes only agg_df. Before grouping, they drop wildtype rows (_meta_label == "WT"), as the reference-based aggregators also do.
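The group-by semantics can be illustrated without Polars. The following is a minimal pure-Python sketch, not the library's code. It assumes rows are plain dicts and that every key not starting with "_meta_" is a feature column, which is a guess at what get_feature_cols returns. Like MeanAggregator, it drops WT rows, groups on (_meta_label, _meta_batch), and names each output column with the "_mean" suffix.

```python
from collections import defaultdict
from statistics import mean


def native_mean_sketch(rows: list[dict]) -> list[dict]:
    """Pure-Python stand-in for MeanAggregator().aggregate(df) (illustrative only)."""
    # Assumption: every column not starting with "_meta_" is a feature.
    features = [k for k in rows[0] if not k.startswith("_meta_")]
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for row in rows:
        if row["_meta_label"] == "WT":  # wildtype rows are excluded
            continue
        groups[(row["_meta_label"], row["_meta_batch"])].append(row)
    out = []
    for (label, batch), members in groups.items():
        result = {"_meta_label": label, "_meta_batch": batch}
        for f in features:
            result[f"{f}_mean"] = mean(m[f] for m in members)
        out.append(result)
    return out


rows = [
    {"_meta_batch": "A", "_meta_label": "WT", "f1": 1.0},
    {"_meta_batch": "A", "_meta_label": "X", "f1": 2.0},
    {"_meta_batch": "A", "_meta_label": "X", "f1": 4.0},
    {"_meta_batch": "B", "_meta_label": "X", "f1": 10.0},
]
result = native_mean_sketch(rows)
```

Label X appears in both batches, so it yields two output rows, one per batch. Polars' group_by does not preserve group order by default, so rows in the real output may come back in any order.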

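Reference-based aggregators compare each (batch, label) group against a control reference distribution. Only the reference rows from the same batch are used. These aggregators are constructed with a reference_df, and their aggregate() method uses joblib for parallel dispatch (n_jobs, backend, and verbose are forwarded to joblib.Parallel).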
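The per-batch pairing is the key detail. The minimal pure-Python sketch below shows it with joblib parallelism left out. The function name, the "_stat" output suffix, and the mean-difference statistic are all hypothetical stand-ins for a real compute_statistic such as EMD or KS.

```python
from collections import defaultdict
from statistics import mean
from typing import Callable


def reference_dispatch_sketch(
    rows: list[dict],
    reference_rows: list[dict],
    feature: str,
    statistic: Callable[[list[float], list[float]], float],
) -> list[dict]:
    """Compare each non-WT (label, batch) group with same-batch reference values."""
    groups: dict[tuple, list[float]] = defaultdict(list)
    for row in rows:
        if row["_meta_label"] != "WT":
            groups[(row["_meta_label"], row["_meta_batch"])].append(row[feature])
    out = []
    for (label, batch), values in groups.items():
        # Only reference rows from the group's own batch are used (cf. ref_df).
        ref = [r[feature] for r in reference_rows if r["_meta_batch"] == batch]
        out.append(
            {"_meta_label": label, "_meta_batch": batch, f"{feature}_stat": statistic(values, ref)}
        )
    return out


rows = [
    {"_meta_batch": "A", "_meta_label": "WT", "f1": 0.0},
    {"_meta_batch": "A", "_meta_label": "WT", "f1": 2.0},
    {"_meta_batch": "A", "_meta_label": "X", "f1": 5.0},
    {"_meta_batch": "B", "_meta_label": "WT", "f1": 10.0},
    {"_meta_batch": "B", "_meta_label": "X", "f1": 11.0},
]
reference_rows = [r for r in rows if r["_meta_label"] == "WT"]
# Hypothetical statistic: difference of means between group and reference.
result = reference_dispatch_sketch(rows, reference_rows, "f1", lambda g, r: mean(g) - mean(r))
```

Group X in batch A is compared only with the batch-A WT values (mean 1.0), and group X in batch B only with the batch-B WT value (10.0).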

Aggregator registry

Key        Class                      Type        Output column suffix
"mean"     MeanAggregator             Native      _mean
"median"   MedianAggregator           Native      _median
"MAD"      MADAggregator              Native      _MAD
"std"      StdAggregator              Native      _std
"EMD"      EMDAggregator              Reference   _EMD
"KS"       KSAggregator               Reference   _KS
"QQ"       QQCorrelationAggregator    Reference   _QQ
"AUROC"    AUROCAggregator            Reference   _AUROC

Example usage

Native aggregator

import polars as pl
from fisseq_data_pipeline.aggregate import MeanAggregator

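data_df = pl.DataFrame({
    "_meta_batch": ["A", "A", "A", "A", "B", "B", "B", "B"],
    "_meta_label": ["WT", "WT", "X", "X", "WT", "WT", "Y", "Y"],
    "f1": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
    "f2": [5.0, 6.0, 7.0, 8.0, 1.0, 2.0, 3.0, 4.0],
})

# WT rows are dropped; the result has one row per (label, batch) group
# with columns _meta_label, _meta_batch, f1_mean and f2_mean.
agg = MeanAggregator()
result = agg.aggregate(data_df)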

Reference-based aggregator

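from fisseq_data_pipeline.aggregate import EMDAggregator

# Reuses data_df from above. Each non-WT group is compared with the WT rows
# of its own batch; the output holds _meta_label, _meta_batch, f1_EMD, f2_EMD.
reference_df = data_df.filter(pl.col("_meta_label") == "WT")
agg = EMDAggregator(reference_df)
result = agg.aggregate(data_df)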

CLI

The module exposes two subcommands via Python Fire:

compute

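Compute per-(batch, label) aggregation statistics from a normalized Parquet file, excluding wildtype (WT) rows. The subcommand writes <out_dir>/aggregated.parquet, which contains the _meta_label and _meta_batch keys plus the aggregate feature columns (for example f1_mean). The original per-row feature columns and any other metadata columns are not carried over.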

python -m fisseq_data_pipeline.aggregate compute \
  --norm_df normalized.parquet \
  --out_dir out/ \
  --aggregator mean

Valid --aggregator values: mean, median, MAD, std, EMD, KS, QQ, AUROC.
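compute_cli checks the key with an exact membership test against a private _AGGREGATORS mapping, so keys are case-sensitive: "MAD" is accepted but "mad" is not. The minimal sketch below mirrors that check. The dict contents come from the registry table above, but AGGREGATOR_KEYS and check_aggregator are hypothetical names, not part of the module.

```python
# Hypothetical mirror of the registry: key -> aggregator type.
AGGREGATOR_KEYS = {
    "mean": "native", "median": "native", "MAD": "native", "std": "native",
    "EMD": "reference", "KS": "reference", "QQ": "reference", "AUROC": "reference",
}


def check_aggregator(name: str) -> str:
    """Exact, case-sensitive lookup, raising ValueError like compute_cli does."""
    if name not in AGGREGATOR_KEYS:
        raise ValueError(
            f"Unknown aggregator {name!r}. Choose from: {sorted(AGGREGATOR_KEYS)}"
        )
    return AGGREGATOR_KEYS[name]
```

Reference-type keys additionally require a _meta_is_control column in the input Parquet file, because compute_cli builds the reference from those rows.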

normalize

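Normalize an aggregate DataFrame against the synonymous-variant baseline. Rows whose _meta_label is classified as "Synonymous" are used to fit a batch-wise z-score normalizer, which is then applied to every row. The subcommand writes normalized.parquet and normalizer.pkl to out_dir.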

python -m fisseq_data_pipeline.aggregate normalize \
  --agg_df aggregated.parquet \
  --out_dir out/

API reference


fisseq_data_pipeline.aggregate.BaseAggregator

Bases: ABC

Abstract base class for all aggregators.

All subclasses expose a single aggregate(agg_df) method that accepts an input DataFrame and returns a per-(batch, label) summary DataFrame.

Source code in src/fisseq_data_pipeline/aggregate.py
class BaseAggregator(abc.ABC):
    """
    Abstract base class for all aggregators.

    All subclasses expose a single ``aggregate(agg_df)`` method that accepts
    an input DataFrame and returns a per-(batch, label) summary DataFrame.
    """

    @abc.abstractmethod
    def aggregate(self, agg_df: pl.DataFrame) -> pl.DataFrame:
        """
        Compute per-(batch, label) statistics.

        Parameters
        ----------
        agg_df : pl.DataFrame
            Input DataFrame containing ``_meta_batch``, ``_meta_label``, and
            feature columns.

        Returns
        -------
        pl.DataFrame
            One row per (batch, label) group with computed statistics.
        """
        raise NotImplementedError

aggregate(agg_df) abstractmethod

Compute per-(batch, label) statistics.

Parameters:
  • agg_df (DataFrame) –

    Input DataFrame containing _meta_batch, _meta_label, and feature columns.

Returns:
  • DataFrame

    One row per (batch, label) group with computed statistics.


fisseq_data_pipeline.aggregate.NativeAggregator

Bases: BaseAggregator

Base class for aggregators implemented as native Polars group-by expressions.

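Subclasses override polars_exprs() to return a list of Polars expressions that are passed directly to group_by().agg(). The inherited aggregate() drops wildtype (_meta_label == "WT") rows and groups by _meta_label and _meta_batch.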

Source code in src/fisseq_data_pipeline/aggregate.py
class NativeAggregator(BaseAggregator):
    """
    Base class for aggregators implemented as native Polars group-by expressions.

    Subclasses override :meth:`polars_exprs` to return a list of Polars
    expressions that are passed directly to ``group_by().agg()``.
    """

    @abc.abstractmethod
    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        """
        Return Polars aggregation expressions for the given feature columns.

        Parameters
        ----------
        feature_cols : list[str]
            Names of the feature columns to aggregate.

        Returns
        -------
        list[pl.Expr]
            Polars expressions to pass to ``group_by().agg()``.
        """
        raise NotImplementedError

    def aggregate(self, agg_df: pl.DataFrame) -> pl.DataFrame:
        feature_cols = get_feature_cols(agg_df, as_string=True)
        return (
            agg_df.filter(pl.col("_meta_label") != "WT")
            .group_by("_meta_label", "_meta_batch")
            .agg(self.polars_exprs(feature_cols))
        )

polars_exprs(feature_cols) abstractmethod

Return Polars aggregation expressions for the given feature columns.

Parameters:
  • feature_cols (list[str]) –

    Names of the feature columns to aggregate.

Returns:
  • list[Expr]

    Polars expressions to pass to group_by().agg().


fisseq_data_pipeline.aggregate.MeanAggregator

Bases: NativeAggregator

Computes per-group mean for each feature column.

Source code in src/fisseq_data_pipeline/aggregate.py
class MeanAggregator(NativeAggregator):
    """Computes per-group mean for each feature column."""

    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        return [pl.col(f).mean().alias(f"{f}_mean") for f in feature_cols]

fisseq_data_pipeline.aggregate.MedianAggregator

Bases: NativeAggregator

Computes per-group median for each feature column.

Source code in src/fisseq_data_pipeline/aggregate.py
class MedianAggregator(NativeAggregator):
    """Computes per-group median for each feature column."""

    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        return [pl.col(f).median().alias(f"{f}_median") for f in feature_cols]

fisseq_data_pipeline.aggregate.MADAggregator

Bases: NativeAggregator

Computes the per-group median absolute deviation (MAD), median(|x - median(x)|), for each feature column. The value is unscaled, so no 1.4826 consistency factor is applied.
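For readers checking numbers by hand, here is a pure-Python restatement of the Polars expression (pl.col(f) - pl.col(f).median()).abs().median() for a single group. It is a sketch for illustration; the library evaluates the expression inside group_by().agg().

```python
from statistics import median


def mad(values: list[float]) -> float:
    """Unscaled median absolute deviation: median(|x - median(x)|)."""
    center = median(values)
    return median(abs(v - center) for v in values)


robust = mad([1.0, 2.0, 3.0, 4.0, 100.0])  # the outlier 100.0 barely matters
even = mad([1.0, 2.0, 3.0, 4.0])           # median 2.5, deviations 1.5, 0.5, 0.5, 1.5
```

Multiplying the result by about 1.4826 gives a consistent estimate of the standard deviation for normally distributed data, if that scale is preferred downstream.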

Source code in src/fisseq_data_pipeline/aggregate.py
class MADAggregator(NativeAggregator):
    """Computes per-group median absolute deviation (MAD) for each feature column."""

    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        return [
            (pl.col(f) - pl.col(f).median()).abs().median().alias(f"{f}_MAD")
            for f in feature_cols
        ]

fisseq_data_pipeline.aggregate.StdAggregator

Bases: NativeAggregator

Computes the per-group sample standard deviation (Polars' default ddof=1) for each feature column.
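The ddof choice matters for small groups. The standard-library sketch below contrasts the n - 1 denominator (what pl.col(f).std() uses by default) with the population formula. The sample values are illustrative only.

```python
from statistics import pstdev, stdev

values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
sample_sd = stdev(values)       # divides by n - 1, like Polars' default ddof=1
population_sd = pstdev(values)  # divides by n (ddof=0)
```

The squared deviations from the mean (5.0) sum to 32, so the population value is sqrt(32 / 8) = 2.0 and the sample value is sqrt(32 / 7), roughly 2.14.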

Source code in src/fisseq_data_pipeline/aggregate.py
class StdAggregator(NativeAggregator):
    """Computes per-group standard deviation for each feature column."""

    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        return [pl.col(f).std().alias(f"{f}_std") for f in feature_cols]

fisseq_data_pipeline.aggregate.ReferenceBaseAggregator

Bases: BaseAggregator

Base class for aggregators that compare each group against a reference distribution, typically built from control rows.

Subclasses override compute_statistic() to implement a specific scalar summary (e.g. EMD, KS statistic) for each feature.

Parameters:
  • reference_df (DataFrame) –

    Reference DataFrame (typically control rows) used as the comparison distribution in compute_statistic(). The feature columns to aggregate are taken from this DataFrame.

Source code in src/fisseq_data_pipeline/aggregate.py
class ReferenceBaseAggregator(BaseAggregator):
    """
    Base class for aggregators that compare each group against a reference
    distribution, typically built from control rows.

    Subclasses override :meth:`compute_statistic` to implement a specific
    scalar summary (e.g. EMD, KS statistic) for each feature.

    Parameters
    ----------
    reference_df : pl.DataFrame
        Reference DataFrame (typically control rows) used as the comparison
        distribution in :meth:`compute_statistic`.
    """

    def __init__(self, reference_df: pl.DataFrame) -> None:
        self.feature_cols = get_feature_cols(reference_df, as_string=True)
        self.reference_df = reference_df

    @abc.abstractmethod
    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        """
        Compute per-feature statistics for a single (batch, label) group.

        Parameters
        ----------
        label : str
            The variant label for this group.
        batch : str
            The batch identifier for this group.
        group_df : pl.DataFrame
            The subset of the input DataFrame corresponding to this
            (batch, label) group.
        ref_df : pl.DataFrame
            The subset of the reference DataFrame corresponding to this batch.

        Returns
        -------
        dict[str, Any]
            A dict with at minimum ``_meta_label`` and ``_meta_batch`` keys,
            plus one entry per feature column containing the computed statistic.
        """
        raise NotImplementedError

    def aggregate(
        self,
        agg_df: pl.DataFrame,
        n_jobs: int = -1,
        backend: str = "threading",
        verbose: int = 0,
    ) -> pl.DataFrame:
        """
        Compute per-(batch, label) statistics for all features using joblib.

        Filters out wildtype rows, groups by ``_meta_label`` and
        ``_meta_batch``, and dispatches one :meth:`compute_statistic` call
        per group in parallel.

        Parameters
        ----------
        agg_df : pl.DataFrame
            Input DataFrame containing ``_meta_batch``, ``_meta_label``, and
            feature columns.
        n_jobs : int, optional
            Number of parallel jobs passed to joblib.
        backend : str, optional
            Joblib backend.
        verbose : int, optional
            Joblib verbosity level. Defaults to ``0``.

        Returns
        -------
        pl.DataFrame
            A DataFrame with one row per (batch, label) group and one column
            per feature containing the computed statistic.
        """
        groups = agg_df.filter(pl.col("_meta_label") != "WT").group_by(
            "_meta_label", "_meta_batch"
        )
        tasks = [((label, batch), group_df) for (label, batch), group_df in groups]

        dicts = joblib.Parallel(n_jobs=n_jobs, backend=backend, verbose=verbose)(
            joblib.delayed(self.compute_statistic)(
                label=label,
                batch=batch,
                group_df=group_df,
                ref_df=self.reference_df.filter(pl.col("_meta_batch") == batch),
            )
            for (label, batch), group_df in tasks
        )

        return pl.DataFrame(dicts)

aggregate(agg_df, n_jobs=-1, backend='threading', verbose=0)

Compute per-(batch, label) statistics for all features using joblib.

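Filters out wildtype (_meta_label == "WT") rows, groups by _meta_label and _meta_batch, and dispatches one compute_statistic() call per group in parallel. Each call receives only the reference rows whose _meta_batch matches the group's batch.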

Parameters:
  • agg_df (DataFrame) –

    Input DataFrame containing _meta_batch, _meta_label, and feature columns.

  • n_jobs (int, default: -1 ) –

    Number of parallel jobs passed to joblib.Parallel; -1 uses all available CPUs.

  • backend (str, default: 'threading' ) –

    Joblib backend passed to joblib.Parallel.

  • verbose (int, default: 0 ) –

    Joblib verbosity level. Defaults to 0.

Returns:
  • DataFrame

    A DataFrame with one row per (batch, label) group and one column per feature containing the computed statistic.


compute_statistic(label, batch, group_df, ref_df) abstractmethod

Compute per-feature statistics for a single (batch, label) group.

Parameters:
  • label (str) –

    The variant label for this group.

  • batch (str) –

    The batch identifier for this group.

  • group_df (DataFrame) –

    The subset of the input DataFrame corresponding to this (batch, label) group.

  • ref_df (DataFrame) –

    The subset of the reference DataFrame corresponding to this batch.

Returns:
  • dict[str, Any]

    A dict with at minimum _meta_label and _meta_batch keys, plus one entry per feature column containing the computed statistic.


fisseq_data_pipeline.aggregate.EMDAggregator

Bases: ReferenceBaseAggregator

Computes per-group 1D Wasserstein distances (Earth Mover's Distance) against a reference distribution for each feature column.
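In one dimension, the Wasserstein distance is the area between the two empirical CDFs. The pure-Python sketch below computes that area directly, to show what scipy.stats.wasserstein_distance returns for unweighted samples. It is an illustration, not the library's code path.

```python
from bisect import bisect_right


def wasserstein_1d(u: list[float], v: list[float]) -> float:
    """Area between the empirical CDFs of u and v (1D Earth Mover's Distance)."""
    u_sorted, v_sorted = sorted(u), sorted(v)
    points = sorted(u_sorted + v_sorted)
    total = 0.0
    for left, right in zip(points, points[1:]):
        # Both CDFs are constant on [left, right); integrate |F_u - F_v| there.
        cdf_u = bisect_right(u_sorted, left) / len(u_sorted)
        cdf_v = bisect_right(v_sorted, left) / len(v_sorted)
        total += abs(cdf_u - cdf_v) * (right - left)
    return total


shifted = wasserstein_1d([0.0, 1.0, 3.0], [5.0, 6.0, 8.0])
same = wasserstein_1d([1.0, 2.0, 3.0], [1.0, 2.0, 3.0])
```

For equal-sized samples, this matches the mean absolute difference of the sorted pairs: (|0 - 5| + |1 - 6| + |3 - 8|) / 3 = 5.0. The statistic is in the feature's own units, so it grows with shifts in location as well as changes in shape.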

Source code in src/fisseq_data_pipeline/aggregate.py
class EMDAggregator(ReferenceBaseAggregator):
    """
    Computes per-group 1D Wasserstein distances (Earth Mover's Distance)
    against a reference distribution for each feature column.
    """

    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        row: dict[str, Any] = {"_meta_label": label, "_meta_batch": batch}
        for feat in self.feature_cols:
            variant = group_df.get_column(feat).to_numpy()
            reference = ref_df.get_column(feat).to_numpy()
            row[f"{feat}_EMD"] = scipy.stats.wasserstein_distance(variant, reference)
        return row

fisseq_data_pipeline.aggregate.KSAggregator

Bases: ReferenceBaseAggregator

Computes per-group two-sample Kolmogorov-Smirnov statistics against a reference distribution for each feature column.
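The two-sample KS statistic is the largest vertical gap between the two empirical CDFs, which is the value KSAggregator reads from scipy.stats.ks_2samp(...).statistic. The sketch below is a pure-Python illustration of that quantity, not the SciPy implementation.

```python
from bisect import bisect_right


def ks_statistic(u: list[float], v: list[float]) -> float:
    """Two-sample KS statistic: sup over x of |F_u(x) - F_v(x)|."""
    u_sorted, v_sorted = sorted(u), sorted(v)

    def gap(x: float) -> float:
        # Right-continuous empirical CDFs evaluated at x.
        return abs(
            bisect_right(u_sorted, x) / len(u_sorted)
            - bisect_right(v_sorted, x) / len(v_sorted)
        )

    # The supremum is attained at one of the pooled sample points.
    return max(gap(x) for x in u_sorted + v_sorted)


disjoint = ks_statistic([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])
overlapping = ks_statistic([1.0, 2.0, 3.0, 4.0], [3.0, 4.0, 5.0, 6.0])
```

Unlike EMD, the KS statistic is bounded in [0, 1] and ignores how far apart the samples are once they no longer overlap.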

Source code in src/fisseq_data_pipeline/aggregate.py
class KSAggregator(ReferenceBaseAggregator):
    """
    Computes per-group two-sample Kolmogorov-Smirnov statistics against
    a reference distribution for each feature column.
    """

    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        row: dict[str, Any] = {"_meta_label": label, "_meta_batch": batch}
        for feat in self.feature_cols:
            variant = group_df.get_column(feat).to_numpy()
            reference = ref_df.get_column(feat).to_numpy()
            row[f"{feat}_KS"] = scipy.stats.ks_2samp(variant, reference).statistic
        return row

fisseq_data_pipeline.aggregate.QQCorrelationAggregator

Bases: ReferenceBaseAggregator

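For each group and feature column, computes the Pearson correlation between the group's quantiles and the reference distribution's quantiles, evaluated at the same quantile levels (a Q-Q plot correlation).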

Parameters:
  • reference_df (DataFrame) –

    Reference DataFrame (typically control rows).

  • n_quantiles (int, default: 100 ) –

    Number of evenly spaced quantile levels in [0, 1] at which both distributions are evaluated. Defaults to 100.
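The statistic can be reproduced with the standard library. The sketch below assumes NumPy's default linear interpolation for np.quantile and computes the Pearson correlation by hand. It is an illustration of the technique, not the library's code, and it assumes n_quantiles >= 2.

```python
import math


def _quantile(sorted_vals: list[float], q: float) -> float:
    # Linear interpolation between order statistics (NumPy's default method).
    h = (len(sorted_vals) - 1) * q
    lo = math.floor(h)
    hi = min(lo + 1, len(sorted_vals) - 1)
    return sorted_vals[lo] + (h - lo) * (sorted_vals[hi] - sorted_vals[lo])


def _pearson(x: list[float], y: list[float]) -> float:
    mx, my = sum(x) / len(x), sum(y) / len(y)
    sxy = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sxx = sum((a - mx) ** 2 for a in x)
    syy = sum((b - my) ** 2 for b in y)
    return sxy / math.sqrt(sxx * syy)


def qq_correlation(variant: list[float], reference: list[float], n_quantiles: int = 100) -> float:
    """Pearson correlation of matched quantiles (assumes n_quantiles >= 2)."""
    levels = [i / (n_quantiles - 1) for i in range(n_quantiles)]  # like np.linspace(0, 1, n)
    v_sorted, r_sorted = sorted(variant), sorted(reference)
    vq = [_quantile(v_sorted, q) for q in levels]
    rq = [_quantile(r_sorted, q) for q in levels]
    return _pearson(vq, rq)


reference = [float(i) for i in range(10)]
affine = qq_correlation([2.0 * x + 5.0 for x in reference], reference)
skewed = qq_correlation([x ** 3 for x in reference], reference)
```

One consequence is worth noting: a pure shift or positive rescaling of the reference distribution still gives a Q-Q correlation of 1.0, so this aggregator responds to changes in distribution shape, not location or scale. If either quantile vector is constant, the correlation is undefined.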

Source code in src/fisseq_data_pipeline/aggregate.py
class QQCorrelationAggregator(ReferenceBaseAggregator):
    """
    Computes per-group Q-Q correlation against a reference distribution for
    each feature column.

    Parameters
    ----------
    reference_df : pl.DataFrame
        Reference DataFrame (typically control rows).
    n_quantiles : int, optional
        Number of quantile points to evaluate. Defaults to ``100``.
    """

    def __init__(self, reference_df: pl.DataFrame, n_quantiles: int = 100) -> None:
        super().__init__(reference_df)
        self.quantile_points = np.linspace(0, 1, n_quantiles)

    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        row: dict[str, Any] = {"_meta_label": label, "_meta_batch": batch}
        for feat in self.feature_cols:
            variant = group_df.get_column(feat).to_numpy()
            reference = ref_df.get_column(feat).to_numpy()
            variant_quantiles = np.quantile(variant, self.quantile_points)
            reference_quantiles = np.quantile(reference, self.quantile_points)
            row[f"{feat}_QQ"] = scipy.stats.pearsonr(
                variant_quantiles, reference_quantiles
            ).statistic
        return row

fisseq_data_pipeline.aggregate.AUROCAggregator

Bases: ReferenceBaseAggregator

Computes per-group AUROC against a reference distribution for each feature column.

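Variant samples are labelled 1 and reference samples are labelled 0. Values below 0.5 are reflected to 1 - AUROC, so the reported statistic lies in [0.5, 1.0] and does not encode the direction of the shift. A value of 0.5 means the feature does not separate the two groups (as with identical distributions), and 1.0 indicates perfect separability.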
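AUROC equals the probability that a randomly chosen variant value ranks above a randomly chosen reference value, with ties counted as one half (the Mann-Whitney U statistic divided by n1 * n2). The pure-Python sketch below computes it by pairwise comparison and applies the same folding as the code. It is an O(n * m) illustration, not a replacement for sklearn.metrics.roc_auc_score.

```python
def folded_auroc(variant: list[float], reference: list[float]) -> float:
    """AUROC with variant as class 1 and reference as class 0, folded into [0.5, 1]."""
    wins = 0.0
    for v in variant:
        for r in reference:
            if v > r:
                wins += 1.0  # variant value ranked above the reference value
            elif v == r:
                wins += 0.5  # ties count half, as in roc_auc_score
    auc = wins / (len(variant) * len(reference))
    return max(auc, 1.0 - auc)  # same effect as "if auroc < 0.5: auroc = 1 - auroc"


higher = folded_auroc([4.0, 5.0, 6.0], [1.0, 2.0, 3.0])
lower = folded_auroc([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])
partial = folded_auroc([2.0, 4.0], [1.0, 3.0])
```

Because of the folding, a variant shifted upward and one shifted downward by the same amount both report 1.0. A signed statistic would be needed to distinguish them.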

Source code in src/fisseq_data_pipeline/aggregate.py
class AUROCAggregator(ReferenceBaseAggregator):
    """
    Computes per-group AUROC against a reference distribution for each
    feature column.

    Variant samples are labelled ``1`` and reference samples are labelled
    ``0``. ``0.5`` indicates identical distributions and ``1.0`` indicates
    perfect separability.
    """

    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        row: dict[str, Any] = {"_meta_label": label, "_meta_batch": batch}
        for feat in self.feature_cols:
            variant = group_df.get_column(feat).to_numpy()
            reference = ref_df.get_column(feat).to_numpy()
            values = np.concatenate([reference, variant])
            labels = np.concatenate(
                [
                    np.zeros(len(reference)),
                    np.ones(len(variant)),
                ]
            )

            auroc = sklearn.metrics.roc_auc_score(labels, values)
            if auroc < 0.5:
                auroc = 1 - auroc

            row[f"{feat}_AUROC"] = auroc

        return row

fisseq_data_pipeline.aggregate.compute_cli(norm_df, out_dir, aggregator)

Compute per-(batch, label) aggregate statistics and write the result.

Reads a normalized feature parquet, runs the specified aggregator over all (batch, label) groups (excluding WT), and writes the result to <out_dir>/aggregated.parquet. The output contains only the _meta_label and _meta_batch keys and the aggregate feature columns.

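For reference-based aggregators (EMD, KS, QQ, AUROC), the per-batch reference distributions are built from rows where _meta_is_control == True. A ValueError is raised if aggregator is not a recognized key, or if a reference-based aggregator is requested and the _meta_is_control column is missing.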

Parameters:
  • norm_df (PathLike) –

    Path to the input normalized feature parquet. Must contain _meta_batch, _meta_label, and feature columns. Reference-based aggregators also require _meta_is_control.

  • out_dir (PathLike) –

    Output directory. Created if it does not exist.

  • aggregator (str) –

    Aggregation method. One of: mean, median, MAD, std, EMD, KS, QQ, AUROC.

Source code in src/fisseq_data_pipeline/aggregate.py
def compute_cli(
    norm_df: PathLike,
    out_dir: PathLike,
    aggregator: str,
) -> None:
    """
    Compute per-(batch, label) aggregate statistics and write the result.

    Reads a normalized feature parquet, runs the specified aggregator over all
    (batch, label) groups (excluding WT), and writes the result — containing
    only the aggregate feature columns — to ``<out_dir>/aggregated.parquet``.

    For reference-based aggregators (EMD, KS, QQ, AUROC) the per-batch
    reference distributions are built from rows where
    ``_meta_is_control == True``.

    Parameters
    ----------
    norm_df : PathLike
        Path to the input normalized feature parquet. Must contain
        ``_meta_batch``, ``_meta_label``, and feature columns.  Reference-based
        aggregators also require ``_meta_is_control``.
    out_dir : PathLike
        Output directory. Created if it does not exist.
    aggregator : str
        Aggregation method.  One of: ``mean``, ``median``, ``MAD``, ``std``,
        ``EMD``, ``KS``, ``QQ``, ``AUROC``.
    """
    out_dir = pathlib.Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    setup_logging(out_dir)

    if aggregator not in _AGGREGATORS:
        raise ValueError(
            f"Unknown aggregator {aggregator!r}. "
            f"Choose from: {sorted(_AGGREGATORS)}"
        )

    logging.info("Loading normalized dataframe from %s", str(norm_df))
    df = pl.read_parquet(norm_df)
    logging.info("Loaded: %d rows × %d cols", df.height, df.width)

    agg_cls = _AGGREGATORS[aggregator]
    if issubclass(agg_cls, ReferenceBaseAggregator):
        if "_meta_is_control" not in df.columns:
            raise ValueError(
                f"Aggregator {aggregator!r} requires '_meta_is_control' column."
            )
        control_df = df.filter(pl.col("_meta_is_control"))
        logging.info("Building reference from %d control rows", control_df.height)
        agg = agg_cls(control_df)
    else:
        agg = agg_cls()

    logging.info("Running %s aggregator", aggregator)
    result = agg.aggregate(df)
    logging.info("Result: %d rows × %d cols", result.height, result.width)

    out_path = out_dir / "aggregated.parquet"
    logging.info("Writing aggregated dataframe to %s", str(out_path))
    result.write_parquet(out_path)
    logging.info("Done")

fisseq_data_pipeline.aggregate.normalize_cli(agg_df, out_dir)

Normalize an aggregate dataframe to the synonymous-variant baseline.

Reads an aggregate parquet (produced by compute), classifies each row's _meta_label with variant_classification(), and marks "Synonymous" rows as the reference population. It then fits a batch-wise z-score normalizer on those rows and applies it to all rows. The temporary _meta_is_control column is dropped before the result is written.
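The fit-on-controls, apply-to-all pattern can be sketched in pure Python. This is only an illustration under stated assumptions. The Normalizer's internals are not shown on this page, so the use of the population standard deviation, and the absence of handling for zero variance, are assumptions. The is_control predicate and the "syn_"/"mis_" labels are hypothetical stand-ins for variant_classification(label) == "Synonymous".

```python
from statistics import mean, pstdev
from typing import Callable


def batchwise_zscore_sketch(
    rows: list[dict], feature: str, is_control: Callable[[dict], bool]
) -> list[dict]:
    """Fit per-batch mean/std on control rows only, then z-score every row."""
    params: dict[str, tuple[float, float]] = {}
    for batch in {r["_meta_batch"] for r in rows}:
        ref = [r[feature] for r in rows if r["_meta_batch"] == batch and is_control(r)]
        params[batch] = (mean(ref), pstdev(ref))  # assumption: population std
    out = []
    for r in rows:
        mu, sd = params[r["_meta_batch"]]
        out.append({**r, feature: (r[feature] - mu) / sd})
    return out


rows = [
    {"_meta_batch": "A", "_meta_label": "syn_1", "f1_mean": 1.0},
    {"_meta_batch": "A", "_meta_label": "syn_2", "f1_mean": 3.0},
    {"_meta_batch": "A", "_meta_label": "mis_1", "f1_mean": 5.0},
    {"_meta_batch": "B", "_meta_label": "syn_3", "f1_mean": 10.0},
    {"_meta_batch": "B", "_meta_label": "syn_4", "f1_mean": 14.0},
    {"_meta_batch": "B", "_meta_label": "mis_2", "f1_mean": 10.0},
]
# Hypothetical stand-in for variant_classification(label) == "Synonymous".
normalized = batchwise_zscore_sketch(
    rows, "f1_mean", lambda r: r["_meta_label"].startswith("syn")
)
```

After normalization, each batch's synonymous rows are centered at 0 with unit spread. A non-synonymous row's value therefore reads as "standard deviations away from synonymous variants in the same batch".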

Parameters:
  • agg_df (PathLike) –

    Path to the aggregate feature parquet. Must contain _meta_batch, _meta_label, and aggregate feature columns.

  • out_dir (PathLike) –

    Output directory. Created if it does not exist. Writes:

    • normalized.parquet: the normalized aggregate dataframe
    • normalizer.pkl: the fitted Normalizer (from the normalize module)

Source code in src/fisseq_data_pipeline/aggregate.py
def normalize_cli(
    agg_df: PathLike,
    out_dir: PathLike,
) -> None:
    """
    Normalize an aggregate dataframe to the synonymous-variant baseline.

    Reads an aggregate parquet (produced by ``compute``), classifies each row's
    ``_meta_label`` with :func:`variant_classification`, marks "Synonymous"
    rows as the reference population, fits a batch-wise z-score normalizer on
    those rows, and writes the normalized result.

    Parameters
    ----------
    agg_df : PathLike
        Path to the aggregate feature parquet. Must contain ``_meta_batch``,
        ``_meta_label``, and aggregate feature columns.
    out_dir : PathLike
        Output directory. Created if it does not exist.  Writes:

        - ``normalized.parquet`` — the normalized aggregate dataframe
        - ``normalizer.pkl`` — the fitted :class:`~.normalize.Normalizer`
    """
    out_dir = pathlib.Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    setup_logging(out_dir)

    logging.info("Loading aggregate dataframe from %s", str(agg_df))
    df = pl.read_parquet(agg_df)
    logging.info("Loaded: %d rows × %d cols", df.height, df.width)

    logging.info("Marking synonymous rows as normalization reference")
    df = df.with_columns(
        (
            pl.col("_meta_label").map_elements(
                variant_classification, return_dtype=pl.Utf8
            )
            == "Synonymous"
        ).alias("_meta_is_control")
    )

    lf = df.lazy()
    logging.info("Fitting batch-wise normalizer on synonymous rows")
    normalizer = fit_normalizer(lf, fit_batch_wise=True, fit_only_on_control=True)

    logging.info("Normalizing")
    normalized_df = normalize(lf, normalizer).collect()
    normalized_df = normalized_df.drop("_meta_is_control")

    out_path = out_dir / "normalized.parquet"
    logging.info("Writing normalized dataframe to %s", str(out_path))
    normalized_df.write_parquet(out_path)

    norm_path = out_dir / "normalizer.pkl"
    logging.info("Writing normalizer to %s", str(norm_path))
    normalizer.save(norm_path)
    logging.info("Done")