Example Pipeline Configuration: WDL
To provide a minimal example of how a pipeline written in WDL can be set up to run in Cirro, we will consider the gatk4-germline-snps-indels workflow, which runs the GATK4 HaplotypeCaller tool in GVCF mode on a single sample according to GATK Best Practices.
Workflow documentation: here
Workflow Scope
To run, this workflow needs:
- A reference genome (consisting of a handful of related files) (in `inputs.json`)
- Sequencing reads in BAM format (with a paired index file) (in `inputs.json`)
- Configuration options (in `options.json`)
Input Files
The datasets which can be used as inputs to this workflow contain unaligned BAM files,
which can be uploaded directly by the user.
Those upstream datasets will contain both the BAM reads and the index files.
The complete list of input files will be provided to the preprocess script (`preprocess.py`) as a single table with two columns: `sample` and `file`.
The sample name matching each BAM may be inferred directly from the filename, or the user may provide a sample sheet.
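As a rough sketch of how that table is exposed to the preprocess script (the sample and file names below are hypothetical, and this snippet only runs inside a Cirro preprocess environment), the `ds.files` attribute can be inspected directly:

```python
from cirro.helpers.preprocess_dataset import PreprocessDataset

# Get information on the analysis launched by the user
ds = PreprocessDataset.from_running()

# ds.files is a table (DataFrame) with one row per input file, e.g.:
#     sample             file
# 0  sampleA      sampleA.bam
# 1  sampleA  sampleA.bam.bai
# 2  sampleB      sampleB.bam
# 3  sampleB  sampleB.bam.bai
ds.logger.info(ds.files.head())
```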
Cirro Configuration Files
The files needed by Cirro to configure the workflow are:
- `process-form.json`: Used to render a form that can collect user inputs
- `process-input.json`: Used to map user inputs (and other variables) to the preprocess script (accessed as `ds.params` in the script below)
- `preprocess.py`: Run immediately before the workflow to write out `inputs.json` and `options.json`
- `process-output.json`: Optionally transform outputs (e.g. images) for web visualization (not used in this example)
- `process-compute.config`: Optionally customize the configuration of the workflow executor (not used in this example)
User Form (`process-form.json`)
Because this workflow does not need to collect any user input, this file contains only empty objects:
{
    "form": {},
    "ui": {}
}
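For comparison, a workflow that did collect user input would describe each parameter here. A minimal, hypothetical sketch (assuming the `form` entry follows a JSON Schema layout; the `min_coverage` parameter is invented purely for illustration and is not part of this workflow) might look like:

```json
{
    "form": {
        "type": "object",
        "properties": {
            "min_coverage": {
                "type": "integer",
                "title": "Minimum coverage",
                "default": 10
            }
        }
    },
    "ui": {}
}
```

Any values collected this way would then be mapped through to the preprocess script via `process-input.json` and `ds.params`.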
Process Inputs (`process-input.json`)
The information passed to the `preprocess.py` script contains attributes which will be passed to both `inputs.json` and `options.json`, differentiated by the workflow prefix `HaplotypeCallerGvcf_GATK4`.
{
    "HaplotypeCallerGvcf_GATK4.ref_dict": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.dict",
    "HaplotypeCallerGvcf_GATK4.ref_fasta": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta",
    "HaplotypeCallerGvcf_GATK4.ref_fasta_index": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta.fai",
    "HaplotypeCallerGvcf_GATK4.scattered_calling_intervals_list": "s3://pubweb-references/GATK/hg38/v0/hg38_wgs_scattered_calling_intervals.txt",
    "final_workflow_outputs_dir": "$.dataset.dataPath",
    "final_call_logs_dir": "$.params.dataset.s3|/logs/",
    "memory_retry_multiplier": 2.0,
    "use_relative_output_paths": true
}
The reference genome files are located in a storage bucket which is available to all running processes in Cirro, mirroring the iGenomes reference genome project.
The string `"$.dataset.dataPath"` will be replaced by Cirro with the full path in S3 where all output files should be saved.
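As a rough sketch of how these values surface inside the preprocess script (the resolved output path shown in the final comment is illustrative), each entry becomes an item in `ds.params` after Cirro substitutes placeholders such as `$.dataset.dataPath`:

```python
from cirro.helpers.preprocess_dataset import PreprocessDataset

# Get information on the analysis launched by the user
ds = PreprocessDataset.from_running()

# Reference files arrive exactly as written in process-input.json
ds.logger.info(ds.params["HaplotypeCallerGvcf_GATK4.ref_fasta"])
# s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta

# Placeholder values are resolved before the script runs
ds.logger.info(ds.params["final_workflow_outputs_dir"])
# e.g. the dataset's S3 output path, substituted for "$.dataset.dataPath"
```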
Preprocess Script (`preprocess.py`)
The preprocess script will perform the following steps:
- Separate out the information needed for `inputs.json` and `options.json`
- Loop over each of the input files and write out an `inputs.json` file
- Write out the `options.json` file
#!/usr/bin/env python3
# Import the packages needed to run the code below
import json
from cirro.helpers.preprocess_dataset import PreprocessDataset
from cirro.api.models.s3_path import S3Path


def main():
    """Primary entrypoint for the script"""

    # Get information on the analysis launched by the user
    ds = PreprocessDataset.from_running()

    # Set up the options.json file
    setup_options(ds)

    # Set up the inputs files
    setup_inputs(ds)


def setup_options(ds: PreprocessDataset):

    # Set up the scriptBucketName, which is needed by the workflow
    # to stage analysis scripts
    ds.add_param(
        "scriptBucketName",
        S3Path(ds.params['final_workflow_outputs_dir']).bucket
    )

    # Isolate the options arguments for the workflow
    # Define a new dictionary which contains all of the items
    # from `ds.params` which do not start with the workflow
    # prefix "HaplotypeCallerGvcf_GATK4"
    options = {
        kw: val
        for kw, val in ds.params.items()
        if not kw.startswith("HaplotypeCallerGvcf_GATK4")
    }

    # Write out to the options.json file
    write_json("options.json", options)


def yield_single_inputs(ds: PreprocessDataset) -> dict:
    """
    This function is used to identify each of the BAM/BAI pairs
    which have been provided as part of the input dataset by the user.
    The output of this function will be a yielded series of
    struct objects with the `input_bam` and `input_bam_index` attributes.
    """

    # The ds.files object is a DataFrame with columns for `sample` and `file`
    # For example:
    # | sample  | file            |
    # |---------|-----------------|
    # | sampleA | sampleA.bam     |
    # | sampleA | sampleA.bam.bai |
    # | sampleB | sampleB.bam     |
    # | sampleB | sampleB.bam.bai |

    # Iterate over each batch of files with a distinct sample
    for sample, files in ds.files.groupby("sample"):

        # Set up an empty struct
        dat = dict(input_bam=None, input_bam_index=None)

        # Iterate over each file
        for file in files['file'].values:

            # Assign the file to the appropriate slot based on its suffix
            for suffix, kw in [(".bam", "input_bam"), (".bai", "input_bam_index")]:
                if file.endswith(suffix):
                    if dat[kw] is not None:
                        raise ValueError(f"Multiple '{suffix}' files found for sample {sample}")
                    dat[kw] = file

        ds.logger.info(f"Sample: {sample}")

        if dat["input_bam"] is None:
            ds.logger.info("No BAM file found, skipping")
            continue

        if dat["input_bam_index"] is None:
            ds.logger.info("No BAM Index file found, skipping")
            continue

        ds.logger.info(f"BAM: {dat['input_bam']}")
        ds.logger.info(f"BAM Index: {dat['input_bam_index']}")

        # Add the workflow prefix
        yield {
            f"HaplotypeCallerGvcf_GATK4.{kw}": val
            for kw, val in dat.items()
        }


def setup_inputs(ds: PreprocessDataset):

    # Make a combined set of inputs with each of the BAM files
    all_inputs = [
        {
            **single_input,
            **{
                kw: val
                for kw, val in ds.params.items()
                if kw.startswith("HaplotypeCallerGvcf_GATK4")
            }
        }
        for single_input in yield_single_inputs(ds)
    ]

    # Raise an error if no inputs are found
    assert len(all_inputs) > 0, "No inputs found -- stopping execution"

    # Write out the complete set of inputs
    write_json("inputs.json", all_inputs)

    # Write out each individual file pair
    for i, input in enumerate(all_inputs):
        write_json(f"inputs.{i}.json", input)


def write_json(fp, obj, indent=4) -> None:
    with open(fp, "wt") as handle:
        json.dump(obj, handle, indent=indent)


if __name__ == "__main__":
    main()
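To make the effect of `setup_options` concrete, the `process-input.json` shown earlier would produce an `options.json` along these lines (a sketch only: the resolved output paths and the derived `scriptBucketName` depend on the project, so placeholder values are shown in angle brackets):

```json
{
    "final_workflow_outputs_dir": "s3://<project-bucket>/<dataset-path>",
    "final_call_logs_dir": "s3://<project-bucket>/<dataset-path>/logs/",
    "memory_retry_multiplier": 2.0,
    "use_relative_output_paths": true,
    "scriptBucketName": "<project-bucket>"
}
```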
Workflow Execution
After the preprocess script completes, the WDL workflow will be run once for each of the `inputs.*.json` files which have been written out by the preprocess script.
All of those analysis results will be saved to the folder for the newly created dataset, and will be available to the user by selecting the analysis results.
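For example, given the `sampleA` files from the table shown in the script comments above, the first per-sample inputs file (`inputs.0.json`) would contain entries along these lines (the BAM paths are illustrative; in practice they will be the full paths of the files in the input dataset):

```json
{
    "HaplotypeCallerGvcf_GATK4.input_bam": "sampleA.bam",
    "HaplotypeCallerGvcf_GATK4.input_bam_index": "sampleA.bam.bai",
    "HaplotypeCallerGvcf_GATK4.ref_dict": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.dict",
    "HaplotypeCallerGvcf_GATK4.ref_fasta": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta",
    "HaplotypeCallerGvcf_GATK4.ref_fasta_index": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta.fai",
    "HaplotypeCallerGvcf_GATK4.scattered_calling_intervals_list": "s3://pubweb-references/GATK/hg38/v0/hg38_wgs_scattered_calling_intervals.txt"
}
```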