Pipeline

The FISSEQ data pipeline exposes a small CLI via the entry point fisseq-data-pipeline. Subcommands are provided by Python Fire:

  • run — Production, single-pass run: clean, normalize, and write outputs.
  • configure — Write a default configuration file.

Quick start

# Run the full pipeline
fisseq-data-pipeline run \
  --input_data_path data.parquet \
  --config config.yaml \
  --output_dir out

# Write a default config to the current directory

fisseq-data-pipeline configure

# Write to a custom location
fisseq-data-pipeline configure --output_path path/to/config.yaml
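
For scripted use, the same run command can be assembled from Python and handed to subprocess. The sketch below uses only the flags shown above; the helper name build_run_command is hypothetical and is not part of the package.

```python
import subprocess


def build_run_command(input_data_path, config=None, output_dir=None):
    """Assemble the argv list for `fisseq-data-pipeline run` (hypothetical helper)."""
    cmd = ["fisseq-data-pipeline", "run", "--input_data_path", str(input_data_path)]
    # Optional flags are only added when a value is given.
    if config is not None:
        cmd += ["--config", str(config)]
    if output_dir is not None:
        cmd += ["--output_dir", str(output_dir)]
    return cmd


def run_pipeline(input_data_path, config=None, output_dir=None):
    """Invoke the CLI and raise CalledProcessError on a non-zero exit code."""
    subprocess.run(build_run_command(input_data_path, config, output_dir), check=True)
```

Passing the arguments as a list avoids shell quoting problems with paths that contain spaces.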

Logging

Log level is controlled via the FISSEQ_PIPELINE_LOG_LEVEL environment variable (default: info).

FISSEQ_PIPELINE_LOG_LEVEL=debug fisseq-data-pipeline run \
  --input_data_path data.parquet
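
As an illustration of how an environment-controlled log level can be resolved, here is a minimal sketch. It assumes the variable holds a standard Python logging level name; the function resolve_log_level is hypothetical and may differ from what the pipeline's own setup_logging does.

```python
import logging
import os


def resolve_log_level(default="info"):
    """Translate FISSEQ_PIPELINE_LOG_LEVEL into a numeric logging level."""
    name = os.environ.get("FISSEQ_PIPELINE_LOG_LEVEL", default).upper()
    level = logging.getLevelName(name)
    # getLevelName returns a string such as "Level FOO" for unknown names.
    if not isinstance(level, int):
        raise ValueError(f"Unknown log level: {name!r}")
    return level


logging.basicConfig(level=resolve_log_level())
```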

Command Interface


Run

fisseq_data_pipeline.pipeline.run(input_data_path, config=None, output_dir=None, eager_db_loading=False)

Run the full batch-correction pipeline.

This function performs a one-pass processing workflow that reads a full dataset, cleans invalid rows and columns, fits normalization statistics (batch-wise, on control samples only), applies normalization, and writes the resulting cleaned and normalized outputs to disk.

Production Pipeline steps
  1. Load and scan the input Parquet dataset into a Polars LazyFrame.
  2. Derive feature and metadata frames using configuration-specified column selections.
  3. Clean the dataset by removing:
    • Columns that contain only non-finite (NaN/inf) values.
    • Rows containing any non-finite feature values.
  4. Fit a batch-wise normalizer (computed from control samples only).
  5. Apply normalization to the full cleaned dataset.
  6. Write the cleaned, normalized, and fitted model artifacts to disk.
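
Steps 3 to 5 can be sketched in plain Python on a list of row dictionaries. This is an illustration of the technique only, not the pipeline's Polars implementation: the function names, the batch and is_control keys, the use of the population standard deviation, and the zero-variance guard are all assumptions.

```python
import math
from collections import defaultdict
from statistics import fmean, pstdev


def clean_rows(rows, features):
    """Drop all-non-finite columns, then rows with any non-finite value."""
    kept = [f for f in features if any(math.isfinite(r[f]) for r in rows)]
    cleaned = [r for r in rows if all(math.isfinite(r[f]) for f in kept)]
    return cleaned, kept


def fit_batch_stats(rows, features):
    """Per-batch (mean, std) for each feature, using control rows only."""
    controls = defaultdict(list)
    for r in rows:
        if r["is_control"]:
            controls[r["batch"]].append(r)
    stats = {}
    for batch, ctrl in controls.items():
        stats[batch] = {}
        for f in features:
            values = [r[f] for r in ctrl]
            stats[batch][f] = (fmean(values), pstdev(values))
    return stats


def apply_zscore(rows, features, stats):
    """Z-score every row with the statistics of its own batch."""
    out = []
    for r in rows:
        z = dict(r)
        for f in features:
            # A batch without control rows has no entry in stats (KeyError).
            mean, std = stats[r["batch"]][f]
            # Assumption: a zero std leaves the centered value unscaled.
            z[f] = (r[f] - mean) / (std or 1.0)
        out.append(z)
    return out


nan, inf = float("nan"), float("inf")
rows = [
    {"batch": "A", "is_control": True, "x": 1.0, "y": nan},
    {"batch": "A", "is_control": True, "x": 3.0, "y": nan},
    {"batch": "A", "is_control": False, "x": 4.0, "y": nan},
    {"batch": "A", "is_control": False, "x": inf, "y": nan},
]
cleaned, kept = clean_rows(rows, ["x", "y"])   # y is dropped, then the inf row
stats = fit_batch_stats(cleaned, kept)         # fitted on the two control rows
normalized = apply_zscore(cleaned, kept, stats)
```

Note that the statistics come from control rows only but are applied to every row of the same batch, which is what makes the correction batch-wise.
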
Parameters:
  • input_data_path (PathLike) –

    Path to the input Parquet file containing the full dataset to process.

  • config (Config or PathLike, default: None ) –

    A Config object or a path to a YAML configuration file.

  • output_dir (PathLike, default: None ) –

    Directory to which cleaned and normalized outputs will be written. Defaults to the current working directory if not specified.

  • eager_db_loading (bool, default: False ) –

    If True, fully load the input Parquet file into memory eagerly using polars.read_parquet. This avoids repeated on-disk scans and can significantly speed up processing on systems with sufficient RAM. If False (default), the dataset is accessed lazily using polars.scan_parquet, which minimizes memory usage but may incur slower disk I/O during computation.

Outputs

Written to output_dir:

  • data-cleaned.parquet — cleaned dataset.
  • normalized.parquet — z-score normalized feature matrix.
  • normalizer.pkl — serialized Normalizer object containing per-feature mean and standard deviation statistics.
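
After a run, a quick check that the expected files exist can catch a failed or partial run early. The sketch below is a hypothetical helper; its default file names are taken from the source listing of run on this page and are an assumption about the installed version, so pass your own list if they differ.

```python
from pathlib import Path

# Assumption: file names as written by the source listing of run().
EXPECTED_OUTPUTS = ("data-cleaned.parquet", "normalized.parquet", "normalizer.pkl")


def missing_outputs(output_dir, expected=EXPECTED_OUTPUTS):
    """Return the expected output files that are absent from output_dir."""
    out = Path(output_dir)
    return [name for name in expected if not (out / name).is_file()]
```

An empty list means every expected file is present.
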
CLI

Exposed via Fire at the fisseq-data-pipeline entry point, e.g.:

fisseq-data-pipeline run \
    --input_data_path data.parquet \
    --config config.yaml \
    --output_dir out

Source code in src/fisseq_data_pipeline/pipeline.py
def run(
    input_data_path: PathLike,
    config: Optional[Config | PathLike] = None,
    output_dir: Optional[PathLike] = None,
    eager_db_loading: bool = False,
) -> None:
    """
    Run the full batch-correction pipeline.

    This function performs a one-pass processing workflow that reads a full
    dataset, cleans invalid rows and columns, fits normalization statistics
    (batch-wise, on control samples only), applies normalization, and
    writes the resulting cleaned and normalized outputs to disk.

    Production Pipeline steps
    -------------------------
    1. Load and scan the input Parquet dataset into a Polars LazyFrame.
    2. Derive feature and metadata frames using configuration-specified
       column selections.
    3. Clean the dataset by removing:
         - Columns that contain only non-finite (NaN/inf) values.
         - Rows containing any non-finite feature values.
    4. Fit a batch-wise normalizer (computed from control samples only).
    5. Apply normalization to the full cleaned dataset.
    6. Write the cleaned, normalized, and fitted model artifacts to disk.

    Parameters
    ----------
    input_data_path : PathLike
        Path to the input Parquet file containing the full dataset to process.
    config : Config or PathLike, optional
        A ``Config`` object or a path to a YAML configuration file.
    output_dir : PathLike, optional
        Directory to which cleaned and normalized outputs will be written.
        Defaults to the current working directory if not specified.
    eager_db_loading : bool, default=False
        If True, fully load the input Parquet file into memory eagerly
        using :func:`polars.read_parquet`. This avoids repeated on-disk
        scans and can significantly speed up processing on systems with
        sufficient RAM. If False (default), the dataset is accessed lazily
        using :func:`polars.scan_parquet`, which minimizes memory usage but
        may incur slower disk I/O during computation.

    Outputs
    -------
    Written to ``output_dir``:

    - ``data-cleaned.parquet`` — cleaned dataset.
    - ``normalized.parquet`` — z-score normalized feature matrix.
    - ``normalizer.pkl`` — serialized :class:`Normalizer` object containing
      per-feature mean and standard deviation statistics.

    CLI
    ---
    Exposed via Fire at the ``fisseq-data-pipeline`` entry point, e.g.:

    ```bash
    fisseq-data-pipeline run --input_data_path data.parquet --config config.yaml --output_dir out
    ```
    """
    setup_logging(output_dir)
    logging.info("Starting pipeline run with input path: %s", input_data_path)

    db_lf = get_db(input_data_path, eager_db_loading)
    output_dir = pathlib.Path.cwd() if output_dir is None else pathlib.Path(output_dir)
    logging.info("Output directory set to: %s", output_dir)

    logging.info("Collecting data matrices")
    config = Config(config)
    data_lf = get_data_lf(db_lf, config)

    logging.info("Cleaning data")
    data_lf = clean_data(data_lf)

    logging.info("Saving cleaned data")
    data_lf.sink_parquet(output_dir / "data-cleaned.parquet")

    logging.info("Fitting normalizer")
    normalizer = fit_normalizer(
        data_lf=data_lf,
        fit_batch_wise=True,
        fit_only_on_control=True,
    )

    logging.info("Saving normalizer")
    normalizer.save(output_dir / "normalizer.pkl")

    logging.info("Normalizing data")
    normalized_lf = normalize(data_lf, normalizer)

    logging.info("Saving normalized data")
    normalized_lf.sink_parquet(output_dir / "normalized.parquet")

Configure

fisseq_data_pipeline.pipeline.configure(output_path=None)

Write a copy of the default configuration to output_path.

Parameters:
  • output_path (PathLike, default: None ) –

    Target path for the configuration file. If None, writes config.yaml to the current working directory.

Returns:
  • None
CLI

Exposed via Fire at the fisseq-data-pipeline entry point:

# Write config.yaml to CWD
fisseq-data-pipeline configure

# Write to a custom location
fisseq-data-pipeline configure --output_path path/to/config.yaml
Source code in src/fisseq_data_pipeline/pipeline.py
def configure(output_path: Optional[PathLike] = None) -> None:
    """
    Write a copy of the default configuration to ``output_path``.

    Parameters
    ----------
    output_path : PathLike, optional
        Target path for the configuration file. If ``None``, writes
        ``config.yaml`` to the current working directory.

    Returns
    -------
    None

    CLI
    ---
    Exposed via Fire at the ``fisseq-data-pipeline`` entry point

    ```bash
    # Write config.yaml to CWD
    fisseq-data-pipeline configure

    # Write to a custom location
    fisseq-data-pipeline configure --output_path path/to/config.yaml
    ```
    """
    if output_path is None:
        output_path = pathlib.Path.cwd() / "config.yaml"

    shutil.copy(DEFAULT_CFG_PATH, output_path)