3

I need to process over 1k samples with the nextflow (dsl2) pipeline in aws batch. current version of the workflow process single input per run. I'm looking workflow syntax (map tuple to iterate) to process multiple inputs to run in parralel. The inputs should be in json or yaml format, path to the input files are unique to each sample.

To preserve the input file path "s3://..." I used .fromPath in channel.

Following is my single sample input config input.yaml (-parms-file)

id: HLA1001
bam: s3://HLA/data/HLA1001.bam
vcf: s3://HLA/data/HLA1001.vcf.gz

Workflow to run single sample input

process samtools_stats {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/samtools", mode: "copy"

    input:
    path bam
    
    output:
    path "${id}.stats"

    script:
    """
      samtools stats ${bam} > ${id}.stats
    """
}

process mosdepth_bam {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/mosdepth", mode: "copy"

    input:
    path bam
    path bam_idx

    output:
    path "${id}.regions.bed.gz"

    script:
    """
    mosdepth --no-per-base --by 1000 --mapq 20 --threads 4 ${id} ${bam}
    """
}

process mosdepth_cram {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/mosdepth", mode: "copy"

    input:
    path bam
    path bam_idx
    path reference
    path reference_idx

    output:
    path "${id}.regions.bed.gz"

    script:
    """
    mosdepth --no-per-base --by 1000 --mapq 20 --threads 4 --fasta ${reference} ${id} ${bam}

    """
}

process bcftools_stats {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/bcftools", mode: "copy"

    input:
    path vcf
    path vcf_idx

    output:
    path "*"

    script:
    """
    bcftools stats -f PASS ${vcf} > ${id}.pass.stats
    """
}

process multiqc {

    tag "${id}"
    publishDir "${params.publishdir}/${id}/multiqc", mode: "copy"
    
    input:
    path "*"

    output:
    path  "multiqc_data/*", emit: multiqc_ch

    script:
    """
    multiqc . --data-format json --enable-npm-plugin
    """
}

process compile_metrics {

    tag "${id}"
    publishDir "${params.publishdir}/${id}", mode: "copy"

    input:
    path multiqc

    output:
    path "${params.id}.metrics.json", emit: compile_metrics_out

    script:
    """
    # parse and calculate all the metrics in the multiqc output to compile

    compile_metrics.py \
        --multiqc_json multiqc_data.json \
        --output_json ${params.id}.metrics.json \
        --biosample_id ${params.id}
    """
}


/*
----------------------------------------------------------------------
WORKFLOW
---------------------------------------------------------------------
*/

id = params.id

aln_file = file ( params.bam )
aln_file_type = aln_file.getExtension()
vcf_file = ( params.vcf )
vcf_idx = channel.fromPath(params.vcf + ".tbi", checkIfExists: true)

if (aln_file_type == "bam") {
    cbam = channel.fromPath(params.bam, checkIfExists: true)
    cbam_idx = channel.fromPath(params.bam + ".bai", checkIfExists: true)
}
else if (aln_file_type == "cram") {
    cbam = channel.fromPath(params.bam, checkIfExists: true)
    cbam_idx = channel.fromPath(params.bam + ".crai", checkIfExists: true)
}

reference = channel.fromPath(params.reference, checkIfExists: true)
reference_idx = channel.fromPath(params.reference + ".fai", checkIfExists: true)

// main
workflow {
    if (aln_file_type == "bam") {
        samtools_stats( bam )
        mosdepth_bam( bam, bam_idx )
        bcftools_stats ( vcf, vcf_idx )
        multiqc( samtools_stats.out.mix( mosdepth_bam.out ).collect() )
        compile_metrics(multiqc.out)
    } else if (aln_file_type == "cram") {
        samtools_stats( bam )
        mosdepth_cram( bam, bam_idx, reference, reference_idx )
        bcftools_stats ( vcf, vcf_idx )
        multiqc( samtools_stats.out.mix( mosdepth_cram.out ).collect() )
        compile_metrics(multiqc.out)
    }
}

I want to modify the workflow to run for the following multi sample input in parellel

