1 ## Copyright Broad Institute, 2018
2 ## 
3 ## This WDL pipeline implements data pre-processing according to the GATK Best Practices 
4 ## (June 2016).  
5 ##
6 ## Requirements/expectations :
7 ## - Pair-end sequencing data in unmapped BAM (uBAM) format
8 ## - One or more read groups, one per uBAM file, all belonging to a single sample (SM)
9 ## - Input uBAM files must additionally comply with the following requirements:
10 ## - - filenames all have the same suffix (we use ".unmapped.bam")
11 ## - - files must pass validation by ValidateSamFile 
12 ## - - reads are provided in query-sorted order
13 ## - - all reads must have an RG tag
15 ## Output :
16 ## - A clean BAM file and its index, suitable for variant discovery analyses.
18 ## Software version requirements (see recommended dockers in inputs JSON)
19 ## - GATK 4 or later
20 ## - Picard (see gotc docker)
21 ## - Samtools (see gotc docker)
22 ## - Python 2.7
24 ## Cromwell version support 
25 ## - Successfully tested on v32
26 ## - Does not work on versions < v23 due to output syntax
28 ## Runtime parameters are optimized for Broad's Google Cloud Platform implementation.
30 ## LICENSING : 
31 ## This script is released under the WDL source code license (BSD-3) (see LICENSE in 
32 ## Note however that the programs it calls may 
33 ## be subject to different licenses. Users are responsible for checking that they are
34 ## authorized to run all programs before running this script. Please see the dockers
35 ## for detailed licensing information pertaining to the included programs.
38 workflow PreProcessingForVariantDiscovery_GATK4 {
40   String sample_name
41   String ref_name
43   File flowcell_unmapped_bams_list
44   String unmapped_bam_suffix
46   File ref_fasta
47   File ref_fasta_index
48   File ref_dict
50   String bwa_commandline
51   Int compression_level
53   File dbSNP_vcf
54   File dbSNP_vcf_index
55   Array[File] known_indels_sites_VCFs
56   Array[File] known_indels_sites_indices
58   String gotc_docker
59   String gatk_docker
60   String python_docker
62   String gotc_path
63   String bwa_path
64   String gatk_path
66   Int flowcell_small_disk
67   Int flowcell_medium_disk
68   Int agg_small_disk
69   Int agg_medium_disk
70   Int agg_large_disk
72   Int preemptible_tries
73   Int agg_preemptible_tries
75   String base_file_name = sample_name + "." + ref_name
77   Array[File] flowcell_unmapped_bams = read_lines(flowcell_unmapped_bams_list)
79   # Get the version of BWA to include in the PG record in the header of the BAM produced 
80   # by MergeBamAlignment. 
81   call GetBwaVersion {
82     input: 
83       docker_image = gotc_docker,
84       bwa_path = bwa_path,
85       preemptible_tries = preemptible_tries
86   }
88   # Align flowcell-level unmapped input bams in parallel
89   scatter (unmapped_bam in flowcell_unmapped_bams) {
91     # Get the basename, i.e. strip the filepath and the extension
92     String bam_basename = basename(unmapped_bam, unmapped_bam_suffix)
94     # Map reads to reference
95     call SamToFastqAndBwaMem {
96       input:
97         input_bam = unmapped_bam,
98         bwa_commandline = bwa_commandline,
99         output_bam_basename = bam_basename + ".unmerged",
100         ref_fasta = ref_fasta,
101         ref_fasta_index = ref_fasta_index,
102         ref_dict = ref_dict,
103         docker_image = gotc_docker,
104         bwa_path = bwa_path,
105         gotc_path = gotc_path,
106         disk_size = flowcell_medium_disk,
107         preemptible_tries = preemptible_tries,
108         compression_level = compression_level
109      }
111     # Merge original uBAM and BWA-aligned BAM 
112     call MergeBamAlignment {
113       input:
114         unmapped_bam = unmapped_bam,
115         bwa_commandline = bwa_commandline,
116         bwa_version = GetBwaVersion.version,
117         aligned_bam = SamToFastqAndBwaMem.output_bam,
118         output_bam_basename = bam_basename + ".aligned.unsorted",
119         ref_fasta = ref_fasta,
120         ref_fasta_index = ref_fasta_index,
121         ref_dict = ref_dict,
122         docker_image = gatk_docker,
123         gatk_path = gatk_path,
124         disk_size = flowcell_medium_disk,
125         preemptible_tries = preemptible_tries,
126         compression_level = compression_level
127     }
128   }
130   # Aggregate aligned+merged flowcell BAM files and mark duplicates
131   # We take advantage of the tool's ability to take multiple BAM inputs and write out a single output
132   # to avoid having to spend time just merging BAM files.
133   call MarkDuplicates {
134     input:
135       input_bams = MergeBamAlignment.output_bam,
136       output_bam_basename = base_file_name + ".aligned.unsorted.duplicates_marked",
137       metrics_filename = base_file_name + ".duplicate_metrics",
138       docker_image = gatk_docker,
139       gatk_path = gatk_path,
140       disk_size = agg_large_disk,
141       compression_level = compression_level,
142       preemptible_tries = agg_preemptible_tries
143   }
145   # Sort aggregated+deduped BAM file and fix tags
146   call SortAndFixTags {
147     input:
148       input_bam = MarkDuplicates.output_bam,
149       output_bam_basename = base_file_name + ".aligned.duplicate_marked.sorted",
150       ref_dict = ref_dict,
151       ref_fasta = ref_fasta,
152       ref_fasta_index = ref_fasta_index,
153       docker_image = gatk_docker,
154       gatk_path = gatk_path,
155       disk_size = agg_large_disk,
156       preemptible_tries = 0,
157       compression_level = compression_level
158   }
160   # Create list of sequences for scatter-gather parallelization 
161   call CreateSequenceGroupingTSV {
162     input:
163       ref_dict = ref_dict,
164       docker_image = python_docker,
165       preemptible_tries = preemptible_tries
166   }
168   # Perform Base Quality Score Recalibration (BQSR) on the sorted BAM in parallel
169   scatter (subgroup in CreateSequenceGroupingTSV.sequence_grouping) {
170     # Generate the recalibration model by interval
171     call BaseRecalibrator {
172       input:
173         input_bam = SortAndFixTags.output_bam,
174         input_bam_index = SortAndFixTags.output_bam_index,
175         recalibration_report_filename = base_file_name + ".recal_data.csv",
176         sequence_group_interval = subgroup,
177         dbSNP_vcf = dbSNP_vcf,
178         dbSNP_vcf_index = dbSNP_vcf_index,
179         known_indels_sites_VCFs = known_indels_sites_VCFs,
180         known_indels_sites_indices = known_indels_sites_indices,
181         ref_dict = ref_dict,
182         ref_fasta = ref_fasta,
183         ref_fasta_index = ref_fasta_index,
184         docker_image = gatk_docker,
185         gatk_path = gatk_path,
186         disk_size = agg_small_disk,
187         preemptible_tries = agg_preemptible_tries
188     }  
189   }  
191   # Merge the recalibration reports resulting from by-interval recalibration
192   call GatherBqsrReports {
193     input:
194       input_bqsr_reports = BaseRecalibrator.recalibration_report,
195       output_report_filename = base_file_name + ".recal_data.csv",
196       docker_image = gatk_docker,
197       gatk_path = gatk_path,
198       disk_size = flowcell_small_disk,
199       preemptible_tries = preemptible_tries
200   }
202   scatter (subgroup in CreateSequenceGroupingTSV.sequence_grouping_with_unmapped) {
204     # Apply the recalibration model by interval
205     call ApplyBQSR {
206       input:
207         input_bam = SortAndFixTags.output_bam,
208         input_bam_index = SortAndFixTags.output_bam_index,
209         output_bam_basename = base_file_name + ".aligned.duplicates_marked.recalibrated",
210         recalibration_report = GatherBqsrReports.output_bqsr_report,
211         sequence_group_interval = subgroup,
212         ref_dict = ref_dict,
213         ref_fasta = ref_fasta,
214         ref_fasta_index = ref_fasta_index,
215         docker_image = gatk_docker,
216         gatk_path = gatk_path,
217         disk_size = agg_small_disk,
218         preemptible_tries = agg_preemptible_tries
219     }
220   } 
222   # Merge the recalibrated BAM files resulting from by-interval recalibration
223   call GatherBamFiles {
224     input:
225       input_bams = ApplyBQSR.recalibrated_bam,
226       output_bam_basename = base_file_name,
227       docker_image = gatk_docker,
228       gatk_path = gatk_path,
229       disk_size = agg_large_disk,
230       preemptible_tries = agg_preemptible_tries,
231       compression_level = compression_level
232   }
234   # Outputs that will be retained when execution is complete  
235   output {
236     File duplication_metrics = MarkDuplicates.duplicate_metrics
237     File bqsr_report = GatherBqsrReports.output_bqsr_report
238     File analysis_ready_bam = GatherBamFiles.output_bam
239     File analysis_ready_bam_index = GatherBamFiles.output_bam_index
240     File analysis_ready_bam_md5 = GatherBamFiles.output_bam_md5
241   } 
246 # Get version of BWA
247 task GetBwaVersion {
249   Int preemptible_tries
250   String mem_size
252   String docker_image
253   String bwa_path
255   command {
256     # Not setting "set -o pipefail" here because /bwa has a rc=1 and we don't want to allow rc=1 to succeed 
257     # because the sed may also fail with that error and that is something we actually want to fail on.
258     ${bwa_path}bwa 2>&1 | \
259     grep -e '^Version' | \
260     sed 's/Version: //'
261   }
262   runtime {
263     preemptible: preemptible_tries
264     #docker: docker_image
265     memory: mem_size
266   }
267   output {
268     String version = read_string(stdout())
269   }
272 # Read unmapped BAM, convert on-the-fly to FASTQ and stream to BWA MEM for alignment
273 task SamToFastqAndBwaMem {
274   File input_bam
275   String bwa_commandline
276   String output_bam_basename
277   File ref_fasta
278   File ref_fasta_index
279   File ref_dict
281   # This is the .alt file from bwa-kit (, 
282   # listing the reference contigs that are "alternative". Leave blank in JSON for legacy 
283   # references such as b37 and hg19.
284   File? ref_alt
285   File ref_amb
286   File ref_ann
287   File ref_bwt
288   File ref_pac
289   File ref_sa
291   Int compression_level
292   Int preemptible_tries
293   Int disk_size
294   String mem_size
295   String num_cpu
297   String docker_image
298   String bwa_path
299   String gotc_path
300   String java_opt
302   command <<<
303     set -o pipefail
304     set -e
306     # set the bash variable needed for the command-line
307     bash_ref_fasta=${ref_fasta}
309                 java -Dsamjdk.compression_level=${compression_level} ${java_opt} -jar ${gotc_path}picard.jar \
310       SamToFastq \
311                         INPUT=${input_bam} \
312                         FASTQ=/dev/stdout \
313                         INTERLEAVE=true \
314                         NON_PF=true \
315     | \
316                 ${bwa_path}${bwa_commandline} /dev/stdin -  2> >(tee ${output_bam_basename}.bwa.stderr.log >&2) \
317     | \
318                 samtools view -1 - > ${output_bam_basename}.bam
320   >>>
321   runtime {
322     preemptible: preemptible_tries
323     #docker: docker_image
324     memory: mem_size
325     cpu: num_cpu
326     disks: "local-disk " + disk_size + " HDD"
327   }
328   output {
329     File output_bam = "${output_bam_basename}.bam"
330     File bwa_stderr_log = "${output_bam_basename}.bwa.stderr.log"
331   }
334 # Merge original input uBAM file with BWA-aligned BAM file
335 task MergeBamAlignment {
336   File unmapped_bam
337   String bwa_commandline
338   String bwa_version
339   File aligned_bam
340   String output_bam_basename
341   File ref_fasta
342   File ref_fasta_index
343   File ref_dict
345   Int compression_level
346   Int preemptible_tries
347   Int disk_size
348   String mem_size
350   String docker_image
351   String gatk_path
352   String java_opt
354   command {
355     # set the bash variable needed for the command-line
356     bash_ref_fasta=${ref_fasta}
357     ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt}" \
358       MergeBamAlignment \
362       --ALIGNED_BAM ${aligned_bam} \
363       --UNMAPPED_BAM ${unmapped_bam} \
364       --OUTPUT ${output_bam_basename}.bam \
365       --REFERENCE_SEQUENCE ${ref_fasta} \
366       --PAIRED_RUN true \
367       --SORT_ORDER "unsorted" \
368       --IS_BISULFITE_SEQUENCE false \
369       --ALIGNED_READS_ONLY false \
370       --CLIP_ADAPTERS false \
371       --MAX_RECORDS_IN_RAM 2000000 \
372       --ADD_MATE_CIGAR true \
374       --PRIMARY_ALIGNMENT_STRATEGY MostDistant \
375       --PROGRAM_RECORD_ID "bwamem" \
376       --PROGRAM_GROUP_VERSION "${bwa_version}" \
377       --PROGRAM_GROUP_COMMAND_LINE "${bwa_commandline}" \
378       --PROGRAM_GROUP_NAME "bwamem" \
380       --ALIGNER_PROPER_PAIR_FLAGS true \
382   }
383   runtime {
384     preemptible: preemptible_tries
385     #docker: docker_image
386     memory: mem_size
387     disks: "local-disk " + disk_size + " HDD"
388   }
389   output {
390     File output_bam = "${output_bam_basename}.bam"
391   }
394 # Sort BAM file by coordinate order and fix tag values for NM and UQ
395 task SortAndFixTags {
396   File input_bam
397   String output_bam_basename
398   File ref_dict
399   File ref_fasta
400   File ref_fasta_index
402   Int compression_level
403   Int preemptible_tries
404   Int disk_size
405   String mem_size
407   String docker_image
408   String gatk_path
409   String java_opt_sort
410   String java_opt_fix
412   command {
413     set -o pipefail
415     ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt_sort}" \
416       SortSam \
417       --INPUT ${input_bam} \
418       --OUTPUT /dev/stdout \
419       --SORT_ORDER "coordinate" \
420       --CREATE_INDEX false \
421       --CREATE_MD5_FILE false \
422     | \
423     ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt_fix}" \
424       SetNmAndUqTags \
425       --INPUT /dev/stdin \
426       --OUTPUT ${output_bam_basename}.bam \
427       --CREATE_INDEX true \
428       --CREATE_MD5_FILE true \
429       --REFERENCE_SEQUENCE ${ref_fasta}
430   }
431   runtime {
432     preemptible: preemptible_tries
433     #docker: docker_image
434     memory: mem_size
435     disks: "local-disk " + disk_size + " HDD"
436   }
437   output {
438     File output_bam = "${output_bam_basename}.bam"
439     File output_bam_index = "${output_bam_basename}.bai"
440     File output_bam_md5 = "${output_bam_basename}.bam.md5"
441   }
444 # Mark duplicate reads to avoid counting non-independent observations
445 task MarkDuplicates {
446   Array[File] input_bams
447   String output_bam_basename
448   String metrics_filename
450   Int compression_level
451   Int preemptible_tries
452   Int disk_size
453   String mem_size
455   String docker_image
456   String gatk_path
457   String java_opt
459  # Task is assuming query-sorted input so that the Secondary and Supplementary reads get marked correctly.
460  # This works because the output of BWA is query-grouped and therefore, so is the output of MergeBamAlignment.
461  # While query-grouped isn't actually query-sorted, it's good enough for MarkDuplicates with ASSUME_SORT_ORDER="queryname"
462   command {
463     ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt}" \
464       MarkDuplicates \
465       --INPUT ${sep=' --INPUT ' input_bams} \
466       --OUTPUT ${output_bam_basename}.bam \
467       --METRICS_FILE ${metrics_filename} \
470       --ASSUME_SORT_ORDER "queryname" \
471       --CREATE_MD5_FILE true
472   }
473   runtime {
474     preemptible: preemptible_tries
475     #docker: docker_image
476     memory: mem_size
477     disks: "local-disk " + disk_size + " HDD"
478   }
479   output {
480     File output_bam = "${output_bam_basename}.bam"
481     File duplicate_metrics = "${metrics_filename}"
482   }
485 # Generate sets of intervals for scatter-gathering over chromosomes
486 task CreateSequenceGroupingTSV {
487   File ref_dict  
489   Int preemptible_tries
490   String mem_size
492   String docker_image
494   # Use python to create the Sequencing Groupings used for BQSR and PrintReads Scatter. 
495   # It outputs to stdout where it is parsed into a wdl Array[Array[String]]
496   # e.g. [["1"], ["2"], ["3", "4"], ["5"], ["6", "7", "8"]]
497   command <<<
498     python <<CODE
499     with open("${ref_dict}", "r") as ref_dict_file:
500         sequence_tuple_list = []
501         longest_sequence = 0
502         for line in ref_dict_file:
503             if line.startswith("@SQ"):
504                 line_split = line.split("\t")
505                 # (Sequence_Name, Sequence_Length)
506                 sequence_tuple_list.append((line_split[1].split("SN:")[1], int(line_split[2].split("LN:")[1])))
507         longest_sequence = sorted(sequence_tuple_list, key=lambda x: x[1], reverse=True)[0][1]
508     # We are adding this to the intervals because hg38 has contigs named with embedded colons (:) and a bug in 
509     # some versions of GATK strips off the last element after a colon, so we add this as a sacrificial element.
510     hg38_protection_tag = ":1+"
511     # initialize the tsv string with the first sequence
512     tsv_string = sequence_tuple_list[0][0] + hg38_protection_tag
513     temp_size = sequence_tuple_list[0][1]
514     for sequence_tuple in sequence_tuple_list[1:]:
515         if temp_size + sequence_tuple[1] <= longest_sequence:
516             temp_size += sequence_tuple[1]
517             tsv_string += "\t" + sequence_tuple[0] + hg38_protection_tag
518         else:
519             tsv_string += "\n" + sequence_tuple[0] + hg38_protection_tag
520             temp_size = sequence_tuple[1]
521     # add the unmapped sequences as a separate line to ensure that they are recalibrated as well
522     with open("sequence_grouping.txt","w") as tsv_file:
523       tsv_file.write(tsv_string)
524       tsv_file.close()
526     tsv_string += '\n' + "unmapped"
528     with open("sequence_grouping_with_unmapped.txt","w") as tsv_file_with_unmapped:
529       tsv_file_with_unmapped.write(tsv_string)
530       tsv_file_with_unmapped.close()
531     CODE
532   >>>
533   runtime {
534     preemptible: preemptible_tries
535     #docker: docker_image
536     memory: mem_size
537   }
538   output {
539     Array[Array[String]] sequence_grouping = read_tsv("sequence_grouping.txt")
540     Array[Array[String]] sequence_grouping_with_unmapped = read_tsv("sequence_grouping_with_unmapped.txt")
541   }
544 # Generate Base Quality Score Recalibration (BQSR) model
545 task BaseRecalibrator {
546   File input_bam
547   File input_bam_index
548   String recalibration_report_filename
549   Array[String] sequence_group_interval
550   File dbSNP_vcf
551   File dbSNP_vcf_index
552   Array[File] known_indels_sites_VCFs
553   Array[File] known_indels_sites_indices
554   File ref_dict
555   File ref_fasta
556   File ref_fasta_index
558   Int preemptible_tries
559   Int disk_size
560   String mem_size
562   String docker_image
563   String gatk_path
564   String java_opt
566   command { 
567     ${gatk_path} --java-options "${java_opt}" \
568       BaseRecalibrator \
569       -R ${ref_fasta} \
570       -I ${input_bam} \
571       --use-original-qualities \
572       -O ${recalibration_report_filename} \
573       --known-sites ${dbSNP_vcf} \
574       --known-sites ${sep=" --known-sites " known_indels_sites_VCFs} \
575       -L ${sep=" -L " sequence_group_interval}
576   }
577   runtime {
578     preemptible: preemptible_tries
579     #docker: docker_image
580     memory: mem_size
581     disks: "local-disk " + disk_size + " HDD"
582   }
583   output {
584     File recalibration_report = "${recalibration_report_filename}"
585   }
588 # Combine multiple recalibration tables from scattered BaseRecalibrator runs
589 # Note that when run from GATK 3.x the tool is not a walker and is invoked differently.
590 task GatherBqsrReports {
591   Array[File] input_bqsr_reports
592   String output_report_filename
594   Int preemptible_tries
595   Int disk_size
596   String mem_size
598   String docker_image
599   String gatk_path
600   String java_opt
602   command {
603     ${gatk_path} --java-options "${java_opt}" \
604       GatherBQSRReports \
605       -I ${sep=' -I ' input_bqsr_reports} \
606       -O ${output_report_filename}
607     }
608   runtime {
609     preemptible: preemptible_tries
610     #docker: docker_image
611     memory: mem_size
612     disks: "local-disk " + disk_size + " HDD"
613   }
614   output {
615     File output_bqsr_report = "${output_report_filename}"
616   }
619 # Apply Base Quality Score Recalibration (BQSR) model
620 task ApplyBQSR {
621   File input_bam
622   File input_bam_index
623   String output_bam_basename
624   File recalibration_report
625   Array[String] sequence_group_interval
626   File ref_dict
627   File ref_fasta
628   File ref_fasta_index
630   Int preemptible_tries
631   Int disk_size 
632   String mem_size
634   String docker_image
635   String gatk_path
636   String java_opt
638   command {  
639     ${gatk_path} --java-options "${java_opt}" \
640       ApplyBQSR \
641       -R ${ref_fasta} \
642       -I ${input_bam} \
643       -O ${output_bam_basename}.bam \
644       -L ${sep=" -L " sequence_group_interval} \
645       -bqsr ${recalibration_report} \
646       --static-quantized-quals 10 --static-quantized-quals 20 --static-quantized-quals 30 \
647       --add-output-sam-program-record \
648       --create-output-bam-md5 \
649       --use-original-qualities
650   }
651   runtime {
652     preemptible: preemptible_tries
653     #docker: docker_image
654     memory: mem_size
655     disks: "local-disk " + disk_size + " HDD"
656   }
657   output {
658     File recalibrated_bam = "${output_bam_basename}.bam"
659   }
662 # Combine multiple recalibrated BAM files from scattered ApplyRecalibration runs
663 task GatherBamFiles {
664   Array[File] input_bams
665   String output_bam_basename
667   Int compression_level
668   Int preemptible_tries
669   Int disk_size
670   String mem_size
672   String docker_image
673   String gatk_path
674   String java_opt
676   command {
677     ${gatk_path} --java-options "-Dsamjdk.compression_level=${compression_level} ${java_opt}" \
678       GatherBamFiles \
679       --INPUT ${sep=' --INPUT ' input_bams} \
680       --OUTPUT ${output_bam_basename}.bam \
681       --CREATE_INDEX true \
682       --CREATE_MD5_FILE true
683     }
684   runtime {
685     preemptible: preemptible_tries
686     #docker: docker_image
687     memory: mem_size
688     disks: "local-disk " + disk_size + " HDD"
689   }
690   output {
691     File output_bam = "${output_bam_basename}.bam"
692     File output_bam_index = "${output_bam_basename}.bai"
693     File output_bam_md5 = "${output_bam_basename}.bam.md5"
694   }