Preprocess Script

The preprocess script is an optional script used to provide inputs to a workflow which cannot be generated by the analysis submission form alone. It can help with common tasks like:

  • Preparing the essential inputs.json and options.json for WDL/Cromwell workflows
  • Formatting a samplesheet.csv for Nextflow workflows
  • Performing any validation of files or parameters before the workflow is run
  • Executing arbitrary logic based on user inputs

The flow of an analysis in Cirro with a preprocess script is as follows:

  1. User uploads a dataset with input files and sample metadata (samplesheet.csv).
  2. Cirro reads the samplesheet.csv file and stores the metadata in the database.
  3. User selects a pipeline, enters parameters, and submits the analysis.
  4. Cirro writes the parameters and sample metadata and makes them available to the preprocess script as ds.params, ds.samplesheet, and ds.files.
  5. Cirro runs the user-provided preprocess script, and writes out any additional files (if specified).
  6. Cirro runs the workflow.

Preprocess Flow Diagram

For workflows that require samplesheet-style inputs, the preprocess script is necessary because:

  • At the time of upload, the final locations of the files are not known. The ds.files table contains the S3 paths to the files needed by the workflow.
  • The sample metadata may have been edited in the Samples table after upload.
  • The format of the samplesheet that the user uploads is decoupled from the samplesheet that the workflow expects.
    • The user uploading the dataset might not be aware of the specific format required by the workflow.
    • The user might want to run multiple workflows on the same dataset, each with a different samplesheet format.
    • The user might want to run the same workflow on multiple input datasets, combining them into a single samplesheet.

Accessing Dataset Information

Each preprocess script will typically start with:

from cirro.helpers.preprocess_dataset import PreprocessDataset
ds = PreprocessDataset.from_running()

The ds object contains all of the information about the dataset which has been created to hold the analysis results.

Explanation: When a Pipeline is run by a user, a new Dataset is created to hold the results of that analysis. The new dataset is populated with

  • the location of the input data
  • the parameters selected by the user
  • the sample metadata (a snapshot from the time of submission)

The attributes of the ds object which can be used in the preprocess script are:

ds.files

A pandas DataFrame containing information on the files contained in the input datasets, and the sample that each file is assigned to.

Any other metadata which is assigned at the file level (e.g., the read variable which is parsed for paired-end FASTQ data) will also be present in ds.files.

print(ds.files)
sample   file                              sampleIndex  read
sampleA  s3://path/to/sampleA_R1.fastq.gz  1            1
sampleA  s3://path/to/sampleA_R2.fastq.gz  1            2
sampleB  s3://path/to/sampleB_R1.fastq.gz  2            1
sampleB  s3://path/to/sampleB_R2.fastq.gz  2            2

Note: The sampleIndex is used to differentiate between files which are uploaded as a pair, but which have the same sample name (e.g., combining FASTQ files from multiple lanes of a genome sequencer).
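
For workflows which expect all of the files for a sample to be combined into a single field, one hypothetical pattern (not part of the Cirro API, and assuming the ds.files columns shown above) is to group ds.files by sample and read and join the file paths:

# Hypothetical pattern: for each sample and read, join the paths of all files
# (e.g. from multiple lanes) into a single comma-separated string.
# Column names assume the ds.files table shown above.
per_sample = (
    ds.files
    .sort_values("sampleIndex")
    .groupby(["sample", "read"])["file"]
    .apply(",".join)
    .reset_index()
)
ds.logger.info(per_sample.to_csv(index=None))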

ds.samplesheet

A pandas DataFrame containing all of the metadata assigned to the samples present in the input datasets (at the time of analysis).

print(ds.samplesheet)
sample   group    age
sampleA  Group_1  55
sampleB  Group_2  72

ds.wide_samplesheet()

Pivot the long-form ds.files table into a wide-form DataFrame. The available arguments (all optional, shown with their defaults) are:

  • index (default ["sampleIndex", "sample", "lane"]): The columns used for the index in the wide samplesheet. Any of these columns which do not exist in ds.files.columns will be silently omitted from the results of ds.wide_samplesheet()
  • columns (default "read"): The column containing the values which will be used for the column headers in the wide samplesheet
  • values (default "file"): The column which will be used for the values in the wide samplesheet
  • column_prefix (default "fastq_"): A string prefix added to the values taken from the columns column to form the column headers in the wide samplesheet

For example, using the ds.files table above:

ds.wide_samplesheet()
sampleIndex  sample   fastq_1                           fastq_2
1            sampleA  s3://path/to/sampleA_R1.fastq.gz  s3://path/to/sampleA_R2.fastq.gz
2            sampleB  s3://path/to/sampleB_R1.fastq.gz  s3://path/to/sampleB_R2.fastq.gz

Note that the results of ds.wide_samplesheet() may often reproduce the contents of the samplesheet.csv used to upload a paired-end FASTQ dataset. Importantly, the path to the individual files will reflect their current location in Cirro, and not the relative path they were uploaded from.
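
The same call can also be made with the arguments passed explicitly. As a sketch, the call below overrides only column_prefix; given the ds.files table above, the resulting columns would be named read_1 and read_2 instead of fastq_1 and fastq_2:

# Pass the arguments listed above explicitly, overriding only column_prefix
wide = ds.wide_samplesheet(
    index=["sampleIndex", "sample", "lane"],
    columns="read",
    values="file",
    column_prefix="read_",
)
ds.logger.info(wide.to_csv(index=None))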

ds.metadata

Detailed information on the dataset which has been created for the analysis results. For example:

{
    "dataset": {
        "id": "111",
        "createdAt": "2024-09-23T19:55:28.020052Z",
        "updatedAt": "<SNIP>",
        "createdBy": "hello@cirro.bio",
        "params": {
            "level": 3,
            "experiment_format": {
                "plates": "A B C",
                "wells": "1 2 3"
            },
            "custom_options": {
                "with_custom": false
            }
        },
        "info": {},
        "projectId": "000",
        "tags": [],
        "processId": "process-hutch-interop-qc-1_0",
        "name": "New Dataset",
        "status": "PENDING",
        "s3": "s3://project-000/datasets/111",
        "dataPath": "s3://project-000/datasets/111/data"
    },
    "project": {
        "id": "000",
        "name": "Test Project",
        "description": "Test Project Description",
        "status": "COMPLETED"
    },
    "process": {
        "id": "process-hutch-interop-qc-1_0",
        "createdAt": "<SNIP>",
        "updatedAt": "<SNIP>",
        "pipelineType": "Community",
        "name": "InterOp QC",
        "description": "InterOp QC Reporting - Generate QC metrics from Illumina Sequencing run",
        "executor": "NEXTFLOW",
        "documentationUrl": "https://github.com/FredHutch/interop-nf",
        "code": {
            "repositoryType": "GITHUB_PUBLIC",
            "uri": "FredHutch/interop-nf",
            "version": "1.0",
            "script": "main.nf"
        },
        "archived": false,
        "category": "Quality Control"
    },
    "inputs": [
        {
            "id": "aaa",
            "createdAt": "<SNIP>",
            "updatedAt": "<SNIP>",
            "createdBy": "hello@cirro.bio",
            "params": {},
            "info": {},
            "projectId": "000",
            "sourceDatasetIds": [],
            "tags": [],
            "processId": "sequencing-run",
            "name": "MiSeq Demo 1",
            "status": "COMPLETED",
            "description": "test",
            "s3": "s3://project-000/aaa",
            "dataPath": "s3://project-000/aaa/data"
        },
        {
            "id": "bbb",
            "createdAt": "<SNIP>",
            "updatedAt": "<SNIP>",
            "createdBy": "hello@cirro.bio",
            "params": {},
            "info": {},
            "projectId": "000",
            "sourceDatasetIds": [],
            "tags": [],
            "processId": "sequencing-run",
            "name": "MiSeq Demo 2",
            "status": "COMPLETED",
            "description": "test",
            "s3": "s3://project-000/bbb",
            "dataPath": "s3://project-000/bbb/data"
        }
    ]
}
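
Individual fields can be read from this object within the preprocess script. As a sketch, assuming ds.metadata is exposed as a plain dictionary matching the JSON shown above:

# Assumes ds.metadata can be indexed like a dictionary with the structure above
dataset_name = ds.metadata["dataset"]["name"]
input_ids = [inp["id"] for inp in ds.metadata["inputs"]]
ds.logger.info(f"Analyzing dataset '{dataset_name}' with inputs: {input_ids}")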

ds.params

A dictionary with all of the parameter values populated by user input using the process-form.json and process-input.json configurations.

This object is only used to access parameter values (e.g. ds.params['foo'] or ds.params.get('foo')). Adding or removing parameters from this object will not be carried through to the workflow. For that purpose, use ds.add_param or ds.remove_param.
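
For example, to read values from ds.params (the parameter names below are hypothetical and depend on the pipeline's process-form.json):

# "level" and "genome" are example parameter names defined by the pipeline's form
level = ds.params["level"]                  # raises KeyError if the parameter is not set
genome = ds.params.get("genome", "GRCh38")  # falls back to a default value
ds.logger.info(f"level={level}, genome={genome}")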

ds.add_param()

Method to add a parameter using its keyword and value. If the parameter keyword is already in use, this method will raise an error unless overwrite=True is set.

# To add the parameter "new_param", with value 1
ds.add_param("new_param", 1)
ds.params["new_param"] == 1

Note: While ds.params["new_param"] = 1 would assign the value within the context of the preprocess script, that value would not be passed along to the workflow without using the .add_param method.
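
If the keyword may already be in use (for example, a default supplied by the form), pass overwrite=True as noted above:

# Overwrite the existing value of "new_param" instead of raising an error
ds.add_param("new_param", 2, overwrite=True)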

ds.remove_param()

Method to remove a parameter from the workflow parameters. If the parameter does not currently exist, this method will raise an error unless force=True is set.

# To remove the parameter "new_param"
ds.remove_param("new_param")
ds.params.get("new_param") is None

ds.logger.info()

Print a message to the logs, which can be very helpful for debugging.

# For example, to get an idea of what the samplesheet
# looks like before running the workflow
samplesheet = ds.wide_samplesheet()
ds.logger.info(samplesheet.to_csv(index=None))

WDL / Cromwell

For a WDL pipeline to run in Cirro (which uses the Cromwell executor in 'run mode'), at least two files must be created by the preprocess script: inputs.*.json and options.json.

Differentiating 'inputs' and 'options'

While Cirro sets up a single set of 'parameters' for a workflow, the configuration of the Cromwell executor will expect some of those values to be provided as options while others are inputs. For example, the location where final output files will be written is typically provided as final_workflow_outputs_dir (which is an option), while the reference genome used for alignment may be provided as WholeGenomeGermlineSingleSample.references (which is an input).

For this reason, the preprocess script is used to differentiate the parameters which should be submitted as inputs versus options.

After the preprocess script has completed, the headnode will expect to find:

  • A file named options.json
  • One or more files named inputs.*.json

Execution of Multiple inputs.json

The philosophy of WDL workflows established by GATK is that a workflow processes a single sample (or panel of samples) at a time. In contrast, the approach of nf-core is often that a batch of samples can be processed in parallel.

To support both approaches for WDL (analyzing either a single sample or multiple samples), the Cirro headnode can run the specified workflow multiple times, once for each file matching the file name pattern inputs.*.json (e.g., inputs.1.json, inputs.2.json; note that * can be any string, not just numerals).

Because the Cromwell executor is used in "run mode", each inputs file will be processed serially (within the analysis of a single Cirro Dataset). While this approach may be slower than parallel execution in some cases, one minor benefit is that any computational steps which are repeated across runs will be cached rather than re-executed unnecessarily.
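
For example, a sketch of a preprocess script which writes one inputs file per sample so that the workflow is run once for each sample. The workflow name (MyWorkflow) and its input names are hypothetical placeholders, and the columns used assume the wide samplesheet format shown earlier:

from cirro.helpers.preprocess_dataset import PreprocessDataset
import json

ds = PreprocessDataset.from_running()

# Write one inputs.N.json file per sample (MyWorkflow is a hypothetical workflow name)
for i, row in enumerate(ds.wide_samplesheet().to_dict(orient="records")):
    inputs = {
        "MyWorkflow.sample_name": row["sample"],
        "MyWorkflow.fastq_1": row["fastq_1"],
        "MyWorkflow.fastq_2": row["fastq_2"],
    }
    with open(f"inputs.{i + 1}.json", "w") as handle:
        json.dump(inputs, handle, indent=4)

# A single options.json is still expected by the headnode
with open("options.json", "w") as handle:
    json.dump({}, handle, indent=4)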

Useful Patterns

Splitting up inputs and options using a keyword prefix

A convenient approach for differentiating between workflow parameters which should be supplied via inputs vs. options is to use a keyword prefix named for the workflow being executed.

For example, when running the workflow named WholeGenomeGermlineSingleSample, all of the input parameters will be named something starting with WholeGenomeGermlineSingleSample.*.

To split up parameters based on the presence of that prefix and write out the appropriate files:

from cirro.helpers.preprocess_dataset import PreprocessDataset
import json

# Get the workflow execution information
ds = PreprocessDataset.from_running()

# Name of the WDL workflow being run
WORKFLOW_PREFIX = "WholeGenomeGermlineSingleSample"

# Split up the inputs and options using dict comprehension
inputs = {
    kw: val
    for kw, val in ds.params.items()
    if kw.startswith(WORKFLOW_PREFIX)
}
options = {
    kw: val
    for kw, val in ds.params.items()
    if not kw.startswith(WORKFLOW_PREFIX)
}

# Write out the inputs and options
with open("inputs.1.json", "w") as handle:
    json.dump(inputs, handle, indent=4)
with open("options.json", "w") as handle:
    json.dump(options, handle, indent=4)

Apply default params:

# Boilerplate which can be used with all workflows
ds.add_param("use_relative_output_paths", True)
ds.add_param("read_from_cache", True)
ds.add_param("write_to_cache", True)

Nextflow

The preprocess script is technically optional for Nextflow workflows, but it is often required for nf-core style workflows in which inputs are formatted as a samplesheet.

By default, all of the parameters set up in the pipeline configuration will be passed to a Nextflow workflow as parameters (in the params scope). As such, any modifications made to the parameters using ds.add_param or ds.remove_param will be reflected in the inputs to Nextflow without having to write any other files to the working directory.
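
For example, a parameter added in the preprocess script (the name "genome" is just an illustration) will be available to the workflow in the params scope:

# Available to the Nextflow workflow as params.genome
# (comparable to passing --genome GRCh38 on the command line)
ds.add_param("genome", "GRCh38")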

Useful Patterns

Remove any index reads (for paired-end FASTQ datasets with I1/I2 in addition to R1/R2):

# Filter out any index files that may have been uploaded
ds.files = ds.files.loc[
    ds.files.apply(
        lambda r: r.get('readType', 'R') == 'R',
        axis=1
    )
]

Set up a wide-form samplesheet for paired-end FASTQ inputs, including metadata:

import pandas as pd

# Make a wide-form samplesheet with columns: sample, fastq_1, and fastq_2
samplesheet = ds.wide_samplesheet()

# Merge in the metadata for these samples (if any exists)
samplesheet = pd.merge(samplesheet, ds.samplesheet, left_on="sample", right_on="sample")

# Save to a file
samplesheet.to_csv("samplesheet.csv", index=None)

# Set up a workflow param pointing to that file (e.g., for nf-core/rnaseq)
ds.add_param("input", "samplesheet.csv")

Unit Testing

When developing the preprocess.py script, it can be helpful to test the script locally using a unit testing framework, rather than running the entire Cirro analysis workflow (which can be time-consuming).

To do this, you will need to make sure that your preprocess.py script is written in a way that allows you to test it in isolation. For example, if you need to create a sample sheet, you should extract the logic for creating the sample sheet into a separate function that can take the samples and files as arguments.

Here is an example of how you might structure your preprocess.py script to allow for unit testing:

import pandas as pd

from cirro.helpers.preprocess_dataset import PreprocessDataset

def make_manifest(samples: pd.DataFrame, files: pd.DataFrame) -> pd.DataFrame:
    """
    Create a samplesheet for my workflow
    """

    # Your logic for creating the samplesheet goes here
    pass

if __name__ == '__main__':
    ds = PreprocessDataset.from_running()
    manifest = make_manifest(ds.samplesheet, ds.files)
    manifest.to_csv('samplesheet.csv', index=False)

You can then create a separate test script, preprocess_test.py, that imports the make_manifest function and tests it using a unit testing framework like unittest.

Here is an example of how you might structure your preprocess_test.py script:

import unittest
from io import StringIO

import pandas

from preprocess import make_manifest

class PreprocessTest(unittest.TestCase):
    def test_formats_samplesheet(self):
        manifest = make_manifest(
            samples=pandas.read_csv("<TEST_FILE_PATH>"),
            files=pandas.read_csv("<TEST_FILE_PATH>")
        )

        manifest_csv = manifest.to_csv(index=False).strip()
        expected_manifest_csv = pandas.read_csv("<EXPECTED_MANIFEST_FILE_PATH>").to_csv(index=False).strip()
        self.assertEqual(manifest_csv, expected_manifest_csv)

In this test script, you would replace the file path placeholders with the paths to your test files. You can also use the StringIO class to create in-memory CSV data for testing.
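
For example, a minimal sketch of using StringIO to build an in-memory DataFrame for a test:

from io import StringIO

import pandas

# Build a small files table in memory instead of reading it from disk
TEST_FILES = """
sample,file,readType,read
sampleA,sampleA_R1.fastq.gz,R,1
sampleA,sampleA_R2.fastq.gz,R,2
""".strip()

files = pandas.read_csv(StringIO(TEST_FILES))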

Full Example

Let's say we have a Nextflow workflow that requires a sample sheet to be in this format:

sample   forwardReads                      reverseReads                      group
sampleA  s3://path/to/sampleA_R1.fastq.gz  s3://path/to/sampleA_R2.fastq.gz  Group_1
sampleB  s3://path/to/sampleB_R1.fastq.gz  s3://path/to/sampleB_R2.fastq.gz  Group_2

The script will need to make the following changes to the Cirro-provided sample sheet:

  • filter out any index files that may be present in the dataset upload,
  • pivot the files table to have one row per sample, with columns for each read,
  • merge the samples and files tables to bring in additional metadata,
  • rename the sample column to sampleID and read number columns to forwardReads and reverseReads,
  • drop all unnecessary columns,
  • make sure the sampleID only has allowed characters,
  • check to make sure there are samples in the dataset.

See the full preprocess.py script below:

preprocess.py
import re

import pandas as pd

from cirro.helpers.preprocess_dataset import PreprocessDataset

def make_manifest(samples: pd.DataFrame, files: pd.DataFrame) -> pd.DataFrame:
    """
    Create a samplesheet for my workflow
    """

    # Filter out any index files that may be present, we only want the reads
    files = files[files['readType'] == 'R']

    # Pivot the files table to have one row per sample, with columns for each read
    files = pd.concat([
        pd.DataFrame(dict(
            sample=sample,
            **{
                f"R{read}": read_files['file'].sort_values().tolist()
                for read, read_files in sample_files.groupby("read")
            }
        ))
        for sample, sample_files in files.assign(
            read=files["read"].apply(int)
        ).query(
            "read <= 2"
        ).groupby("sample")
    ])

    # Merge samples and files tables
    samplesheet = pd.merge(files,
                           samples,
                           left_on="sample",
                           right_on="sample",
                           validate="1:1",
                           how="inner")

    # Rename sample column to sampleID and read number columns
    samplesheet = samplesheet.rename(columns={
        "sample": "sampleID",
        "R1": "forwardReads",
        "R2": "reverseReads"
    })

    # Drop all columns except sampleID, forwardReads, reverseReads, and group
    samplesheet = samplesheet[["sampleID", "forwardReads", "reverseReads", "group"]]

    # Make sure the sampleID only has allowed characters
    # (replace any runs of non-alphanumeric characters with underscores)
    samplesheet["sampleID"] = samplesheet["sampleID"].apply(
        lambda s: re.sub(r'\W+', '_', str(s))
    )

    if samplesheet.shape[0] == 0:
        raise ValueError("No samples found in the dataset")

    return samplesheet


if __name__ == '__main__':
    ds = PreprocessDataset.from_running()
    manifest = make_manifest(ds.samplesheet, ds.files)
    manifest.to_csv('samplesheet.csv', index=False)

To test this script, we create the file preprocess_test.py in the same directory as preprocess.py. See the full test script below:

preprocess_test.py
import unittest
from io import StringIO

import pandas

from preprocess import make_manifest

TEST_SAMPLESHEET = """
sample,group,randomData
sample1,control,test
sample2,treatment,test
""".strip()

TEST_FILES = """
sample,file,readType,read
sample1,sample1_R1.fastq.gz,R,1
sample1,sample1_R2.fastq.gz,R,2
sample1,sample1_I1.fastq.gz,I,1
sample2,sample2_R1.fastq.gz,R,1
""".strip()

EXPECTED_MANIFEST = """
sampleID,forwardReads,reverseReads,group
sample1,sample1_R1.fastq.gz,sample1_R2.fastq.gz,control
sample2,sample2_R1.fastq.gz,,treatment
""".strip()


class PreprocessTest(unittest.TestCase):
    def test_formats_samplesheet(self):
        manifest = make_manifest(
            samples=pandas.read_csv(StringIO(TEST_SAMPLESHEET)),
            files=pandas.read_csv(StringIO(TEST_FILES))
        )

        manifest_csv = manifest.to_csv(index=False).strip()
        self.assertEqual(manifest_csv, EXPECTED_MANIFEST)

    def test_throws_error_no_samples(self):
        files = pandas.read_csv(StringIO(TEST_FILES))
        only_index_files = files[files['readType'] == 'I']

        with self.assertRaises(ValueError):
            make_manifest(
                samples=pandas.read_csv(StringIO(TEST_SAMPLESHEET)),
                files=only_index_files
            )