Example Pipeline Configuration: WDL

To provide a minimal example of how a pipeline written in WDL can be set up to run in Cirro, we will consider the gatk4-germline-snps-indels workflow, which runs the GATK4 HaplotypeCaller tool in GVCF mode on a single sample according to GATK Best Practices.

Workflow documentation: here

Workflow Scope

To run, this workflow needs:

  • A reference genome, consisting of a handful of related files (specified in inputs.json)
  • Sequencing reads in BAM format, with a paired index file (specified in inputs.json)
  • Configuration options (in options.json)

Input Files

The datasets which can be used as inputs to this workflow contain unaligned BAM files, which can be uploaded directly by the user. Those upstream datasets will contain both the BAM reads and their index files. The complete list of input files is provided to the preprocess script (preprocess.py) as a single table with two columns: sample and file. The sample name matching each BAM may be inferred directly from the filename, or the user may provide a sample sheet.
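
As an illustration of the filename-based approach, here is a minimal sketch (not Cirro's exact implementation; the file names and the suffix-stripping rule are assumptions) of how sample names could be derived from BAM filenames:

from pathlib import Path

import pandas as pd

# Hypothetical file listing for two uploaded samples; inside the preprocess
# script the real table is provided by Cirro as ds.files
files = pd.DataFrame({
    "file": ["sampleA.bam", "sampleA.bam.bai", "sampleB.bam", "sampleB.bam.bai"],
})

# Infer a sample name for each file by stripping the .bam / .bam.bai suffixes
files["sample"] = [
    Path(fp).name.replace(".bam.bai", "").replace(".bam", "")
    for fp in files["file"]
]

print(files[["sample", "file"]])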

Cirro Configuration Files

The files needed by Cirro to configure the workflow are:

  • process-form.json: Used to render a form that can collect user inputs
  • process-input.json: Used to map user inputs (and other variables) to the preprocess script (accessed as ds.params in the script below)
  • preprocess.py: Run immediately before the workflow to write out inputs.json and options.json
  • process-output.json: Optionally transform outputs (e.g. images) for web visualization (not used in this example)
  • process-compute.config: Optionally customize the configuration of the workflow executor (not used in this example)

User Form (process-form.json)

Because this workflow does not need to collect any user input, this file contains only empty form and ui objects:

{
  "form": {},
  "ui": {}
}

Process Inputs (process-input.json)

The information passed to the preprocess.py script contains attributes destined for either inputs.json or options.json, differentiated by whether the key carries the workflow prefix HaplotypeCallerGvcf_GATK4.

{
  "HaplotypeCallerGvcf_GATK4.ref_dict": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.dict",
  "HaplotypeCallerGvcf_GATK4.ref_fasta": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta",
  "HaplotypeCallerGvcf_GATK4.ref_fasta_index": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta.fai",

  "HaplotypeCallerGvcf_GATK4.scattered_calling_intervals_list": "s3://pubweb-references/GATK/hg38/v0/hg38_wgs_scattered_calling_intervals.txt",

  "final_workflow_outputs_dir": "$.dataset.dataPath",
  "final_call_logs_dir": "$.params.dataset.s3|/logs/",
  "memory_retry_multiplier": 2.0,
  "use_relative_output_paths": true
}

The reference genome files are located in a storage bucket which is available to all running processes in Cirro, mirroring the iGenomes reference genome project.

The string "$.dataset.dataPath" will be replaced by Cirro with the full S3 path where all output files should be saved.
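
To make the prefix-based split concrete, here is a minimal sketch (using an abbreviated copy of the parameters above) of how keys are routed: anything carrying the HaplotypeCallerGvcf_GATK4 prefix is destined for inputs.json, and everything else for options.json. The same logic appears in the preprocess script below.

PREFIX = "HaplotypeCallerGvcf_GATK4"

# Abbreviated copy of the parameters defined in process-input.json
params = {
    f"{PREFIX}.ref_fasta": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta",
    "memory_retry_multiplier": 2.0,
    "use_relative_output_paths": True,
}

# Keys carrying the workflow prefix become workflow inputs (inputs.json)
inputs = {kw: val for kw, val in params.items() if kw.startswith(PREFIX)}

# Everything else becomes a workflow option (options.json)
options = {kw: val for kw, val in params.items() if not kw.startswith(PREFIX)}

print(sorted(inputs))   # ['HaplotypeCallerGvcf_GATK4.ref_fasta']
print(sorted(options))  # ['memory_retry_multiplier', 'use_relative_output_paths']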

Preprocess Script (preprocess.py)

The preprocess script will perform the following steps:

  1. Separate out the information needed for inputs.json and options.json
  2. Loop over each of the input files and write out an inputs.json file
  3. Write out the options.json file

#!/usr/bin/env python3

# Import the packages needed to run the code below
import json
from typing import Iterator

from cirro.helpers.preprocess_dataset import PreprocessDataset
from cirro.api.models.s3_path import S3Path


def main():
    """Primary entrypoint for the script"""

    # Get information on the analysis launched by the user
    ds = PreprocessDataset.from_running()
    # Set up the options.json file
    setup_options(ds)
    # Set up the inputs files
    setup_inputs(ds)


def setup_options(ds: PreprocessDataset):

    # Set up the scriptBucketName, which is needed by the workflow
    # to stage analysis scripts
    ds.add_param(
        "scriptBucketName",
        S3Path(ds.params['final_workflow_outputs_dir']).bucket
    )

    # Isolate the options arguments for the workflow
    # Define a new dictionary which contains all of the items
    # from `ds.params` which do not start with the workflow
    # prefix "HaplotypeCallerGvcf_GATK4"
    options = {
        kw: val
        for kw, val in ds.params.items()
        if not kw.startswith("HaplotypeCallerGvcf_GATK4")
    }

    # Write out to the options.json file
    write_json("options.json", options)


def yield_single_inputs(ds: PreprocessDataset) -> Iterator[dict]:
    """
    Identify each of the BAM/BAI pairs which have been provided
    as part of the input dataset by the user.
    This generator yields a series of dicts with the `input_bam`
    and `input_bam_index` attributes (using the workflow prefix).
    """

    # The ds.files object is a DataFrame with columns for `sample` and `file`
    # For example:
    # | sample  | file            |
    # |---------|-----------------|
    # | sampleA | sampleA.bam     |
    # | sampleA | sampleA.bam.bai |
    # | sampleB | sampleB.bam     |
    # | sampleB | sampleB.bam.bai |

    # Iterate over each batch of files with a distinct sample
    for sample, files in ds.files.groupby("sample"):

        # Set up an empty struct
        dat = dict(input_bam=None, input_bam_index=None)

        # Iterate over each file
        for file in files['file'].values:
            for suffix, kw in [(".bam", "input_bam"), (".bai", "input_bam_index")]:
                if file.endswith(suffix):
                    if dat[kw] is not None:
                        raise ValueError(f"Multiple '{suffix}' files found for sample {sample}")
                    dat[kw] = file

        ds.logger.info(f"Sample: {sample}")
        if dat["input_bam"] is None:
            ds.logger.info("No BAM file found, skipping")
            continue
        if dat["input_bam_index"] is None:
            ds.logger.info("No BAM Index file found, skipping")
            continue

        ds.logger.info(f"BAM: {dat['input_bam']}")
        ds.logger.info(f"BAM Index: {dat['input_bam_index']}")

        # Add the workflow prefix
        yield {
            f"HaplotypeCallerGvcf_GATK4.{kw}": val
            for kw, val in dat.items()
        }


def setup_inputs(ds: PreprocessDataset):

    # Make a combined set of inputs with each of the BAM files
    all_inputs = [
        {
            **single_input,
            **{
                kw: val
                for kw, val in ds.params.items()
                if kw.startswith("HaplotypeCallerGvcf_GATK4")
            }
        }
        for single_input in yield_single_inputs(ds)
    ]

    # Raise an error if no inputs are found
    assert len(all_inputs) > 0, "No inputs found -- stopping execution"

    # Write out the complete set of inputs
    write_json("inputs.json", all_inputs)

    # Write out each individual file pair
    for i, single_input in enumerate(all_inputs):
        write_json(f"inputs.{i}.json", single_input)


def write_json(fp, obj, indent=4) -> None:

    with open(fp, "wt") as handle:
        json.dump(obj, handle, indent=indent)


if __name__ == "__main__":
    main()

Workflow Execution

After the preprocess script completes, the WDL workflow will be run on each of the inputs.*.json files written out by the preprocess script. All of those analysis results will be saved to the folder for the newly created dataset, and will be available to the user when they select the analysis results in Cirro.
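
For reference, a single inputs.N.json produced by the preprocess script would contain one merged entry along these lines (shown here as the Python dict before serialization; the sampleA BAM paths are hypothetical placeholders, while the reference paths come from process-input.json):

# One merged entry, as assembled by setup_inputs() above
single_input = {
    # Hypothetical placeholders for one sample's uploaded files
    "HaplotypeCallerGvcf_GATK4.input_bam": "s3://example-bucket/uploads/sampleA.bam",
    "HaplotypeCallerGvcf_GATK4.input_bam_index": "s3://example-bucket/uploads/sampleA.bam.bai",
    # Reference paths taken directly from process-input.json
    "HaplotypeCallerGvcf_GATK4.ref_dict": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.dict",
    "HaplotypeCallerGvcf_GATK4.ref_fasta": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta",
    "HaplotypeCallerGvcf_GATK4.ref_fasta_index": "s3://pubweb-references/GATK/hg38/v0/Homo_sapiens_assembly38.fasta.fai",
    "HaplotypeCallerGvcf_GATK4.scattered_calling_intervals_list": "s3://pubweb-references/GATK/hg38/v0/hg38_wgs_scattered_calling_intervals.txt",
}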