samples:
-
 id: HLA1001
 bam: s3://HLA/data/udf/HLA1001.bam
 vcf: s3://HLA/data/udf/HLA1001.vcf.gz
-
 id: NHLA1002
 bam: s3://HLA/data/sdd/HLA1002.bam
 vcf: s3://HLA/data/sdd/HLA1002.vcf.gz
-
 id: NHLA1003
 bam: s3://HLA/data/klm/HLA1003.bam
 vcf: s3://HLA/data/klm/HLA1003.vcf.gz
-
 id: NHLA2000
 bam: s3://HLA/data/rcb/HLA2000.bam
 vcf: s3://HLA/data/rcb/HLA2000.vcf.gz

The expected final output folder structure for the multiple samples..

s3://mybucket/results/HLA1001/
samtools/
mosdepth/
bcftools/
multiqc/
metrics/HLA1001.metrics.json

s3://mybucket/results/HLA1002/
samtools/
mosdepth/
bcftools/
multiqc/
metrics/HLA1002.metrics.json

The input of bam/cram, vcf and input of multiqc and compile_metrics all must fetch the same sample in every single process.

Appreciate your help! Thanks

Follwing the method answered by @steve..

Contents of main.nf: update

include { compile_metrics } from './modules/compile_metrics'

    Channel
        .fromList( params.samples )
        .map { it.biosample_id }
        .set { sample_ids }

    compile_metrics ( sample_ids, multiqc.out.json_data )
}

Contents of modules/compile_metrics/main.nf:

process compile_metrics {

    tag { sample_ids }

    input:
    val(sample_ids)
    path "multiqc_data.json"

    output:
    tuple val(sample_ids), path("${sample_ids}.metrics.json"), emit: compile_metrics_out

    script:
    """
    compile_metrics.py \
        --multiqc_json multiqc_data.json \
        --output_json "${sample_ids}.metrics.json" \\
        --biosample_id "${sample_ids}" \\
    """
}

Update main.nf:

include { mosdepth_datamash } from './modules/mosdepth_datamash'

autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )

mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ).collect() )

Update mosdepth_datamash:

