Pipeline

The FISSEQ data pipeline exposes a small command-line interface via the fisseq-data-pipeline entry point. Subcommands are registered with Python Fire:

  • validate — Train/validate on a stratified split and write outputs.
  • run — Production, single-pass run (not yet implemented).
  • configure — Write a default configuration file.

Quick start

# validate with explicit config and output directory
fisseq-data-pipeline validate \
  --input_data_path data.parquet \
  --config config.yaml \
  --output_dir out \
  --test_size 0.2 \
  --write_train_results true

Write a default configuration file to the current working directory:

fisseq-data-pipeline configure

Logging

Log verbosity is controlled by the FISSEQ_PIPELINE_LOG_LEVEL environment variable (default: info):

FISSEQ_PIPELINE_LOG_LEVEL=debug fisseq-data-pipeline validate \
  --input_data_path data.parquet

Command Interface


Validate

fisseq_data_pipeline.pipeline.validate(input_data_path, config=None, output_dir=None, test_size=0.2, write_train_results=True)

Train pipeline parameters and run on a stratified train/test split.

Validation Pipeline steps
  1. Load dataset, derive feature/metadata frames, and clean invalid rows/columns.
  2. Build a stratification vector from _batch and _label and perform a single stratified train/test split (a rough sketch follows this list).
  3. Fit a normalizer on the training split; transform train and test.
  4. Fit ComBat harmonizer on normalized training data; apply to the normalized test (and optionally train).
  5. Write unmodified, normalized, and harmonized Parquet outputs, and save fitted models.
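
To illustrate step 2, here is a rough, hypothetical sketch of a stratified split keyed on the combined _batch and _label columns, expressed with scikit-learn. The pipeline's own train_test_split helper (shown in the source below) is what actually runs; the function and variable names here are illustrative only.

import numpy as np
import polars as pl
from sklearn.model_selection import train_test_split as sk_split

def stratified_split_sketch(feature_df: pl.DataFrame, meta_df: pl.DataFrame, test_size: float = 0.2):
    # Combine batch and label into one stratification key per row, e.g. "batch03|variantA".
    # (The real pipeline drops infrequent batch/label pairs beforehand.)
    strata = [f"{b}|{l}" for b, l in zip(meta_df["_batch"], meta_df["_label"])]
    idx = np.arange(meta_df.height)
    train_idx, test_idx = sk_split(idx, test_size=test_size, stratify=strata, random_state=0)
    return (
        feature_df[train_idx], meta_df[train_idx],
        feature_df[test_idx], meta_df[test_idx],
    )
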
Parameters:
  • input_data_path (PathLike) – Path to a Parquet file to scan and process.
  • config (Config or PathLike, default: None) – Configuration object or path. Must define feature columns and the names of the _batch, _label, and _is_control fields.
  • output_dir (PathLike, default: None) – Output directory. Defaults to the current working directory.
  • test_size (float, default: 0.2) – Fraction of samples assigned to the test split.
  • write_train_results (bool, default: True) – If True, also write the train split's unmodified/normalized/harmonized outputs.
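
The same function can be called directly from Python; a minimal programmatic call based on the signature above:

from fisseq_data_pipeline.pipeline import validate

validate(
    "data.parquet",
    config="config.yaml",
    output_dir="out",
    test_size=0.2,
    write_train_results=True,
)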

Outputs

Written to output_dir:

  • meta_data.test.parquet
  • features.test.parquet
  • normalized.test.parquet
  • harmonized.test.parquet
  • normalizer.pkl
  • harmonizer.pkl

If write_train_results=True:

  • meta_data.train.parquet
  • features.train.parquet
  • normalized.train.parquet
  • harmonized.train.parquet
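
The Parquet outputs can be read back with polars and the fitted models reloaded with pickle; a short sketch, assuming the default file names listed above and an output directory named out:

import pathlib
import pickle

import polars as pl

out = pathlib.Path("out")
test_meta_df = pl.read_parquet(out / "meta_data.test.parquet")
test_harmonized_df = pl.read_parquet(out / "harmonized.test.parquet")

with open(out / "normalizer.pkl", "rb") as f:
    normalizer = pickle.load(f)
with open(out / "harmonizer.pkl", "rb") as f:
    harmonizer = pickle.load(f)
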
CLI

Exposed via Fire at the fisseq-data-pipeline entry point, e.g.:

fisseq-data-pipeline validate \
    --input_data_path data.parquet \
    --config config.yaml \
    --output_dir out \
    --test_size 0.2 \
    --write_train_results true

Source code in src/fisseq_data_pipeline/pipeline.py
def validate(
    input_data_path: PathLike,
    config: Optional[Config | PathLike] = None,
    output_dir: Optional[PathLike] = None,
    test_size: float = 0.2,
    write_train_results: bool = True,
) -> None:
    """
    Train pipeline parameters and run on a stratified train/test split.

    Validation Pipeline steps
    --------------
    1. Load dataset, derive feature/metadata frames, and clean invalid
       rows/columns.
    2. Build a stratification vector from ``_batch`` and ``_label`` and
       perform a single stratified train/test split.
    3. Fit a normalizer on the training split; transform train and test.
    4. Fit ComBat harmonizer on normalized training data; apply to the
       normalized test (and optionally train).
    5. Write unmodified, normalized, and harmonized Parquet outputs, and
       save fitted models.

    Parameters
    ----------
    input_data_path : PathLike
        Path to a Parquet file to scan and process.
    config : Config or PathLike, optional
        Configuration object or path. Must define feature columns and the
        names of ``_batch``, ``_label``, and ``_is_control`` fields.
    output_dir : PathLike, optional
        Output directory. Defaults to the current working directory.
    test_size : float, default=0.2
        Fraction of samples assigned to the test split.
    write_train_results : bool, default=True
        If True, also write the train split's unmodified/normalized/
        harmonized outputs.

    Outputs
    -------
    Written to ``output_dir``:

    - ``meta_data.test.parquet``
    - ``features.test.parquet``
    - ``normalized.test.parquet``
    - ``harmonized.test.parquet``
    - ``normalizer.pkl``
    - ``harmonizer.pkl``

    If ``write_train_results=True``:

    - ``meta_data.train.parquet``
    - ``features.train.parquet``
    - ``normalized.train.parquet``
    - ``harmonized.train.parquet``

    CLI
    ---
    Exposed via Fire at the ``fisseq-data-pipeline`` entry point, e.g.::

    ```bash
    fisseq-data-pipeline validate
        --input_data_path data.parquet
        --config config.yaml
        --output_dir out
        --test_size 0.2
        --write_train_results true
    ```
    """
    setup_logging(output_dir)
    logging.info("Starting validation with input path: %s", input_data_path)

    data_df = pl.scan_parquet(input_data_path)
    output_dir = pathlib.Path.cwd() if output_dir is None else pathlib.Path(output_dir)
    logging.info("Output directory set to: %s", output_dir)

    logging.info("Collecting data matrices")
    config = Config(config)
    feature_df, meta_data_df = get_data_dfs(data_df, config)
    feature_df, meta_data_df = clean_data(feature_df, meta_data_df)
    feature_df, meta_data_df = drop_infrequent_pairs(feature_df, meta_data_df)
    train_feature_df, train_meta_df, test_feature_df, test_meta_df = train_test_split(
        feature_df, meta_data_df, test_size=test_size
    )

    logging.info("Fitting normalizer on train data")
    normalizer = fit_normalizer(
        train_feature_df,
        meta_data_df=train_meta_df,
        fit_only_on_control=True,
    )

    logging.info("Running normalizer on train/test data")
    train_normalized_df = normalize(train_feature_df, normalizer)
    test_normalized_df = normalize(test_feature_df, normalizer)

    logging.info("Fitting harmonizer on train data")
    harmonizer = fit_harmonizer(
        train_normalized_df, train_meta_df, fit_only_on_control=True
    )

    logging.info("Harmonizing test data")
    test_harmonized_df = harmonize(test_normalized_df, test_meta_df, harmonizer)

    # write outputs
    logging.info("Writing test outputs to %s", output_dir)
    test_meta_df.write_parquet(output_dir / "meta_data.test.parquet")
    test_feature_df.write_parquet(output_dir / "features.test.parquet")
    test_normalized_df.write_parquet(output_dir / "normalized.test.parquet")
    test_harmonized_df.write_parquet(output_dir / "harmonized.test.parquet")

    logging.info("Writing fitted parameters to %s", output_dir)
    with open(output_dir / f"normalizer.pkl", "wb") as f:
        pickle.dump(normalizer, f)
    with open(output_dir / f"harmonizer.pkl", "wb") as f:
        pickle.dump(harmonizer, f)

    if write_train_results:
        logging.info("Harmonizing train data")
        train_harmonized_df = harmonize(train_normalized_df, train_meta_df, harmonizer)
        train_meta_df.write_parquet(output_dir / "meta_data.train.parquet")
        train_feature_df.write_parquet(output_dir / "features.train.parquet")
        train_normalized_df.write_parquet(output_dir / "normalized.train.parquet")
        train_harmonized_df.write_parquet(output_dir / "harmonized.train.parquet")

Run

fisseq_data_pipeline.pipeline.run(*args, **kwargs)

Run the production pipeline on a full dataset.

This function is a placeholder for a single-pass production run (no train/test split). It is not implemented yet.

Raises:
  • NotImplementedError – Always raised. The function body is not implemented.

CLI

Registered subcommand (placeholder):

fisseq-data-pipeline run
Source code in src/fisseq_data_pipeline/pipeline.py
def run(*args, **kwargs) -> None:
    """
    Run the production pipeline on a full dataset.

    This function is a placeholder for a single-pass production run
    (no train/test split). It is not implemented yet.

    Raises
    ------
    NotImplementedError
        Always raised. The function body is not implemented.

    CLI
    ---
    Registered subcommand (placeholder)::

    ```bash
    fisseq-data-pipeline run
    ```
    """
    # TODO: implement run
    raise NotImplementedError()



Configure

fisseq_data_pipeline.pipeline.configure(output_path=None)

Write a copy of the default configuration to output_path.

Parameters:
  • output_path (PathLike, default: None) – Target path for the configuration file. If None, writes config.yaml to the current working directory.

Returns:
  • None
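
configure can likewise be called from Python. Since the default file is copied with shutil.copy, the parent directory of output_path must already exist; the custom path below is only illustrative:

from fisseq_data_pipeline.pipeline import configure

# Write config.yaml into the current working directory
configure()

# Write to an explicit (hypothetical) location; the directory must already exist
configure("configs/config.yaml")
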
CLI

Exposed via Fire at the fisseq-data-pipeline entry point:

# Write config.yaml to CWD
fisseq-data-pipeline configure

# Write to a custom location
fisseq-data-pipeline configure --output_path path/to/config.yaml
Source code in src/fisseq_data_pipeline/pipeline.py
def configure(output_path: Optional[PathLike] = None) -> None:
    """
    Write a copy of the default configuration to ``output_path``.

    Parameters
    ----------
    output_path : PathLike, optional
        Target path for the configuration file. If ``None``, writes
        ``config.yaml`` to the current working directory.

    Returns
    -------
    None

    CLI
    ---
    Exposed via Fire at the ``fisseq-data-pipeline`` entry point

    ```bash
    # Write config.yaml to CWD
    fisseq-data-pipeline configure

    # Write to a custom location
    fisseq-data-pipeline configure --output_path path/to/config.yaml
    ```
    """
    if output_path is None:
        output_path = pathlib.Path.cwd() / "config.yaml"

    shutil.copy(DEFAULT_CFG_PATH, output_path)



Auxiliary functions

These functions are not exposed on the command line and are intended for internal use only.


fisseq_data_pipeline.pipeline.setup_logging(log_dir=None)

Configure logging for the pipeline.

A log file and a console stream are set up simultaneously. The log file is created in the specified directory (or the current working directory by default) with a timestamped filename. The log level is controlled by the environment variable FISSEQ_PIPELINE_LOG_LEVEL (default: "info").

Parameters:
  • log_dir (PathLike, default: None) – Directory where log files will be written. If None, the current working directory is used.
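
Because the level is read from the environment at call time, it can also be set from Python before calling setup_logging; a small sketch (the logs directory is assumed to exist already, since the file handler does not create it):

import os

from fisseq_data_pipeline.pipeline import setup_logging

# Level names must be lowercase; unrecognized values fall back to INFO.
os.environ["FISSEQ_PIPELINE_LOG_LEVEL"] = "debug"
setup_logging("logs")  # writes a timestamped log file into ./logs and also logs to the console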

Source code in src/fisseq_data_pipeline/pipeline.py
def setup_logging(log_dir: Optional[PathLike] = None) -> None:
    """
    Configure logging for the pipeline.

    A log file and a console stream are set up simultaneously.
    The log file is created in the specified directory (or the current
    working directory by default) with a timestamped filename.
    The log level is controlled by the environment variable
    ``FISSEQ_PIPELINE_LOG_LEVEL`` (default: ``"info"``).

    Parameters
    ----------
    log_dir : PathLike, optional
        Directory where log files will be written.
        If ``None``, the current working directory is used.
    """
    log_levels = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }

    if log_dir is None:
        log_dir = pathlib.Path.cwd()
    else:
        log_dir = pathlib.Path(log_dir)

    dt_str = datetime.datetime.now().strftime("%Y%m%d:%H%M%S")
    filename = f"fisseq-data-pipeline-{dt_str}.log"
    log_path = log_dir / filename
    handlers = [logging.StreamHandler(), logging.FileHandler(log_path, mode="w")]

    log_level = os.getenv("FISSEQ_PIPELINE_LOG_LEVEL", "info")
    log_level = log_levels.get(log_level, logging.INFO)

    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(levelname)s] [%(funcName)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=handlers,
    )

fisseq_data_pipeline.pipeline.main()

CLI entry that registers Fire subcommands.

Subcommands
  • validate : Train/validate on a stratified split and write outputs.
  • run : Production, single-pass run (not yet implemented).
  • configure : Write a default configuration file.
CLI

Invoked as the fisseq-data-pipeline console script. For example:

fisseq-data-pipeline validate --input_data_path data.parquet
Source code in src/fisseq_data_pipeline/pipeline.py
def main() -> None:
    """
    CLI entry that registers Fire subcommands.

    Subcommands
    -----------
    - ``validate``  : Train/validate on a stratified split and write outputs.
    - ``run``       : Production, single-pass run (not yet implemented).
    - ``configure`` : Write a default configuration file.

    CLI
    ---
    Invoked as the ``fisseq-data-pipeline`` console script. For example::

        fisseq-data-pipeline validate --input_data_path data.parquet
    """
    try:
        fire.Fire({"validate": validate, "run": run, "configure": configure})
    except:
        logging.exception("Run failed due to the following exception:")
        raise