Preprocess Script
The preprocess script provides inputs to a workflow that cannot be formatted by the analysis submission form alone. This helps with common tasks like:
- Preparing the essential inputs.json and options.json for WDL/Cromwell workflows
- Formatting a samplesheet.csv for Nextflow workflows
- Executing arbitrary logic based on user inputs
Accessing Dataset Information
Each preprocess script will typically start with:
from cirro.helpers.preprocess_dataset import PreprocessDataset
ds = PreprocessDataset.from_running()
The ds object contains all of the information needed for the dataset which has been created for the analysis results.
Explanation: When a Pipeline is run by a user, a new Dataset is created to hold the results of that analysis. The new dataset is populated with
- the location of the input data
- the parameters selected by the user
- the sample metadata (a snapshot from the time of submission)
The attributes of the ds object which can be used in the preprocess script are:
ds.files
A pandas DataFrame containing information on the files contained in the input dataset, and the sample that each file is assigned to.
Any other metadata which is assigned at the file level (e.g., the read variable which is parsed for paired-end FASTQ data) will also be present in ds.files.
print(ds.files)
sample | file | sampleIndex | read |
---|---|---|---|
sampleA | s3://path/to/sampleA_R1.fastq.gz | 1 | 1 |
sampleA | s3://path/to/sampleA_R2.fastq.gz | 1 | 2 |
sampleB | s3://path/to/sampleB_R1.fastq.gz | 2 | 1 |
sampleB | s3://path/to/sampleB_R2.fastq.gz | 2 | 2 |
Note: The sampleIndex is used to differentiate between files which are uploaded as a pair, but which have the same sample name (e.g., combining FASTQ files from multiple lanes of a genome sequencer).
ds.samplesheet
A pandas DataFrame containing all of the metadata assigned to the samples present in the input dataset (at the time of analysis).
print(ds.samplesheet)
sample | group | age |
---|---|---|
sampleA | Group_1 | 55 |
sampleB | Group_2 | 72 |
ds.wide_samplesheet()
Pivot the long-form ds.files to a wide-form DataFrame.
The available arguments (all of which have defaults) are:
- index (default ["sampleIndex", "sample", "lane"]): The columns used for the index in the wide samplesheet. If any of these columns do not exist in ds.files.columns, they will be silently omitted from the results of ds.wide_samplesheet()
- columns (default read): The column containing values which will be used for column headers in the wide samplesheet
- values (default file): The column which will be used for the values in the wide samplesheet
- column_prefix (default fastq_): The string prefix which will be added to the values from columns
For example, using the ds.files table above:
ds.wide_samplesheet()
sampleIndex | sample | fastq_1 | fastq_2 |
---|---|---|---|
1 | sampleA | s3://path/to/sampleA_R1.fastq.gz | s3://path/to/sampleA_R2.fastq.gz |
2 | sampleB | s3://path/to/sampleB_R1.fastq.gz | s3://path/to/sampleB_R2.fastq.gz |
Note that the results of ds.wide_samplesheet() will often reproduce the contents of the samplesheet.csv used to upload a paired-end FASTQ dataset. Importantly, the path to the individual files will reflect their current location in Cirro, and not the relative path they were uploaded from.
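To make the reshaping concrete, the pivot can be approximated in plain pandas. This is only a sketch of the behavior described above, not the library's actual implementation: the files DataFrame is a hand-built stand-in for ds.files, and the helper name wide_sketch is hypothetical.

```python
import pandas as pd

# Hand-built stand-in for ds.files (same columns as the table above)
files = pd.DataFrame({
    "sample": ["sampleA", "sampleA", "sampleB", "sampleB"],
    "file": [
        "s3://path/to/sampleA_R1.fastq.gz",
        "s3://path/to/sampleA_R2.fastq.gz",
        "s3://path/to/sampleB_R1.fastq.gz",
        "s3://path/to/sampleB_R2.fastq.gz",
    ],
    "sampleIndex": [1, 1, 2, 2],
    "read": [1, 2, 1, 2],
})


def wide_sketch(df, index=("sampleIndex", "sample", "lane"),
                columns="read", values="file", column_prefix="fastq_"):
    """Hypothetical helper approximating the documented pivot."""
    # Keep only the index columns that are actually present in the table
    present = [col for col in index if col in df.columns]
    wide = df.pivot(index=present, columns=columns, values=values)
    # Prefix the new column headers, e.g. 1 becomes "fastq_1"
    wide.columns = [f"{column_prefix}{val}" for val in wide.columns]
    return wide.reset_index()


wide = wide_sketch(files)
print(list(wide.columns))
```

Because the stand-in table has no lane column, that entry of the default index is dropped, which mirrors the silent omission described for the index argument.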
ds.params
A dictionary with all of the parameter values populated by user input using the process-form.json and process-input.json configurations.
This object is only used to access parameter values (e.g. ds.params['foo'] or ds.params.get('foo')).
Adding or removing parameters from this object will not be carried through to the workflow.
For that purpose, use ds.add_param or ds.remove_param.
ds.add_param()
Method to add a parameter using its keyword and value.
If the parameter keyword is already in use, this method will raise an error unless overwrite=True is set.
# To add the parameter "new_param", with value 1
ds.add_param("new_param", 1)
ds.params["new_param"] == 1
Note: While ds.params["new_param"] = 1 would assign the value within the context of the preprocess script, that value would not be passed along to the workflow without using the .add_param method.
ds.remove_param()
Method to remove a parameter from the workflow parameters.
If the parameter does not currently exist, this method will raise an error unless force=True is set.
# To remove the parameter "new_param"
ds.remove_param("new_param")
ds.params.get("new_param") is None
ds.logger.info()
Print a message to the logs, which is useful for debugging.
# For example, to get an idea of what the samplesheet
# looks like before running the workflow
samplesheet = ds.wide_samplesheet()
ds.logger.info(samplesheet.to_csv(index=None))
WDL / Cromwell
For a WDL pipeline to run in Cirro (which uses the Cromwell executor in 'run mode'), at least two files must be created by the preprocess script: inputs.*.json and options.json.
Differentiating 'inputs' and 'options'
While Cirro sets up a single set of 'parameters' for a workflow, the configuration of the Cromwell executor will expect some of those values to be provided as options while others are inputs.
For example, the location where final output files will be written is typically provided as final_workflow_outputs_dir (which is an option), while the reference genome used for alignment may be provided as WholeGenomeGermlineSingleSample.references (which is an input).
For this reason, the preprocess script is used to differentiate the parameters which should be submitted as inputs versus options.
After the preprocess script has completed, the headnode will expect to find:
- A file named options.json
- One or more files named inputs.*.json
Execution of Multiple inputs.json
The philosophy of WDL workflows established by GATK is that a workflow processes a single sample (or panel of samples) at a time. In contrast, the approach of nf-core is often that a batch of samples can be processed in parallel.
To support both approaches for WDL (analyzing either a single or multiple samples), the Cirro headnode can run the specified workflow multiple times, once for each file encountered with the file name pattern inputs.*.json (e.g. inputs.1.json, inputs.2.json). Note that * can be any string, not just numerals.
Because the Cromwell executor is used in "run mode", each file of inputs will be processed in serial (within the analysis of a single Cirro Dataset). While this approach may be slower in some cases than parallel execution, one minor benefit is that any computational steps which are repeated across runs will be cached, and will not be unnecessarily repeated during the workflow execution.
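As an illustration of the multiple-inputs pattern, the sketch below writes one inputs.<sample>.json file per sample. The workflow name MyWorkflow, its input keys, the reference path, and the rows list (standing in for the records of ds.wide_samplesheet()) are all hypothetical placeholders, so adapt them to the inputs your WDL actually declares.

```python
import json
from pathlib import Path

# Hypothetical stand-in for ds.wide_samplesheet().to_dict(orient="records")
rows = [
    {"sample": "sampleA",
     "fastq_1": "s3://path/to/sampleA_R1.fastq.gz",
     "fastq_2": "s3://path/to/sampleA_R2.fastq.gz"},
    {"sample": "sampleB",
     "fastq_1": "s3://path/to/sampleB_R1.fastq.gz",
     "fastq_2": "s3://path/to/sampleB_R2.fastq.gz"},
]

# Hypothetical workflow name and shared inputs (not taken from a real WDL)
WORKFLOW_PREFIX = "MyWorkflow"
shared_inputs = {f"{WORKFLOW_PREFIX}.reference": "s3://path/to/reference.fasta"}


def write_inputs_per_sample(rows, shared, out_dir="."):
    """Write one inputs.<sample>.json per row and return the paths."""
    written = []
    for row in rows:
        # Start from the inputs shared by every run, then add this sample
        inputs = dict(shared)
        inputs[f"{WORKFLOW_PREFIX}.sample_name"] = row["sample"]
        inputs[f"{WORKFLOW_PREFIX}.fastq_1"] = row["fastq_1"]
        inputs[f"{WORKFLOW_PREFIX}.fastq_2"] = row["fastq_2"]
        path = Path(out_dir) / f"inputs.{row['sample']}.json"
        with open(path, "w") as handle:
            json.dump(inputs, handle, indent=2)
        written.append(path)
    return written


paths = write_inputs_per_sample(rows, shared_inputs)
print([p.name for p in paths])
```

Using the sample name as the wildcard portion of the file name relies on the point above that * can be any string, and it makes the files easier to match to samples when reading the logs.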
Useful Patterns
Splitting up inputs and options using a keyword prefix
A convenient approach for differentiating between workflow parameters which should be supplied via inputs vs. options is the presence of a keyword prefix which is named for the workflow being executed.
For example, when running the workflow named WholeGenomeGermlineSingleSample, all of the input parameters will have names starting with WholeGenomeGermlineSingleSample. (such as WholeGenomeGermlineSingleSample.references).
To split up parameters based on the presence of that prefix and write out the appropriate files:
from cirro.helpers.preprocess_dataset import PreprocessDataset
import json
# Get the workflow execution information
ds = PreprocessDataset.from_running()
# Name of the WDL workflow being run
WORKFLOW_PREFIX = "WholeGenomeGermlineSingleSample"
# Split up the inputs and options using dict comprehension
inputs = {
kw: val
for kw, val in ds.params.items()
if kw.startswith(WORKFLOW_PREFIX)
}
options = {
kw: val
for kw, val in ds.params.items()
if not kw.startswith(WORKFLOW_PREFIX)
}
# Write out the inputs and options
with open("inputs.1.json", "w") as handle:
    json.dump(inputs, handle)
with open("options.json", "w") as handle:
    json.dump(options, handle)
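The same split can be factored into a small pure function, which makes it easy to check without a running Cirro job. This is a sketch under stated assumptions: the params dictionary is a hand-written stand-in for ds.params with placeholder values, and split_params is a hypothetical helper name.

```python
def split_params(params, workflow_prefix):
    """Return (inputs, options), split on the '<workflow_prefix>.' key prefix."""
    prefix = f"{workflow_prefix}."
    inputs = {kw: val for kw, val in params.items() if kw.startswith(prefix)}
    options = {kw: val for kw, val in params.items() if not kw.startswith(prefix)}
    return inputs, options


# Hand-written stand-in for ds.params (values are placeholders)
params = {
    "WholeGenomeGermlineSingleSample.references": "s3://path/to/references",
    "final_workflow_outputs_dir": "s3://path/to/outputs",
    "read_from_cache": True,
}

inputs, options = split_params(params, "WholeGenomeGermlineSingleSample")
print(sorted(inputs))
print(sorted(options))
```

Matching on the prefix plus the trailing dot is slightly stricter than matching on the workflow name alone. It avoids misclassifying an option whose name merely begins with the workflow name.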
Apply default params:
# Boilerplate which can be used with all workflows
ds.add_param("use_relative_output_paths", True)
ds.add_param("read_from_cache", True)
ds.add_param("write_to_cache", True)
Nextflow
The preprocess script is technically optional for Nextflow workflows, but it is often required for nf-core style workflows in which inputs are formatted as a samplesheet.
By default, all of the parameters set up in the workflow configuration will be passed to a Nextflow workflow as parameters (in the params scope).
As such, any modifications made to the parameters using ds.add_param or ds.remove_param will be reflected in the inputs to Nextflow without having to write any other files to the working directory.
Useful Patterns
Remove any index reads (for paired-end FASTQ datasets with I1/I2 in addition to R1/R2):
# Filter out any index files that may have been uploaded
ds.files = ds.files.loc[
ds.files.apply(
lambda r: r.get('readType', 'R') == 'R',
axis=1
)
]
Set up a wide-form samplesheet for paired-end FASTQ inputs, including metadata:
import pandas as pd
# Make a wide-form samplesheet with columns: sampleIndex, sample, fastq_1, and fastq_2
samplesheet = ds.wide_samplesheet()
# Merge in the metadata for these samples (if any exists)
samplesheet = pd.merge(samplesheet, ds.samplesheet, on="sample")
# Save to a file
samplesheet.to_csv("samplesheet.csv", index=None)
# Set up a workflow param pointing to that file (e.g., for nf-core/rnaseq)
ds.add_param("input", "samplesheet.csv")
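One detail worth checking in the merge above: pd.merge defaults to an inner join, so a sample that has files but no metadata row would be dropped from the samplesheet. The sketch below uses hand-built stand-ins for the two tables (the data are illustrative only) to compare the default with how="left", which keeps every sample.

```python
import pandas as pd

# Stand-in for ds.wide_samplesheet(): three samples with files
wide = pd.DataFrame({
    "sample": ["sampleA", "sampleB", "sampleC"],
    "fastq_1": ["s3://path/to/sampleA_R1.fastq.gz",
                "s3://path/to/sampleB_R1.fastq.gz",
                "s3://path/to/sampleC_R1.fastq.gz"],
})

# Stand-in for ds.samplesheet: metadata exists for only two of them
metadata = pd.DataFrame({
    "sample": ["sampleA", "sampleB"],
    "group": ["Group_1", "Group_2"],
})

# Default (inner) join: sampleC is dropped because it has no metadata
inner = pd.merge(wide, metadata, on="sample")

# Left join: sampleC is kept, with a missing value in the group column
left = pd.merge(wide, metadata, on="sample", how="left")

print(len(inner), len(left))
```

Whether dropping or keeping such samples is preferable depends on the workflow, so treat the choice of join as something to decide per pipeline.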