process mosdepth_datamash {

    tag { sample }

    input:
    path autosomes_non_gap_regions
    tuple val(sample), path(regions)

    output:
    tuple val(sample), path("${sample}.mosdepth.csv"), emit: coverage

    script:
    """
    zcat "${sample}.regions.bed.gz" | bedtools intersect -a stdin -b ${autosomes_non_gap_regions} | gzip -9c > "${sample}.regions.autosomes_non_gap_n_bases.bed.gz"
    .....
}

Update main.nf: fix - use queue channel instead of collect

mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ) )

Process verifybamid works with file instead of channel.fromPath

vbi2_ud = file( params.vbi2_ud )
vbi2_bed = file( params.vbi2_bed )
vbi2_mean = file( params.vbi2_mean )

How to modify the channel to handle backward (previous version of workflow single sample input format) compatible of single sample input format which lacks sample key

id: HLA1001
bam: s3://HLA/data/HLA1001.bam
vcf: s3://HLA/data/HLA1001.vcf.gz

Content of processing the input mani.nf:

Channel
        .fromList( params.samples )
        .branch { rec ->
            def aln_file = file( rec.bam )

            bam: aln_file.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )

                return tuple( rec.id, aln_file, bam_idx )

            cram: aln_file.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )

                return tuple( rec.id, aln_file, cram_idx )
        }
        .set { aln_inputs }
    Channel
        .fromList( params.samples )
        .map { rec ->
            def vcf_file = file( rec.vcf )
            def vcf_idx = file( "${rec.vcf}.tbi" )

            tuple( rec.id, vcf_file, vcf_idx )
        }
        .set { vcf_inputs }

    Channel
        .fromList( params.samples )
        .map { it.biosample_id }
        .set { sample_ids }

Updated main.nf works well

INPUT format A or B:
A)
biosample_id: NA12878-chr14
bam: s3://sample-qc/data/NA12878-chr14.bam

B)
samples:
-
 biosample_id: NA12878-chr14
 bam: s3://sample-qc/data/NA12878-chr14.bam
---------------------------------------------------------------
workflow {
....
....
params.samples = null
Channel
        .fromList( params.samples )
        .ifEmpty { ['biosample_id': params.biosample_id, 'bam': params.bam] }
        .branch { rec ->
            def aln_file = rec.bam ? file( rec.bam ) : null

            bam: rec.biosample_id && aln_file?.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )

                return tuple( rec.biosample_id, aln_file, bam_idx )

            cram: rec.biosample_id && aln_file?.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )

                return tuple( rec.biosample_id, aln_file, cram_idx )
        }
        .set { aln_inputs }

Channel
    .fromList( params.samples )
    .ifEmpty { ['biosample_id': params.biosample_id] }
    .map { it.biosample_id }
    .set { sample_ids }

compile_metrics ( sample_ids, multiqc.out.json_data )
...
...
}

Trying other way of not duplicate the code (the above code block .ifEmpty) in each process to parse the params.sample. eg two process here required to use params.sample

INPUT format A or B:
A)
biosample_id: NA12878-chr14
bam: s3://sample-qc/data/NA12878-chr14.bam

B)

samples:
-
 biosample_id: NA12878-chr14
 bam: s3://sample-qc/data/NA12878-chr14.bam
-------------------------------------------------------------
params.samples = ''
//    params.samples = null

def get_samples_list() {
    if (params.samples) {
        return  params.samples
    }
    else {
    return ['biosample_id': params.biosample_id, 'bam': params.bam]
    }
}

workflow {

//    params.samples = ''
    samples = get_samples_list()
    ...
    ...
    Channel
        .fromList( samples )
        .branch { rec ->
            def aln_file = rec.bam ? file( rec.bam ) : null

            bam: rec.biosample_id && aln_file?.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )

                return tuple( rec.biosample_id, aln_file, bam_idx )

            cram: rec.biosample_id && aln_file?.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )

                return tuple( rec.biosample_id, aln_file, cram_idx )
        }
        .set { aln_inputs }

    samtools_stats_bam( aln_inputs.bam, [] )
    samtools_stats_cram( aln_inputs.cram, ref_fasta )

    Channel
        .fromList( params.samples )
        .map { it.biosample_id }
        .set { sample_ids }

    compile_metrics ( sample_ids, multiqc.out.json_data )
}

ERROR:

Workflow execution stopped with the following message:
Exit status   : null
Error message : Cannot invoke method branch() on null object
Error report  : Cannot invoke method branch() on null object
ERROR ~ Cannot invoke method branch() on null object

Calling the samples from channel to reuse works well and much better approach.

Channel
        .fromList( params.samples )
        .ifEmpty { ['biosample_id': params.biosample_id, 'bam': params.bam] }
        .set { samples }

    Channel
        samples.branch { rec ->
        ....
        
    Channel
        samples.map { it.biosample_id }
  • 1
    FileNotFoundError: [Errno 2] No such file or directory: 'multiqc_data.json' when `path "multiqc_data"` or `path "${multiqc_data}/multiqc_data.json"` But multiqc folder prefix with json file way around to work `path "multiqc_data"` `--multiqc_json multiqc_data/multiqc_data.json` – user3214212 May 15 '23 at 09:44
  • 1
    Sorry, I missed that mosdepth outputs are in tuple with sample id. Just testing with two sample, it does successfully runs for one sample only with the warning **WARN:** `Input tuple does not match input set cardinality declared by process `mosdepth_datamash` -- offending value:` `mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ).collect() )` update the details in the above code section.. – user3214212 May 16 '23 at 05:54
  • 2
    Changing from `channel.fromPath` of vbi2_* to `file` works. – user3214212 May 16 '23 at 12:25
  • 1
    @Steve The channel processing any number of samples including one that's great. can I check how to modify the channel backward (previous version of the workflow) compatible format of the single sample input, which do not contain `samples` key in the input. I put the previous version single input file format above in the code section for reference! Thanks – user3214212 May 18 '23 at 02:35
  • Hi @Steve All works well, learned a number of things. Thanks much for your detailed and answering every query. – user3214212 May 19 '23 at 03:53
  • @Steve I have one final query added in the last block of my code above. Trying to assign the `params.sample` outside the workflow for any no. of process to use it from the variable `samples` whether the samples given in the input (A) or not (B). I also tried declaring empty string and null to `params.samples = `` or null` to initialise the `params.sample` but I'm getting an error! can you help me to check what mistake in the code and provide solution. much thanks – user3214212 May 25 '23 at 11:17
  • Note that DSL2 lets you re-use channels. So with `Channel.fromList( params.samples ).ifEmpty { ['biosample_id': params.biosample_id, 'bam': params.bam] }.set { samples }` (note the call to `set { samples }` on the end) you can re-use the _samples_ channel multiple times as needed: `samples.branch ...`, `samples.map ...`, etc. If you wanted to encapsulate this in a function, a better way would be to have the function return the _samples_ channel itself (i.e. `return samples`). But for such little code, a separate function probably doesn't add much. – Steve May 25 '23 at 13:43

1 Answers1

2

With channels it is possible to process any number of samples, including just one. Here's one way that use modules to handle both BAM and CRAM inputs. Note that each process below expects an input tuple where the first element is a sample name or key. To greatly assist with being able to merge channels downstream, we should also ensure we output tuples with the same sample name or key. The following is untested on AWS Batch, but it should at least get you started:

Contents of main.nf:

include { bcftools_stats } from './modules/bcftools'
include { mosdepth as mosdepth_bam } from './modules/mosdepth'
include { mosdepth as mosdepth_cram } from './modules/mosdepth'
include { multiqc } from './modules/multiqc'
include { samtools_stats as samtools_stats_bam } from './modules/samtools'
include { samtools_stats as samtools_stats_cram } from './modules/samtools'
include { mosdepth_datamash } from './modules/mosdepth_datamash'
workflow {

    ref_fasta = file( params.ref_fasta )
    autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )

    Channel
        .fromList( params.samples )
        .ifEmpty { ['id': params.id, 'bam': params.bam] }
        .branch { rec ->
            def aln_file = rec.bam ? file( rec.bam ) : null

            bam: rec.id && aln_file?.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )

                return tuple( rec.id, aln_file, bam_idx )

            cram: rec.id && aln_file?.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )

                return tuple( rec.id, aln_file, cram_idx )
        }
        .set { aln_inputs }

    Channel
        .fromList( params.samples )
        .ifEmpty { ['id': params.id, 'vcf': params.vcf] }
        .branch { rec ->
            def vcf_file = rec.vcf ? file( rec.vcf ) : null

            output: rec.id && vcf_file
                def vcf_idx = file( "${rec.vcf}.tbi" )

                return tuple( rec.id, vcf_file, vcf_idx )
        }
        .set { vcf_inputs }

    mosdepth_bam( aln_inputs.bam, [] )
    mosdepth_cram( aln_inputs.cram, ref_fasta )

    samtools_stats_bam( aln_inputs.bam, [] )
    samtools_stats_cram( aln_inputs.cram, ref_fasta )

    bcftools_stats( vcf_inputs )

    Channel
        .empty()
        .mix( mosdepth_bam.out.regions )
        .mix( mosdepth_cram.out.regions )
        .set { mosdepth_regions }

    mosdepth_datamash( mosdepth_regions, autosomes_non_gap_regions )

    Channel
        .empty()
        .mix( mosdepth_bam.out.dists )
        .mix( mosdepth_bam.out.summary )
        .mix( mosdepth_cram.out.dists )
        .mix( mosdepth_cram.out.summary )
        .mix( samtools_stats_bam.out )
        .mix( samtools_stats_cram.out )
        .mix( bcftools_stats.out )
        .mix( mosdepth_datamash.out )
        .map { sample, files -> files }
        .collect()
        .set { log_files }

    multiqc( log_files )
}

Contents of modules/samtools/main.nf:

process samtools_stats {

    tag { sample }

    input:
    tuple val(sample), path(bam), path(bai)
    path ref_fasta

    output:
    tuple val(sample), path("${sample}.stats")

    script:
    def reference = ref_fasta ? /--reference "${ref_fasta}"/ : ''

    """
    samtools stats \\
        ${reference} \\
        "${bam}" \\
        > "${sample}.stats"
    """
}

Contents of modules/mosdepth/main.nf:

process mosdepth {

    tag { sample }

    input:
    tuple val(sample), path(bam), path(bai)
    path ref_fasta

    output:
    tuple val(sample), path("*.regions.bed.gz"), emit: regions
    tuple val(sample), path("*.dist.txt"), emit: dists
    tuple val(sample), path("*.summary.txt"), emit: summary

    script:
    def fasta = ref_fasta ? /--fasta "${ref_fasta}"/ : ''

    """
    mosdepth \\
        --no-per-base \\
        --by 1000 \\
        --mapq 20 \\
        --threads ${task.cpus} \\
        ${fasta} \\
        "${sample}" \\
        "${bam}"
    """
}

Contents of modules/bcftools/main.nf:

process bcftools_stats {

    tag { sample }

    input:
    tuple val(sample), path(vcf), path(tbi)

    output:
    tuple val(sample), path("${sample}.pass.stats")

    """
    bcftools stats \\
        -f PASS \\
        "${vcf}" \\
        > "${sample}.pass.stats"
    """
}

Contents of modules/multiqc/main.nf:

process multiqc {

    input:
    path 'data/*'

    output:
    path "multiqc_report.html", emit: report
    path "multiqc_data", emit: data

    """
    multiqc \\
        --data-format json \\
        .
    """
}

Contents of modules/compile_metrics/main.nf:

process compile_metrics {

    tag { sample_id }

    input:
    val sample_id
    path multiqc_json

    output:
    tuple val(sample_id), path("${sample_id}.metrics.json")

    """
    compile_metrics.py \\
        --multiqc_json "${multiqc_json}" \\
        --output_json "${sample_id}.metrics.json" \\
        --biosample_id "${sample_id}"
    """
}

Contents of ./modules/mosdepth_datamash/main.nf:

process mosdepth_datamash {

    tag { sample_id }

    input:
    tuple val(sample_id), path(regions_bed)
    path autosomes_non_gap_regions

    output:
    tuple val(sample_id), path("${sample_id}.mosdepth.csv")

    """
    zcat -f "${regions_bed}" |
        bedtools intersect -a stdin -b "${autosomes_non_gap_regions}" |
        gzip -9 > "${sample_id}.regions.autosomes_non_gap_n_bases.bed.gz"

    # do something

    touch "${sample_id}.mosdepth.csv"
    """
}

Contents of nextflow.config:

plugins {

    id 'nf-amazon'
}

params {

    ref_fasta = null
    autosomes_non_gap_regions = null

    samples = null

    id = null
    bam = null
    vcf = null

    publish_dir = './results'
    publish_mode = 'copy'
}

process {

    executor = 'awsbatch'
    queue = 'test-queue'

    errorStrategy = 'retry'
    maxRetries = 3

    withName: 'samtools_stats' {

        publishDir = [
            path: "${params.publish_dir}/samtools",
            mode: params.publish_mode,
        ]
    }

    withName: 'bcftools_stats' {

        publishDir = [
            path: "${params.publish_dir}/bcftools",
            mode: params.publish_mode,
        ]
    }

    withName: 'mosdepth' {

        cpus = 4

        publishDir = [
            path: "${params.publish_dir}/mosdepth",
            mode: params.publish_mode,
        ]
    }

    withName: 'multiqc' {

        publishDir = [
            path: "${params.publish_dir}/multiqc",
            mode: params.publish_mode,
        ]
    }
}

aws {

    region = 'us-east-1'

    batch {
        cliPath = '/home/ec2-user/miniconda/bin/aws'
    }
    
}

And run using something like:

$ nextflow run main.nf \
    -ansi-log false \
    -params-file input.yaml \
    -work 's3://mybucket/work' \
    --publish_dir 's3://mybucket/results' \
    --ref_fasta 's3://mybucket/ref.fa'
Steve
  • 51,466
  • 13
  • 89
  • 103
  • 1
    Thank you so much for the detailed code @Steve Perfect!! make me to move on to module based method as well, very much appreciated. – user3214212 May 12 '23 at 02:19