Pour avoir été client des articles ("Snakemake pour les nuls", "Formaliser ses protocoles avec Snakemake" et "Snakemake, aller plus loin avec la parallélisation") de mon prédécesseur lelouar, j'ai décidé d'apporter ma pierre à l'édifice et de continuer cette série sur Snakemake. Je vais ici vous parler de généralisation de pipeline pour l'utilisation intensive au sein d'une plateforme par exemple.
Pourquoi rendre mon pipeline générique ?
C'est une très bonne question Jamy ! Les articles précédents se basent sur le fait que les fichiers attendus en fin de pipeline (règle target) sont tous écrits "en dur". Pas très pratique si vous devez utiliser ce pipeline pour des projets différents. Cela oblige à retoucher le code ce qui est pénible et source d'erreur. De plus le fait de devoir fixer le nombre de threads pour une règle donnée peut être assez limitant si le nombre de cœurs disponible au lancement est variable (par exemple si on est plusieurs à utiliser un serveur de calcul). Je vais donc aborder certaines techniques applicables pour rendre un pipeline plus adapté à différentes conditions d'utilisation.
Un Snakefile reste avant tout un script Python3
Il ne faut pas perdre de vue que tout code python écrit dans le Snakefile est exécuté normalement (a quelques petites exceptions près). On peut donc très bien englober les règles dans des conditions. Tout l’intérêt est de jouer avec le fichier de configuration afin de lancer une règle précise en fonction de ce qui est requis par l'utilisateur. Pour illustrer mon propos tout au long de cet article, je vais me baser sur le pipeline des articles précédents en l'adaptant à ma problématique.
Reprenons avec le fichier de configuration
Snakemake propose l'utilisation de fichiers de configuration au format JSON mais également au format YAML. C'est ce format que je vais utiliser.
Voici un petit pipeline simple réalisant l'alignement de fastq paired-end avec bwa-aln ou bien bwa-mem avec un fichier de configuration très simple :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
import subprocess configfile : "config.yaml" rule target : input : config["genome"]+".bwt", "results/A.sorted.bam", "results/B.sorted.bam", # Index du génome de référence avant l'alignement des lectures (séquences) rule bwa_index : input : config["genome"] output : config["genome"]+".bwt" message : "Index du génome de référence : "+config["genome"] version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") shell : #3) lance la commande "bwa index {input}" #"{input}"="config["genome"]" # Alignement des lectures sur le génome de référence if config["bwa"] == "mem" : rule bwa_mem : input : bwt = config["genome"]+".bwt", fasta = config["genome"], R1 = config["sampleDirectory"]+"/{samples}_R1.fastq.gz", R2 = config["sampleDirectory"]+"/{samples}_R2.fastq.gz" output : temp("results/{samples}.sam") threads : 20 #max cpu autorisé version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") log : "results/{samples}_"+config["bwa"]+".log" message : "Alignment de {wildcards.samples} sur "+config["genome"] shell : "bwa mem " # avec Snakemake "-t {threads} " # vous pouvez "{input.fasta} " # commenter "{input.R1} " # tous les "{input.R2} " # arguments "-o {output} 2> {log}" # des commandes rule mem_sort : input : rules.bwa_mem.output output : "results/{samples}.sorted.bam" threads : 20 shell : "samtools sort " "-@ {threads} {input} ‑o {output}" " & & samtools index " "-@ {threads} {output}" else : rule bwa_aln : input : bwt = config["genome"]+".bwt", fasta = config["genome"], R1 = config["sampleDirectory"]+"/{samples}_R1.fastq.gz", R2 = config["sampleDirectory"]+"/{samples}_R2.fastq.gz" output : sai_R1 = temp("results/{samples}_R1.sai"), sai_R2 = temp("results/{samples}_R2.sai") version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") log : "results/{samples}_"+config["bwa"]+".log" threads : 20 params : genome = config["genome"] shell : "bwa aln ‑t {threads} {params.genome} {input.R1} ‑f {output.sai_R1} 2> {log}" "&& " "bwa aln ‑t {threads} {params.genome} {input.R2} ‑f {output.sai_R2} 2>> {log}" rule aln_sampe : input : R1 = config["sampleDirectory"]+"/{samples}_R1.fastq.gz", R2 = config["sampleDirectory"]+"/{samples}_R2.fastq.gz", sai_R1 = rules.bwa_aln.output.sai_R1, sai_R2 = rules.bwa_aln.output.sai_R2 output : temp("results/{samples}.sam") version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") log : "results/{samples}_sampe.log" params : genome = config["genome"] shell : "bwa sampe {params.genome} {input.sai_R1} {input.sai_R2} {input.R1} {input.R2} ‑f {output} 2> {log}" rule aln_sort : input : rules.aln_sampe.output output : "results/{samples}.sorted.bam" threads : 20 shell : "samtools sort " "-@ {threads} {input} ‑o {output}" " && samtools index " "-@ {threads} {output}" |
Et le fichier de configuration :
1 2 3 |
sampleDirectory : "samples" bwa : "mem" genome : "hg19.fa" |
A ce stade vous me direz qu'il n'y a pas grande différence avec un pipeline classique vu précédemment et vous aurez raison. Nous allons maintenant nous occuper de rendre tout ça un peu plus dynamique. Le but étant de reproduire plus ou moins le comportement des options de lancement d'un bon vieux script Python (argparse ce module qui vous veut du bien). Les valeurs des variables du fichier de configuration sont modifiables avec la commande de lancement Snakemake via l'option –config comme dans l'exemple ci-dessous :
snakemake –cores 30 –config genome=hg38.fa bwa=aln sampleDirectory=DNAseq
Il peut être intéressant de laisser des valeurs en dur dans le fichier de configuration afin d'avoir un comportement par défaut du pipeline. Par contre, attribuer une valeur à une variable non présente au sein du fichier de configuration (par exemple –config gemome=hg38.fa) la rajoute pour cette session de lancement. Vous voyez venir le problème ? Si vous faites une faute dans la variable que vous voulez modifier, non seulement une nouvelle variable sera créée mais votre pipeline se lancera sans soucis en conservant la valeur par défaut dans le fichier de configuration. Mais pas de panique ! Snakemake propose de valider un fichier de configuration selon un schéma YAML via le code suivant :
1 2 |
from snakemake.utils import validate validate(config, "path/to/schemal.yaml") |
Pour le petit fichier de configuration utilisé ici, le schéma pourrait ressembler à celui ci-dessous :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
definitions : {} $schema : http ://json-schema.org/draft-07/schema# $id : http ://example.com/root.json type : object title : The Root Schema required : - sampleDirectory - bwa - genome properties : sampleDirectory : $id : '#/properties/sampleDirectory' type : string title : The SampleDirectory Schema default : '' examples : - samples pattern : ^(.*)$ bwa : $id : '#/properties/bwa' type : string title : The Bwa Schema default : '' examples : - mem pattern : ^(.*)$ genome : $id : '#/properties/genome' type : string title : The Genome Schema default : '' examples : - hg19.fa pattern : ^(.*)$ additionalProperties : false |
La ligne finale spécifie que seules les variables présentes dans ce schéma doivent être présentes dans le fichier de configuration (ou via la ligne de commande snakemake). Pour aller plus loin dans l'écriture des schémas je vous invite à lire la documentation en ligne (bon courage !)
Rendons tout ça un peu plus général !
Maintenant que nous avons un pipeline bien défini avec fichier de configuration et validation YAML, occupons-nous de rendre ce pipeline applicable au plus grand nombre de cas possible. Vous avez sans doute remarqué que les fichiers attendus en fin de pipeline sont écrit en dur dans la règle target ce qui oblige d'aller modifier des chemins et des noms de fichiers dans le code du Snakefile ce qui n'est pas très agréable, vous en conviendrez. La solution est d'utiliser des fonctions d'input afin d'établir automatiquement la liste des fichiers attendus en analysant par exemple le contenu d'un dossier (donné en option) contenant nos échantillons à aligner. Cela permettrait de ne changer uniquement le dossier d'entrée via la commande –config et ne plus toucher au code du pipeline.
Prenons l'exemple suivant : Joe travaille au sein d'une plateforme de séquençage qui enchaîne les projets. Comment améliorer le pipeline du dessus afin de pouvoir l'utiliser pour tous les projets paired-end ? Dans un premier temps on peut détecter automatiquement les fichiers d'entrée et de sortie comme expliqué au paragraphe précédent mais on peut également adapter le nombre de threads alloués à chaque règle en fonction de la taille du projet. Dans cet exemple la plateforme exige que le pipeline fasse tourner le maximum d'échantillons en parallèle sur un serveur de 32 cœurs et que les fichiers issus d'un séquençage Illumina soient formatés sous la forme nom_S[numéro_individu]_L00[numéro_lane_de_séquençage]_R[1,2]_001.fastq.gz
Exemple pour un projet X de séquençage du rat en paired-end :
- Rattus_projetX_S1_L001_R1_001.fastq.gz
- Rattus_projetX_S1_L001_R2_001.fastq.gz
Pour commencer, grâce au fichier de config nous pouvons récupérer la liste des fichiers présents dans le dossier contenant ces deux fastq :
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import glob import re LIST_FILE = glob.glob(config["sampleDirectory"]+"/*.fastq.gz") #récupération de tout les fastq.gz présent dans le dossier fourni dans le config file LIST_NAME =[] # bloc récupérant via regex le prefix correpondant au nom des échantillions (un nom retenu pour deux échantillons en Paired-End par exemple) for name in LIST_FILE : removeDir = re.sub(config["sampleDirectory"]+"/",'',name) removeDir = re.sub(".fastq.gz",'',removeDir) if re.sub("_Sw+_R1_001",'', removeDir) != removeDir : name = re.sub("_Sw+_R1_001",'', removeDir) LIST_NAME.append(name) |
On peut créer une fonction retournant la liste des fichiers attendus en fonction du contenu du dossier d'input en utilisant les informations récupérées dans LIST_NAME :
1 2 3 4 5 |
def input_target(names) : liste = [] for name in names : liste.append("results/"+name+".sorted.bam") return liste |
Et donc la règle target s'écrit désormais comme cela :
1 2 3 4 |
rule target : input : config["genome"]+".bwt", input_target(LIST_NAME) |
Et hop ! Vous avez une fonction d'input automatique ! Bien évidemment ce code ne fonctionne que pour notre exemple précis, si les fastq ne sont pas formatés comme demandé, le pipeline ne trouvera pas vos échantillons. A vous d'adapter ce code aux spécificités de votre labo.
La fonction lambda
Si utiliser des fonctions Python classiques fonctionne parfaitement dans Snakemake, utiliser les wildcards de ce dernier est encore plus efficace. Pour continuer sur la lancée de la partie précédente, on peut écrire deux petites fonctions qui prendront en entrée la valeur de la wildcards définie par Snakemake afin d'avoir une continuité dans l'automatisation :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
def R1_func(wildcards): return(glob.glob(config["sampleDirectory"]+"/"+wildcards + "*_R1_001.fastq.gz")) def R2_func(wildcards): return(glob.glob(config["sampleDirectory"]+"/"+wildcards + "*_R2_001.fastq.gz")) [...] rule bwa_aln : input : bwt = config["genome"]+".bwt", fasta = config["genome"], R1 = lambda wildcards : R1_func("{samples}".format(samples=wildcards.samples)), R2 = lambda wildcards : R2_func("{samples}".format(samples=wildcards.samples)) output : sai_R1 = temp("results/{samples}_R1.sai"), sai_R2 = temp("results/{samples}_R2.sai") version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") log : "results/{samples}_"+config["bwa"]+".log" threads : 20 params : genome = config["genome"] shell : "bwa aln ‑t {threads} {params.genome} {input.R1} ‑f {output.sai_R1} 2> {log}" "&& " "bwa aln ‑t {threads} {params.genome} {input.R2} ‑f {output.sai_R2} 2>> {log}" |
Ces deux fonctions sélectionnent les fichiers fastq R1 et R2 en fonction d'une wildcards donnée. Cette wildcards va prendre automatiquement comme valeur les noms présents dans LIST_NAME car c'est à partir de la règle target que Snakemake sait quels fichiers doivent être créés et donc quelle valeur attribuer aux wildcards.
Gérer l'attribution du nombre de threads
Étrangement, il n'existe pas de façon simple pour récupérer la valeur du nombre de cœurs disponible spécifiée au lancement (–cores ou ‑j). Mais l'idée est de récupérer cette valeur afin d'adapter le nombre de threads pour chaque règle en fonction du nombre d'échantillons à analyser.
J'ai donc utilisé sys.argv (la liste contenant les arguments de la commande de lancement d'un programme python) afin de récupérer cette valeur mais le comportement de snakemake étant assez particulier, la récupération doit se faire en deux temps. En effet, Snakemake exécute le code une première fois (votre commande de lancement) et sys.argv contient toute vos options de lancement. Ensuite le code s'exécute une seconde fois et Snakemake modifie les valeurs de sys.argv par ses propres valeurs. Oui c'est tordu ! Essayez de print sys.argv a l'exécution d'un pipeline vous allez voir.
Je vous donne donc la fonction toute faite ce qui vous évitera chercher deux heures comme moi :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
def threads_max() : th = 0 it=0 for op in sys.argv : #dans la première éxécution, l'option est sous la forme […"-j","32"…] if op == "-j" or op == "–cores": th = int(sys.argv[it+1]) break else : #Dans la seconde exécution la structure de l'option change ([…"-j32",…] par exemple) if re.search("-j(w+)",op) : th = int(re.search("-j(w+)",op).group(1)) it += 1 return th |
Cette fonction vous permettra de récupérer le nombre de cœurs max disponible. Vous pourrez ensuite par exemple décider de lancer le plus d'analyse possible en parallèle quel que soit le nombre d'échantillons. Snakemake permet de mettre une variable au nombre de threads par règle au lieu d'un nombre fixe. Il est donc intéressant de créer une petite conditions qui adaptera ce nombre au nombre d'échantillons.
1 2 3 |
bwa_threads = 1 if threads_max() > len(LIST_NAME): bwa_threads = threads_max() // len(LIST_NAME) |
Au final
Avec toutes ces modifications, le pipeline est désormais capable de réaliser l'alignement sur bwa-mem ou aln au choix, en fonction d'un dossier d'input et d'optimiser tout seul le nombre de threads en fonction du nombre d'échantillons à traiter. Le pipeline a été testé sur une machine locale sous Ubuntu 19.04 avec 16Gb de Ram et un i5-8250U.
Je vous redonne le code total du pipeline optimisé :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import re import subprocess import glob import sys from snakemake.utils import validate configfile : "config.yaml" # Fonctions # validate(config, "schema.yaml") LIST_FILE = glob.glob(config["sampleDirectory"]+"/*.fastq.gz") LIST_NAME = [] for name in LIST_FILE : # bloc récupérant via regex le prefix correpondant au nom des échantillions (un nom retenu pour deux échantillons en Pair-End par exemple) removeDir = re.sub(config["sampleDirectory"]+"/", '', name) removeDir = re.sub(".fastq.gz", '', removeDir) if re.sub("_S\w+_R1_001",'', removeDir) != removeDir : name = re.sub("_S\w+_R1_001",'', removeDir) LIST_NAME.append(name) def R1_func(wildcards): return(glob.glob(config["sampleDirectory"]+"/"+wildcards + "*_R1_001.fastq.gz")) def R2_func(wildcards): return(glob.glob(config["sampleDirectory"]+"/"+wildcards + "*_R2_001.fastq.gz")) def input_target(names): liste = [] for name in names : liste.append("results/"+name+".sorted.bam") return liste def threads_max(): th = 0 it = 0 for op in sys.argv : #dans la première éxécution, l'option est sous la forme […"-j","32"…] if op == "-j" or op == "–cores": th = int(sys.argv[it+1]) break else : #Dans la seconde exécution la structure de l'option change ([…"-j32",…] par exemple) if re.search("-j(w+)",op): th = int(re.search("-j(w+)",op).group(1)) it += 1 return th bwa_threads = 1 if threads_max() > len(LIST_NAME): bwa_threads = threads_max() // len(LIST_NAME) # au dessus de 16 threads, le gain de vitesse est négligeable if bwa_threads > 16 : bwa_threads = 16 ###### Pipeline ###### rule target : input : config["genome"]+".bwt", input_target(LIST_NAME) # Index du génome de référence avant l'alignement des lectures (séquences) rule bwa_index : input : config["genome"] output : config["genome"] + ".bwt" message : "Index du génome de référence : " + config["genome"] version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") shell : #3) lance la commande "bwa index {input}" #"{input}"="config["genome"]" # Alignement des lectures sur le génome de référence if config["bwa"] == "mem": rule bwa_mem : input : bwt = config["genome"]+".bwt", fasta = config["genome"], R1 = lambda wildcards : R1_func("{samples}".format(samples=wildcards.samples)), R2 = lambda wildcards : R2_func("{samples}".format(samples=wildcards.samples)) output : temp("results/{samples}.sam") threads : bwa_threads #max cpu autorisé version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") log : "results/{samples}_"+config["bwa"]+".log" message : "Alignement de {wildcards.samples} sur "+config["genome"]+" avec "+str(bwa_threads)+" threads" shell : "bwa mem " # avec Snakemake "-t {threads} " # vous pouvez "{input.fasta} " # commenter "{input.R1} " # tous les "{input.R2} " # arguments "-o {output} 2> {log}" # des commandes rule mem_sort : input : rules.bwa_mem.output output : "results/{samples}.sorted.bam" threads : bwa_threads shell : "samtools sort " "-@ {threads} {input} ‑o {output}" " && samtools index " "-@ {threads} {output}" else : rule bwa_aln : input : bwt = config["genome"]+".bwt", fasta = config["genome"], R1 = lambda wildcards : R1_func("{samples}".format(samples=wildcards.samples)), R2 = lambda wildcards : R2_func("{samples}".format(samples=wildcards.samples)) output : sai_R1 = temp("results/{samples}_R1.sai"), sai_R2 = temp("results/{samples}_R2.sai") version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") log : "results/{samples}_"+config["bwa"]+".log" threads : bwa_threads message : "Alignement de {wildcards.samples} sur "+config["genome"]+" avec "+str(bwa_threads)+" threads" params : genome = config["genome"] shell : "bwa aln ‑t {threads} {params.genome} {input.R1} ‑f {output.sai_R1} 2> {log}" "&& " "bwa aln ‑t {threads} {params.genome} {input.R2} ‑f {output.sai_R2} 2>> {log}" rule aln_sampe : input : R1 = lambda wildcards : R1_func("{samples}".format(samples=wildcards.samples)), R2 = lambda wildcards : R2_func("{samples}".format(samples=wildcards.samples)), sai_R1 = rules.bwa_aln.output.sai_R1, sai_R2 = rules.bwa_aln.output.sai_R2 output : temp("results/{samples}.sam") version : subprocess.getoutput("bwa 2>&1 | grep Version : | sed ‑r 's/Version : +//'") log : "results/{samples}_sampe.log" params : genome = config["genome"] shell : "bwa sampe {params.genome} {input.sai_R1} {input.sai_R2} {input.R1} {input.R2} ‑f {output} 2> {log}" rule aln_sort : input : rules.aln_sampe.output output : "results/{samples}.sorted.bam" threads : bwa_threads shell : "samtools sort " "-@ {threads} {input} ‑o {output}" " && samtools index " "-@ {threads} {output}" |
Ce pipeline peut évidemment être encore optimisé notamment via l'ajout de messages personnalisés explicatif à l'exécution des règles par exemple. Il est également possible d'imaginer une adaptation à l'exécution sur cluster.
Je remercie les relecteurs de cet article Akira, Lins et lelouar pour leur aide et leur avis pendant la rédaction de cet article.
Source photo : "Trans-Alaska Pipeline"
Laisser un commentaire