Preprocess Script

The preprocess script is used to provide inputs to a workflow which cannot be fully specified by the analysis submission form alone. This can help with common tasks like:

  • Preparing the essential inputs.json and options.json for WDL/Cromwell workflows
  • Formatting a samplesheet.csv for Nextflow workflows
  • Executing arbitrary logic based on user inputs

Accessing Dataset Information

Each preprocess script will typically start with:

from cirro.helpers.preprocess_dataset import PreprocessDataset
ds = PreprocessDataset.from_running()

The ds object contains all of the information for the dataset which has been created to hold the analysis results.

Explanation: When a Pipeline is run by a user, a new Dataset is created to hold the results of that analysis. The new dataset is populated with

  • the location of the input data
  • the parameters selected by the user
  • the sample metadata (a snapshot from the time of submission)

The attributes of the ds object which can be used in the preprocess script are:

ds.files

A pandas DataFrame containing information on the files contained in the input dataset, and the sample that each file is assigned to.

Any other metadata which is assigned at the file level (e.g., the read variable which is parsed for paired-end FASTQ data) will also be present in ds.files.

print(ds.files)
sample   file                              sampleIndex  read
sampleA  s3://path/to/sampleA_R1.fastq.gz  1            1
sampleA  s3://path/to/sampleA_R2.fastq.gz  1            2
sampleB  s3://path/to/sampleB_R1.fastq.gz  2            1
sampleB  s3://path/to/sampleB_R2.fastq.gz  2            2

Note: The sampleIndex is used to differentiate between files which are uploaded as a pair, but which have the same sample name (e.g., combining FASTQ files from multiple lanes of a genome sequencer).

ds.samplesheet

A pandas DataFrame containing all of the metadata assigned to the samples present in the input dataset (at the time of analysis).

print(ds.samplesheet)
sample   group    age
sampleA  Group_1  55
sampleB  Group_2  72

ds.wide_samplesheet()

Pivots the long-form ds.files table into a wide-form DataFrame. The arguments (each with a default value) are:

  • index (default ["sampleIndex", "sample", "lane"]): The columns used for the index in the wide samplesheet. If any of these columns do not exist in ds.files.columns, they will be silently omitted from the results of ds.wide_samplesheet()
  • columns (default read): The column containing values which will be used for column headers in the wide samplesheet
  • values (default file): The column which will be used for the values in the wide samplesheet
  • column_prefix (default fastq_): The string prefix which will be added to each value from the columns column to form the headers of the wide samplesheet (e.g., fastq_1, fastq_2)

For example, using the ds.files table above:

ds.wide_samplesheet()
sampleIndex  sample   fastq_1                           fastq_2
1            sampleA  s3://path/to/sampleA_R1.fastq.gz  s3://path/to/sampleA_R2.fastq.gz
2            sampleB  s3://path/to/sampleB_R1.fastq.gz  s3://path/to/sampleB_R2.fastq.gz

Note that the results of ds.wide_samplesheet() may often reproduce the contents of the samplesheet.csv used to upload a paired-end FASTQ dataset. Importantly, the path to the individual files will reflect their current location in Cirro, and not the relative path they were uploaded from.
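
The arguments can also be overridden when calling the method. For example (a minimal sketch, assuming the same input dataset as above), changing the prefix used for the output column headers:

# Use a different prefix for the paired FASTQ column headers
samplesheet = ds.wide_samplesheet(column_prefix="fq_")
# The paired columns would now be named fq_1 and fq_2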

ds.params

A dictionary with all of the parameter values populated by user input using the process-form.json and process-input.json configurations.

This object is only used to access parameter values (e.g. ds.params['foo'] or ds.params.get('foo')). Adding or removing parameters from this object will not be carried through to the workflow. For that purpose, use ds.add_param or ds.remove_param.

ds.add_param()

Method to add a parameter using its keyword and value. If the parameter keyword is already in use, this method will raise an error unless overwrite=True is set.

# To add the parameter "new_param", with value 1
ds.add_param("new_param", 1)
ds.params["new_param"] == 1

Note: While ds.params["new_param"] = 1 would assign the value within the context of the preprocess script, that value would not be passed along to the workflow without using the .add_param method.
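
To replace a value which has already been set, pass overwrite=True:

# Overwrite the existing value of "new_param"
ds.add_param("new_param", 2, overwrite=True)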

ds.remove_param()

Method to remove a parameter from the workflow parameters. If the parameter does not currently exist, this method will raise an error unless force=True is set.

# To remove the parameter "new_param"
ds.remove_param("new_param")
ds.params.get("new_param") is None

ds.logger.info()

Print a message to the logs, which is particularly useful for debugging.

# For example, to get an idea of what the samplesheet
# looks like before running the workflow
samplesheet = ds.wide_samplesheet()
ds.logger.info(samplesheet.to_csv(index=None))

WDL / Cromwell

For a WDL pipeline to run in Cirro (which uses the Cromwell executor in 'run mode'), at least two files must be created by the preprocess script: inputs.*.json and options.json.

Differentiating 'inputs' and 'options'

While Cirro sets up a single set of 'parameters' for a workflow, the configuration of the Cromwell executor will expect some of those values to be provided as options while others are inputs. For example, the location where final output files will be written is typically provided as final_workflow_outputs_dir (which is an option), while the reference genome used for alignment may be provided as WholeGenomeGermlineSingleSample.references (which is an input).

For this reason, the preprocess script is used to differentiate the parameters which should be submitted as inputs versus options.

After the preprocess script has completed, the headnode will expect to find:

  • A file named options.json
  • One or more files named inputs.*.json

Execution of Multiple inputs.json

The philosophy of WDL workflows established by GATK is that a workflow processes a single sample (or panel of samples) at a time. In contrast, the approach of nf-core is often that a batch of samples can be processed in parallel.

To support both approaches for WDL (analyzing either a single or multiple samples), the Cirro headnode can run the specified workflow multiple times, once for each file encountered with the file name pattern inputs.*.json (e.g. inputs.1.json, inputs.2.json) (note that * can be any string, not just numerals).

Because the Cromwell executor is used in "run mode", each file of inputs will be processed in serial (within the analysis of a single Cirro Dataset). While this approach may be slower in some cases than parallel execution, one minor benefit is that any computational steps which are repeated across runs will be cached, and will not be unnecessarily repeated during the workflow execution.
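
For example, a preprocess script could write out one inputs file per sample so that the workflow is run once for each. The sketch below assumes a paired-end FASTQ dataset and uses placeholder MyWorkflow.* parameter names, which would need to be replaced with the actual inputs expected by the workflow:

import json

# Build one set of inputs per sample in the wide-form samplesheet
samplesheet = ds.wide_samplesheet()
for i, row in samplesheet.iterrows():
    inputs = {
        "MyWorkflow.sample_name": row["sample"],
        "MyWorkflow.fastq_1": row["fastq_1"],
        "MyWorkflow.fastq_2": row["fastq_2"]
    }
    # Each inputs.*.json file triggers a separate run of the workflow
    with open(f"inputs.{i}.json", "w") as handle:
        json.dump(inputs, handle, indent=4)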

Useful Patterns

Splitting up inputs and options using a keyword prefix

A convenient approach for differentiating between workflow parameters which should be supplied via inputs vs. options is the presence of a keyword prefix which is named for the workflow being executed.

For example, when running the workflow named WholeGenomeGermlineSingleSample, all of the input parameters will be named something starting with WholeGenomeGermlineSingleSample.*.

To split up parameters based on the presence of that prefix and write out the appropriate files:

from cirro.helpers.preprocess_dataset import PreprocessDataset
import json

# Get the workflow execution information
ds = PreprocessDataset.from_running()

# Name of the WDL workflow being run
WORKFLOW_PREFIX = "WholeGenomeGermlineSingleSample"

# Split up the inputs and options using dict comprehension
inputs = {
    kw: val
    for kw, val in ds.params.items()
    if kw.startswith(WORKFLOW_PREFIX)
}
options = {
    kw: val
    for kw, val in ds.params.items()
    if not kw.startswith(WORKFLOW_PREFIX)
}

# Write out the inputs and options
json.dump(inputs, open("inputs.1.json", "w))
json.dump(options, open("options.json", "w))

Apply default params (because these keywords do not begin with the workflow prefix, the pattern above will route them into options.json):

# Boilerplate which can be used with all workflows
ds.add_param("use_relative_output_paths", True)
ds.add_param("read_from_cache", True)
ds.add_param("write_to_cache", True)

Nextflow

The preprocess script is technically optional for Nextflow workflows, but it is often required for nf-core style workflows in which inputs are formatted as a samplesheet.

By default, all of the parameters set up in the workflow configuration will be passed to a Nextflow workflow as parameters (in the params scope). As such, any modifications made to the parameters using ds.add_param or ds.remove_param will be reflected in the inputs to Nextflow without having to write any other files to the working directory.
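
For example (a minimal sketch; the parameter name is hypothetical), a value added in the preprocess script is available to the workflow in the params scope:

# The workflow can read this value as params.save_intermediates
ds.add_param("save_intermediates", True)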

Useful Patterns

Remove any index reads (for paired-end FASTQ datasets with I1/I2 in addition to R1/R2):

# Filter out any index files that may have been uploaded
ds.files = ds.files.loc[
    ds.files.apply(
        lambda r: r.get('readType', 'R') == 'R',
        axis=1
    )
]

Set up a wide-form samplesheet for paired-end FASTQ inputs, including metadata:

import pandas as pd

# Make a wide-form samplesheet with columns: sample, fastq_1, and fastq_2
samplesheet = ds.wide_samplesheet()

# Merge in the metadata for these samples (if any exists)
samplesheet = pd.merge(samplesheet, ds.samplesheet, on="sample")

# Save to a file
samplesheet.to_csv("samplesheet.csv", index=None)

# Set up a workflow param pointing to that file (e.g., for nf-core/rnaseq)
ds.add_param("input", "samplesheet.csv")