Aggregate
The fisseq_data_pipeline.aggregate module provides aggregators that
summarize a normalized feature matrix into per-(batch, label) statistics.
It also exposes a two-subcommand CLI for computing aggregations and for
normalizing aggregate DataFrames against the synonymous-variant baseline.
Overview
Class hierarchy
BaseAggregator (abstract)
├── NativeAggregator (abstract) — Polars group-by expressions, no reference
│ ├── MeanAggregator
│ ├── MedianAggregator
│ ├── MADAggregator
│ └── StdAggregator
└── ReferenceBaseAggregator (abstract) — requires control reference distribution
├── EMDAggregator
├── KSAggregator
├── QQCorrelationAggregator
└── AUROCAggregator
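Native aggregators compute their statistics with Polars group_by().agg() and
need no reference distribution; their aggregate() method takes only agg_df.
Reference-based aggregators compare each (batch, label) group against a
control reference distribution. They are constructed with a reference_df,
compare each group only with the reference rows from the same _meta_batch,
and use joblib to dispatch one job per group in parallel.
Both families drop rows whose _meta_label is "WT" before grouping, so
wildtype rows never appear in the output.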
Aggregator registry
| Key | Class | Type |
| --- | --- | --- |
| "mean" | MeanAggregator | Native |
| "median" | MedianAggregator | Native |
| "MAD" | MADAggregator | Native |
| "std" | StdAggregator | Native |
| "EMD" | EMDAggregator | Reference |
| "KS" | KSAggregator | Reference |
| "QQ" | QQCorrelationAggregator | Reference |
| "AUROC" | AUROCAggregator | Reference |
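The two families differ in how they are constructed: reference-based classes need a reference DataFrame, native ones take no arguments. compute_cli resolves this with an issubclass check on the class looked up from the registry. The stand-alone sketch below shows that dispatch pattern; StubNative, StubReference, REGISTRY and build_aggregator are hypothetical stand-ins, not the package's own names.

```python
from typing import Any


class StubNative:
    """Hypothetical stand-in for a NativeAggregator subclass (no constructor args)."""


class StubReference:
    """Hypothetical stand-in for a ReferenceBaseAggregator subclass."""

    def __init__(self, reference_df: Any) -> None:
        self.reference_df = reference_df


# Hypothetical registry mapping string keys to classes, as in the table.
REGISTRY: dict[str, type] = {"mean": StubNative, "EMD": StubReference}


def build_aggregator(key: str, control_rows: Any = None) -> Any:
    """Instantiate an aggregator by key, passing a reference only when needed."""
    if key not in REGISTRY:
        raise ValueError(f"Unknown aggregator {key!r}. Choose from: {sorted(REGISTRY)}")
    cls = REGISTRY[key]
    if issubclass(cls, StubReference):
        if control_rows is None:
            raise ValueError(f"Aggregator {key!r} requires control rows.")
        return cls(control_rows)
    return cls()
```

Because lookup is a plain dict membership test, keys are case-sensitive: "emd" is rejected even though "EMD" is valid.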
Example usage
Native aggregator
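import polars as pl
from fisseq_data_pipeline.aggregate import MeanAggregator

# Toy normalized feature matrix: two batches, each with WT control rows
# and one variant label. Rows labelled "WT" are dropped by aggregate().
data_df = pl.DataFrame({
    "_meta_batch": ["A", "A", "A", "A", "B", "B", "B", "B"],
    "_meta_label": ["WT", "WT", "X", "X", "WT", "WT", "Y", "Y"],
    "f1": [0.0, 0.5, 1.0, 2.0, 0.0, 0.5, 3.0, 4.0],
    "f2": [4.0, 4.5, 5.0, 6.0, 6.0, 6.5, 7.0, 8.0],
})

agg = MeanAggregator()
result = agg.aggregate(data_df)  # one row per non-WT (label, batch) group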
Reference-based aggregator
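from fisseq_data_pipeline.aggregate import EMDAggregator

# Use the WT rows as the reference; each (label, batch) group is compared
# with the WT rows from its own batch.
reference_df = data_df.filter(pl.col("_meta_label") == "WT")
agg = EMDAggregator(reference_df)
result = agg.aggregate(data_df)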
CLI
The module exposes two subcommands via Python Fire:
compute
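Compute per-(batch, label) aggregation statistics from a normalized Parquet
file, excluding WT rows. Writes <out_dir>/aggregated.parquet, which holds the
_meta_label and _meta_batch key columns plus the aggregate feature columns;
the per-cell feature columns are not carried over. Reference-based
aggregators build their per-batch reference from rows where
_meta_is_control is true.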
python -m fisseq_data_pipeline.aggregate compute \
--norm_df normalized.parquet \
--out_dir out/ \
--aggregator mean
Valid --aggregator values (case-sensitive): mean, median, MAD, std, EMD, KS,
QQ, AUROC.
normalize
Normalize an aggregate DataFrame against its synonymous-variant rows: fit a
batch-wise z-score normalizer on the synonymous rows and apply it to every
row. Writes normalized.parquet and normalizer.pkl to --out_dir.
python -m fisseq_data_pipeline.aggregate normalize \
--agg_df aggregated.parquet \
--out_dir out/
API reference
fisseq_data_pipeline.aggregate.BaseAggregator
Bases: ABC
Abstract base class for all aggregators.
All subclasses expose a single aggregate(agg_df) method that accepts
an input DataFrame and returns a per-(batch, label) summary DataFrame.
Source code in src/fisseq_data_pipeline/aggregate.py
class BaseAggregator(abc.ABC):
    """
    Abstract base class for all aggregators.

    All subclasses expose a single ``aggregate(agg_df)`` method that accepts
    an input DataFrame and returns a per-(batch, label) summary DataFrame.
    """

    @abc.abstractmethod
    def aggregate(self, agg_df: pl.DataFrame) -> pl.DataFrame:
        """
        Compute per-(batch, label) statistics.

        Parameters
        ----------
        agg_df : pl.DataFrame
            Input DataFrame containing ``_meta_batch``, ``_meta_label``, and
            feature columns.

        Returns
        -------
        pl.DataFrame
            One row per (batch, label) group with computed statistics.
        """
        raise NotImplementedError
aggregate(agg_df) (abstract method)
Compute per-(batch, label) statistics.
Parameters:
- agg_df (DataFrame) – Input DataFrame containing _meta_batch, _meta_label, and feature columns.
Returns:
- DataFrame – One row per (batch, label) group with computed statistics.
fisseq_data_pipeline.aggregate.NativeAggregator
Bases: BaseAggregator
Base class for aggregators implemented as native Polars group-by expressions.
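Subclasses override polars_exprs() to return a list of Polars
expressions that are passed directly to group_by().agg(). The inherited
aggregate() drops "WT" rows, groups by _meta_label and _meta_batch, and
applies those expressions to the feature columns of agg_df.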
Source code in src/fisseq_data_pipeline/aggregate.py
class NativeAggregator(BaseAggregator):
    """
    Base class for aggregators implemented as native Polars group-by expressions.

    Subclasses override :meth:`polars_exprs` to return a list of Polars
    expressions that are passed directly to ``group_by().agg()``.
    """

    @abc.abstractmethod
    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        """
        Return Polars aggregation expressions for the given feature columns.

        Parameters
        ----------
        feature_cols : list[str]
            Names of the feature columns to aggregate.

        Returns
        -------
        list[pl.Expr]
            Polars expressions to pass to ``group_by().agg()``.
        """
        raise NotImplementedError

    def aggregate(self, agg_df: pl.DataFrame) -> pl.DataFrame:
        feature_cols = get_feature_cols(agg_df, as_string=True)
        return (
            agg_df.filter(pl.col("_meta_label") != "WT")
            .group_by("_meta_label", "_meta_batch")
            .agg(self.polars_exprs(feature_cols))
        )
polars_exprs(feature_cols) (abstract method)
Return Polars aggregation expressions for the given feature columns.
Parameters:
- feature_cols (list[str]) – Names of the feature columns to aggregate.
Returns:
- list[Expr] – Polars expressions to pass to group_by().agg().
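To make the semantics of NativeAggregator.aggregate() concrete, here is a pure-Python sketch of what it computes for MeanAggregator, using rows as plain dicts. It assumes every key not starting with "_meta_" is a feature column; that is a guess at get_feature_cols, whose rules are not shown on this page. native_mean is a hypothetical name.

```python
from collections import defaultdict
from statistics import mean


def native_mean(rows: list[dict]) -> list[dict]:
    """Per-(label, batch) mean of each feature, excluding "WT" rows."""
    groups: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for row in rows:
        if row["_meta_label"] == "WT":
            continue  # wildtype rows are filtered out before grouping
        groups[(row["_meta_label"], row["_meta_batch"])].append(row)

    out = []
    for (label, batch), members in groups.items():
        # Assumed feature rule: every key without the "_meta_" prefix.
        features = [k for k in members[0] if not k.startswith("_meta_")]
        result = {"_meta_label": label, "_meta_batch": batch}
        for f in features:
            # Output naming follows the f"{f}_mean" alias used by MeanAggregator.
            result[f"{f}_mean"] = mean(m[f] for m in members)
        out.append(result)
    return out
```

One difference worth knowing: Polars' group_by() does not promise a stable output row order unless maintain_order=True is passed, and NativeAggregator.aggregate() does not pass it, so downstream code should join or sort on the key columns rather than rely on row position.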
fisseq_data_pipeline.aggregate.MeanAggregator
Bases: NativeAggregator
Computes per-group mean for each feature column.
Source code in src/fisseq_data_pipeline/aggregate.py
class MeanAggregator(NativeAggregator):
    """Computes per-group mean for each feature column."""

    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        return [pl.col(f).mean().alias(f"{f}_mean") for f in feature_cols]
fisseq_data_pipeline.aggregate.MedianAggregator
Bases: NativeAggregator
Computes per-group median for each feature column.
Source code in src/fisseq_data_pipeline/aggregate.py
class MedianAggregator(NativeAggregator):
    """Computes per-group median for each feature column."""

    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        return [pl.col(f).median().alias(f"{f}_median") for f in feature_cols]
fisseq_data_pipeline.aggregate.MADAggregator
Bases: NativeAggregator
Computes per-group median absolute deviation (MAD) for each feature column.
Source code in src/fisseq_data_pipeline/aggregate.py
class MADAggregator(NativeAggregator):
    """Computes per-group median absolute deviation (MAD) for each feature column."""

    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        return [
            (pl.col(f) - pl.col(f).median()).abs().median().alias(f"{f}_MAD")
            for f in feature_cols
        ]
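The MAD expression above is median(|x - median(x)|) with no 1.4826 consistency factor, so its values are not on the same scale as StdAggregator's output (for normally distributed data, 1.4826 × MAD approximates the standard deviation). A pure-Python sketch of the same unscaled statistic, with the hypothetical name mad, also shows its robustness to a single outlier:

```python
from statistics import median


def mad(values: list[float]) -> float:
    """Unscaled median absolute deviation: median(|x - median(x)|)."""
    center = median(values)
    return median(abs(x - center) for x in values)
```

For example, mad([1, 2, 3, 4, 5]) and mad([1, 2, 3, 4, 100]) are both 1, whereas the standard deviation of the second list is dominated by the outlier.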
fisseq_data_pipeline.aggregate.StdAggregator
Bases: NativeAggregator
Computes per-group standard deviation for each feature column.
Source code in src/fisseq_data_pipeline/aggregate.py
class StdAggregator(NativeAggregator):
    """Computes per-group standard deviation for each feature column."""

    def polars_exprs(self, feature_cols: list[str]) -> list[pl.Expr]:
        return [pl.col(f).std().alias(f"{f}_std") for f in feature_cols]
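StdAggregator calls Polars' Expr.std() without arguments, and its default is ddof=1, i.e. the sample standard deviation. In standard-library terms that corresponds to statistics.stdev rather than statistics.pstdev; the short sketch below contrasts the two on a four-row group.

```python
from statistics import pstdev, stdev

group = [1.0, 2.0, 3.0, 4.0]

sample_sd = stdev(group)       # ddof=1, same convention as Polars' default Expr.std()
population_sd = pstdev(group)  # ddof=0, divides by n instead of n - 1
```

Here sample_sd is sqrt(5/3) ≈ 1.291 and population_sd is sqrt(1.25) ≈ 1.118. With ddof=1 a single-row group has no defined standard deviation (statistics.stdev raises StatisticsError for it), so very small groups deserve a check on how missing values come through.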
fisseq_data_pipeline.aggregate.ReferenceBaseAggregator
Bases: BaseAggregator
Base class for aggregators that compare each group against a reference
distribution, typically built from control rows.
Subclasses override compute_statistic() to implement a specific
scalar summary (e.g. EMD, KS statistic) for each feature.
Parameters:
- reference_df (DataFrame) – Reference DataFrame (typically control rows) used as the comparison distribution in compute_statistic(). The feature columns are taken from this DataFrame, and each group is compared only with the reference rows from its own _meta_batch.
Source code in src/fisseq_data_pipeline/aggregate.py
class ReferenceBaseAggregator(BaseAggregator):
    """
    Base class for aggregators that compare each group against a reference
    distribution, typically built from control rows.

    Subclasses override :meth:`compute_statistic` to implement a specific
    scalar summary (e.g. EMD, KS statistic) for each feature.

    Parameters
    ----------
    reference_df : pl.DataFrame
        Reference DataFrame (typically control rows) used as the comparison
        distribution in :meth:`compute_statistic`.
    """

    def __init__(self, reference_df: pl.DataFrame) -> None:
        self.feature_cols = get_feature_cols(reference_df, as_string=True)
        self.reference_df = reference_df

    @abc.abstractmethod
    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        """
        Compute per-feature statistics for a single (batch, label) group.

        Parameters
        ----------
        label : str
            The variant label for this group.
        batch : str
            The batch identifier for this group.
        group_df : pl.DataFrame
            The subset of the input DataFrame corresponding to this
            (batch, label) group.
        ref_df : pl.DataFrame
            The subset of the reference DataFrame corresponding to this batch.

        Returns
        -------
        dict[str, Any]
            A dict with at minimum ``_meta_label`` and ``_meta_batch`` keys,
            plus one entry per feature column containing the computed statistic.
        """
        raise NotImplementedError

    def aggregate(
        self,
        agg_df: pl.DataFrame,
        n_jobs: int = -1,
        backend: str = "threading",
        verbose: int = 0,
    ) -> pl.DataFrame:
        """
        Compute per-(batch, label) statistics for all features using joblib.

        Filters out wildtype rows, groups by ``_meta_label`` and
        ``_meta_batch``, and dispatches one :meth:`compute_statistic` call
        per group in parallel.

        Parameters
        ----------
        agg_df : pl.DataFrame
            Input DataFrame containing ``_meta_batch``, ``_meta_label``, and
            feature columns.
        n_jobs : int, optional
            Number of parallel jobs passed to joblib.
        backend : str, optional
            Joblib backend.
        verbose : int, optional
            Joblib verbosity level. Defaults to ``0``.

        Returns
        -------
        pl.DataFrame
            A DataFrame with one row per (batch, label) group and one column
            per feature containing the computed statistic.
        """
        groups = agg_df.filter(pl.col("_meta_label") != "WT").group_by(
            "_meta_label", "_meta_batch"
        )
        tasks = [((label, batch), group_df) for (label, batch), group_df in groups]
        dicts = joblib.Parallel(n_jobs=n_jobs, backend=backend, verbose=verbose)(
            joblib.delayed(self.compute_statistic)(
                label=label,
                batch=batch,
                group_df=group_df,
                ref_df=self.reference_df.filter(pl.col("_meta_batch") == batch),
            )
            for (label, batch), group_df in tasks
        )
        return pl.DataFrame(dicts)
aggregate(agg_df, n_jobs=-1, backend='threading', verbose=0)
Compute per-(batch, label) statistics for all features using joblib.
Filters out wildtype rows, groups by _meta_label and _meta_batch, and
dispatches one compute_statistic() call per group in parallel, passing the
reference rows from that group's batch.
Parameters:
- agg_df (DataFrame) – Input DataFrame containing _meta_batch, _meta_label, and feature columns.
- n_jobs (int, default: -1) – Number of parallel jobs passed to joblib; -1 uses all available CPUs.
- backend (str, default: 'threading') – Joblib backend.
- verbose (int, default: 0) – Joblib verbosity level.
Returns:
- DataFrame – One row per (batch, label) group, with the _meta_label and _meta_batch keys plus one column per feature containing the computed statistic.
compute_statistic(label, batch, group_df, ref_df) (abstract method)
Compute per-feature statistics for a single (batch, label) group.
Parameters:
- label (str) – The variant label for this group.
- batch (str) – The batch identifier for this group.
- group_df (DataFrame) – The subset of the input DataFrame corresponding to this (batch, label) group.
- ref_df (DataFrame) – The subset of the reference DataFrame corresponding to this batch.
Returns:
- dict[str, Any] – A dict with at minimum _meta_label and _meta_batch keys, plus one entry per feature column containing the computed statistic.
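The dispatch in ReferenceBaseAggregator.aggregate() can be sketched without Polars or joblib: drop WT rows, group by (label, batch), and for each group compute a statistic against only the reference rows of the same batch. In this sketch a standard-library ThreadPoolExecutor stands in for joblib.Parallel(backend="threading"); reference_aggregate, the statistic callable and the "_stat" column suffix are all hypothetical (the real subclasses use suffixes such as _EMD or _KS).

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def reference_aggregate(
    rows: list[dict],
    reference_rows: list[dict],
    statistic: Callable[[list[float], list[float]], float],
    features: list[str],
    max_workers: int = 4,
) -> list[dict]:
    """Compare each non-WT (label, batch) group with same-batch reference rows."""
    groups: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for row in rows:
        if row["_meta_label"] != "WT":
            groups[(row["_meta_label"], row["_meta_batch"])].append(row)

    def one_group(key: tuple[str, str]) -> dict:
        label, batch = key
        # Only reference rows from the same batch form the comparison distribution.
        ref = [r for r in reference_rows if r["_meta_batch"] == batch]
        out = {"_meta_label": label, "_meta_batch": batch}
        for f in features:
            variant = [m[f] for m in groups[key]]
            out[f"{f}_stat"] = statistic(variant, [r[f] for r in ref])
        return out

    # Thread pool as a stand-in for joblib.Parallel(backend="threading").
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(one_group, groups))
```

If a batch has no reference rows, ref is empty for every group in that batch, and the SciPy and scikit-learn functions used by the concrete subclasses will typically raise on such input, so it is worth confirming that every batch contains controls before running a reference-based aggregator.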
fisseq_data_pipeline.aggregate.EMDAggregator
Bases: ReferenceBaseAggregator
Computes per-group 1D Wasserstein distances (Earth Mover's Distance)
against a reference distribution for each feature column.
Source code in src/fisseq_data_pipeline/aggregate.py
class EMDAggregator(ReferenceBaseAggregator):
    """
    Computes per-group 1D Wasserstein distances (Earth Mover's Distance)
    against a reference distribution for each feature column.
    """

    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        row: dict[str, Any] = {"_meta_label": label, "_meta_batch": batch}
        for feat in self.feature_cols:
            variant = group_df.get_column(feat).to_numpy()
            reference = ref_df.get_column(feat).to_numpy()
            row[f"{feat}_EMD"] = scipy.stats.wasserstein_distance(variant, reference)
        return row
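For one-dimensional, unweighted samples, the Wasserstein distance computed by scipy.stats.wasserstein_distance equals the area between the two empirical CDFs. The pure-Python sketch below (hypothetical name wasserstein_1d) integrates |F_u(x) - F_v(x)| over the pooled sample points to show what the EMD column measures.

```python
from bisect import bisect_right


def wasserstein_1d(u: list[float], v: list[float]) -> float:
    """Area between the empirical CDFs of u and v (1D EMD, unweighted)."""
    u, v = sorted(u), sorted(v)
    pooled = sorted(u + v)
    total = 0.0
    # Both ECDFs are constant between consecutive pooled points.
    for left, right in zip(pooled[:-1], pooled[1:]):
        cdf_u = bisect_right(u, left) / len(u)
        cdf_v = bisect_right(v, left) / len(v)
        total += abs(cdf_u - cdf_v) * (right - left)
    return total
```

Because the result is in the feature's own units, shifting every value by a constant c gives an EMD of c: wasserstein_1d([0, 1, 3], [5, 6, 8]) is 5.0 (up to floating-point rounding).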
fisseq_data_pipeline.aggregate.KSAggregator
Bases: ReferenceBaseAggregator
Computes per-group two-sample Kolmogorov-Smirnov statistics against
a reference distribution for each feature column.
Source code in src/fisseq_data_pipeline/aggregate.py
class KSAggregator(ReferenceBaseAggregator):
    """
    Computes per-group two-sample Kolmogorov-Smirnov statistics against
    a reference distribution for each feature column.
    """

    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        row: dict[str, Any] = {"_meta_label": label, "_meta_batch": batch}
        for feat in self.feature_cols:
            variant = group_df.get_column(feat).to_numpy()
            reference = ref_df.get_column(feat).to_numpy()
            row[f"{feat}_KS"] = scipy.stats.ks_2samp(variant, reference).statistic
        return row
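KSAggregator keeps only the .statistic field of scipy.stats.ks_2samp, i.e. the largest vertical gap between the two empirical CDFs; the p-value is discarded. A pure-Python sketch of that statistic (hypothetical name ks_statistic) evaluates both right-continuous ECDFs at every pooled sample point:

```python
from bisect import bisect_right


def ks_statistic(u: list[float], v: list[float]) -> float:
    """Two-sample KS statistic: max |F_u(x) - F_v(x)| over the pooled sample."""
    u, v = sorted(u), sorted(v)
    return max(
        abs(bisect_right(u, x) / len(u) - bisect_right(v, x) / len(v))
        for x in u + v
    )
```

Unlike EMD, the KS statistic is unitless and bounded in [0, 1]: fully separated samples such as [1, 2, 3] and [4, 5, 6] score 1.0 no matter how far apart they are.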
fisseq_data_pipeline.aggregate.QQCorrelationAggregator
Bases: ReferenceBaseAggregator
Computes per-group Q-Q correlation against a reference distribution for
each feature column.
Parameters:
- reference_df (DataFrame) – Reference DataFrame (typically control rows).
- n_quantiles (int, default: 100) – Number of evenly spaced quantile levels in [0, 1] at which both distributions are evaluated.
Source code in src/fisseq_data_pipeline/aggregate.py
class QQCorrelationAggregator(ReferenceBaseAggregator):
    """
    Computes per-group Q-Q correlation against a reference distribution for
    each feature column.

    Parameters
    ----------
    reference_df : pl.DataFrame
        Reference DataFrame (typically control rows).
    n_quantiles : int, optional
        Number of quantile points to evaluate. Defaults to ``100``.
    """

    def __init__(self, reference_df: pl.DataFrame, n_quantiles: int = 100) -> None:
        super().__init__(reference_df)
        self.quantile_points = np.linspace(0, 1, n_quantiles)

    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        row: dict[str, Any] = {"_meta_label": label, "_meta_batch": batch}
        for feat in self.feature_cols:
            variant = group_df.get_column(feat).to_numpy()
            reference = ref_df.get_column(feat).to_numpy()
            variant_quantiles = np.quantile(variant, self.quantile_points)
            reference_quantiles = np.quantile(reference, self.quantile_points)
            row[f"{feat}_QQ"] = scipy.stats.pearsonr(
                variant_quantiles, reference_quantiles
            ).statistic
        return row
fisseq_data_pipeline.aggregate.AUROCAggregator
Bases: ReferenceBaseAggregator
Computes per-group AUROC against a reference distribution for each
feature column.
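Variant samples are labelled 1 and reference samples are labelled 0.
Scores below 0.5 are reflected to 1 - AUROC, so the reported value lies in
[0.5, 1]: 0.5 indicates no separability (e.g. identical distributions) and
1.0 indicates perfect separability, whether the variant is shifted above or
below the reference.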
Source code in src/fisseq_data_pipeline/aggregate.py
class AUROCAggregator(ReferenceBaseAggregator):
    """
    Computes per-group AUROC against a reference distribution for each
    feature column.

    Variant samples are labelled ``1`` and reference samples are labelled
    ``0``. ``0.5`` indicates identical distributions and ``1.0`` indicates
    perfect separability.
    """

    def compute_statistic(
        self, label: str, batch: str, group_df: pl.DataFrame, ref_df: pl.DataFrame
    ) -> dict[str, Any]:
        row: dict[str, Any] = {"_meta_label": label, "_meta_batch": batch}
        for feat in self.feature_cols:
            variant = group_df.get_column(feat).to_numpy()
            reference = ref_df.get_column(feat).to_numpy()
            values = np.concatenate([reference, variant])
            labels = np.concatenate(
                [
                    np.zeros(len(reference)),
                    np.ones(len(variant)),
                ]
            )
            auroc = sklearn.metrics.roc_auc_score(labels, values)
            if auroc < 0.5:
                auroc = 1 - auroc
            row[f"{feat}_AUROC"] = auroc
        return row
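When the raw feature value is used as the score, the ROC AUC returned by sklearn.metrics.roc_auc_score equals the probability that a randomly chosen variant value exceeds a randomly chosen reference value, with ties counted as one half (the Mann-Whitney U relation). The quadratic pure-Python sketch below (hypothetical name folded_auroc) computes that probability and then applies the same folding as AUROCAggregator.

```python
def folded_auroc(reference: list[float], variant: list[float]) -> float:
    """AUROC with variant=1, reference=0, folded into [0.5, 1]."""
    wins = 0.0
    for v in variant:
        for r in reference:
            if v > r:
                wins += 1.0
            elif v == r:
                wins += 0.5  # ties count half, as in the ROC area
    auroc = wins / (len(variant) * len(reference))
    # Same folding as AUROCAggregator: direction of the shift is discarded.
    return auroc if auroc >= 0.5 else 1 - auroc
```

The folding is why a variant shifted below the reference scores the same as one shifted above: folded_auroc([1, 2, 3], [4, 5, 6]) and folded_auroc([4, 5, 6], [1, 2, 3]) both return 1.0. If the direction of the effect matters, it has to come from another aggregate such as the mean.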
fisseq_data_pipeline.aggregate.compute_cli(norm_df, out_dir, aggregator)
Compute per-(batch, label) aggregate statistics and write the result.
Reads a normalized feature parquet, runs the specified aggregator over all
(batch, label) groups (excluding WT), and writes the result, which holds the
_meta_label and _meta_batch key columns plus the aggregate feature columns,
to <out_dir>/aggregated.parquet.
For reference-based aggregators (EMD, KS, QQ, AUROC) the per-batch
reference distributions are built from rows where
_meta_is_control == True.
Parameters:
- norm_df (PathLike) – Path to the input normalized feature parquet. Must contain _meta_batch, _meta_label, and feature columns. Reference-based aggregators also require _meta_is_control.
- out_dir (PathLike) – Output directory. Created if it does not exist.
- aggregator (str) – Aggregation method. One of: mean, median, MAD, std, EMD, KS, QQ, AUROC.
Raises:
- ValueError – If aggregator is not a registered key, or if a reference-based aggregator is requested and the input has no _meta_is_control column.
Source code in src/fisseq_data_pipeline/aggregate.py
def compute_cli(
    norm_df: PathLike,
    out_dir: PathLike,
    aggregator: str,
) -> None:
    """
    Compute per-(batch, label) aggregate statistics and write the result.

    Reads a normalized feature parquet, runs the specified aggregator over all
    (batch, label) groups (excluding WT), and writes the result — containing
    only the aggregate feature columns — to ``<out_dir>/aggregated.parquet``.

    For reference-based aggregators (EMD, KS, QQ, AUROC) the per-batch
    reference distributions are built from rows where
    ``_meta_is_control == True``.

    Parameters
    ----------
    norm_df : PathLike
        Path to the input normalized feature parquet. Must contain
        ``_meta_batch``, ``_meta_label``, and feature columns. Reference-based
        aggregators also require ``_meta_is_control``.
    out_dir : PathLike
        Output directory. Created if it does not exist.
    aggregator : str
        Aggregation method. One of: ``mean``, ``median``, ``MAD``, ``std``,
        ``EMD``, ``KS``, ``QQ``, ``AUROC``.
    """
    out_dir = pathlib.Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    setup_logging(out_dir)

    if aggregator not in _AGGREGATORS:
        raise ValueError(
            f"Unknown aggregator {aggregator!r}. "
            f"Choose from: {sorted(_AGGREGATORS)}"
        )

    logging.info("Loading normalized dataframe from %s", str(norm_df))
    df = pl.read_parquet(norm_df)
    logging.info("Loaded: %d rows × %d cols", df.height, df.width)

    agg_cls = _AGGREGATORS[aggregator]
    if issubclass(agg_cls, ReferenceBaseAggregator):
        if "_meta_is_control" not in df.columns:
            raise ValueError(
                f"Aggregator {aggregator!r} requires '_meta_is_control' column."
            )
        control_df = df.filter(pl.col("_meta_is_control"))
        logging.info("Building reference from %d control rows", control_df.height)
        agg = agg_cls(control_df)
    else:
        agg = agg_cls()

    logging.info("Running %s aggregator", aggregator)
    result = agg.aggregate(df)
    logging.info("Result: %d rows × %d cols", result.height, result.width)

    out_path = out_dir / "aggregated.parquet"
    logging.info("Writing aggregated dataframe to %s", str(out_path))
    result.write_parquet(out_path)
    logging.info("Done")
fisseq_data_pipeline.aggregate.normalize_cli(agg_df, out_dir)
Normalize an aggregate dataframe to the synonymous-variant baseline.
Reads an aggregate parquet (produced by compute), classifies each row's
_meta_label with variant_classification(), marks rows classified as
"Synonymous" as the reference population, fits a batch-wise z-score
normalizer on those rows, applies it to all rows, and writes the normalized
result (without the temporary _meta_is_control column).
Parameters:
- agg_df (PathLike) – Path to the aggregate feature parquet. Must contain _meta_batch, _meta_label, and aggregate feature columns.
- out_dir (PathLike) – Output directory. Created if it does not exist. Writes:
  - normalized.parquet – the normalized aggregate dataframe
  - normalizer.pkl – the fitted normalize.Normalizer
Source code in src/fisseq_data_pipeline/aggregate.py
def normalize_cli(
    agg_df: PathLike,
    out_dir: PathLike,
) -> None:
    """
    Normalize an aggregate dataframe to the synonymous-variant baseline.

    Reads an aggregate parquet (produced by ``compute``), classifies each row's
    ``_meta_label`` with :func:`variant_classification`, marks "Synonymous"
    rows as the reference population, fits a batch-wise z-score normalizer on
    those rows, and writes the normalized result.

    Parameters
    ----------
    agg_df : PathLike
        Path to the aggregate feature parquet. Must contain ``_meta_batch``,
        ``_meta_label``, and aggregate feature columns.
    out_dir : PathLike
        Output directory. Created if it does not exist. Writes:

        - ``normalized.parquet`` — the normalized aggregate dataframe
        - ``normalizer.pkl`` — the fitted :class:`~.normalize.Normalizer`
    """
    out_dir = pathlib.Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    setup_logging(out_dir)

    logging.info("Loading aggregate dataframe from %s", str(agg_df))
    df = pl.read_parquet(agg_df)
    logging.info("Loaded: %d rows × %d cols", df.height, df.width)

    logging.info("Marking synonymous rows as normalization reference")
    df = df.with_columns(
        (
            pl.col("_meta_label").map_elements(
                variant_classification, return_dtype=pl.Utf8
            )
            == "Synonymous"
        ).alias("_meta_is_control")
    )
    lf = df.lazy()

    logging.info("Fitting batch-wise normalizer on synonymous rows")
    normalizer = fit_normalizer(lf, fit_batch_wise=True, fit_only_on_control=True)

    logging.info("Normalizing")
    normalized_df = normalize(lf, normalizer).collect()
    normalized_df = normalized_df.drop("_meta_is_control")

    out_path = out_dir / "normalized.parquet"
    logging.info("Writing normalized dataframe to %s", str(out_path))
    normalized_df.write_parquet(out_path)

    norm_path = out_dir / "normalizer.pkl"
    logging.info("Writing normalizer to %s", str(norm_path))
    normalizer.save(norm_path)
    logging.info("Done")
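normalize_cli describes a batch-wise z-score whose parameters are fitted only on synonymous rows and then applied to every row. The stand-alone sketch below illustrates that idea for one aggregate column using plain dicts. It assumes the normalizer subtracts the per-batch control mean and divides by the per-batch control standard deviation (population std here); fit_normalizer may differ in details such as ddof or zero-variance handling, and both function names are hypothetical.

```python
from collections import defaultdict
from statistics import fmean, pstdev


def fit_batchwise_zscore(rows: list[dict], feature: str) -> dict[str, tuple[float, float]]:
    """Per-batch (mean, std) of `feature`, fitted on control rows only."""
    control_values: dict[str, list[float]] = defaultdict(list)
    for row in rows:
        if row["_meta_is_control"]:  # True for rows classified as "Synonymous"
            control_values[row["_meta_batch"]].append(row[feature])
    # Assumption: population std (ddof=0); the package's Normalizer may differ.
    return {b: (fmean(v), pstdev(v)) for b, v in control_values.items()}


def apply_batchwise_zscore(
    rows: list[dict], feature: str, params: dict[str, tuple[float, float]]
) -> list[dict]:
    """Z-score every row, control or not, with its own batch's parameters."""
    out = []
    for row in rows:
        mu, sd = params[row["_meta_batch"]]  # KeyError if the batch had no controls
        out.append({**row, feature: (row[feature] - mu) / sd})
    return out
```

Because the parameters come only from synonymous rows, a normalized value near 0 reads as "typical of synonymous variants in that batch", and a batch with no synonymous rows cannot be normalized at all.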