Accessibility Tools

- Le blog participatif de bioinformatique francophone depuis 2012 -

"Trans-Alas­ka Pipe­line" by Ted LaBar

Pour avoir été client des articles ("Sna­ke­make pour les nuls""For­ma­li­ser ses pro­to­coles avec Sna­ke­make" et "Sna­ke­make, aller plus loin avec la paral­lé­li­sa­tion") de mon pré­dé­ces­seur lelouar, j'ai déci­dé d'apporter ma pierre à l'édifice et de conti­nuer cette série sur Sna­ke­make. Je vais ici vous par­ler de géné­ra­li­sa­tion de pipe­line pour l'utilisation inten­sive au sein d'une pla­te­forme par exemple.

Pourquoi rendre mon pipeline générique ?

C'est une très bonne ques­tion Jamy ! Les articles pré­cé­dents se basent sur le fait que les fichiers atten­dus en fin de pipe­line (règle tar­get) sont tous écrits "en dur". Pas très pra­tique si vous devez uti­li­ser ce pipe­line pour des pro­jets dif­fé­rents. Cela oblige à retou­cher le code ce qui est pénible et source d'erreur. De plus le fait de devoir fixer le nombre de threads pour une règle don­née peut être assez limi­tant si le nombre de cœurs dis­po­nible au lan­ce­ment est variable (par exemple si on est plu­sieurs à uti­li­ser un ser­veur de cal­cul). Je vais donc abor­der cer­taines tech­niques appli­cables pour rendre un pipe­line plus adap­té à dif­fé­rentes condi­tions d'utilisation.

Un Snakefile reste avant tout un script Python3

Il ne faut pas perdre de vue que tout code python écrit dans le Sna­ke­file est exé­cu­té nor­ma­le­ment (a quelques petites excep­tions près). On peut donc très bien englo­ber les règles dans des condi­tions. Tout l’intérêt est de jouer avec le fichier de confi­gu­ra­tion afin de lan­cer une règle pré­cise en fonc­tion de ce qui est requis par l'utilisateur. Pour illus­trer mon pro­pos tout au long de cet article, je vais me baser sur le pipe­line des articles pré­cé­dents en l'adaptant à ma pro­blé­ma­tique.

Reprenons avec le fichier de configuration

Sna­ke­make pro­pose l'utilisation de fichiers de confi­gu­ra­tion au for­mat JSON mais éga­le­ment au for­mat YAML. C'est ce for­mat que je vais uti­li­ser.

Voi­ci un petit pipe­line simple réa­li­sant l'alignement de fastq pai­red-end avec bwa-aln ou bien bwa-mem avec un fichier de confi­gu­ra­tion très simple :

import subprocess

configfile : "config.yaml"
 
rule target:
    input:
        config["genome"]+".bwt",
        "results/A.sorted.bam",
        "results/B.sorted.bam",

 
# Index du génome de référence avant l'alignement des lectures (séquences)
rule bwa_index:
    input:
        config["genome"] 
    output:
        config["genome"]+".bwt"
    message:
        "Index du génome de référence : "+config["genome"]
    version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")  
    shell: #3) lance la commande
        "bwa index {input}" #"{input}"="config["genome"]"
 
# Alignement des lectures sur le génome de référence

if config["bwa"] == "mem" :

    rule bwa_mem:
        input:
            bwt = config["genome"]+".bwt",
            fasta = config["genome"],
            R1 = config["sampleDirectory"]+"/{samples}_R1.fastq.gz",
            R2 = config["sampleDirectory"]+"/{samples}_R2.fastq.gz"
        output:
            temp("results/{samples}.sam") 
        threads: 20 #max cpu autorisé
        version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")
        log : "results/{samples}_"+config["bwa"]+".log"
        message:
            "Alignment de {wildcards.samples} sur "+config["genome"]
        shell:
            "bwa mem "       # avec Snakemake
            "-t {threads} "  # vous pouvez 
            "{input.fasta} " # commenter
            "{input.R1} "    # tous les 
            "{input.R2} "     # arguments 
            "-o {output} 2> {log}"        # des commandes

    rule mem_sort :
        input :
            rules.bwa_mem.output
        output :
            "results/{samples}.sorted.bam"
        threads : 20
        shell :
            "samtools sort "
            "-@ {threads} {input} -o {output}"
            " & & samtools index "
            "-@ {threads} {output}"

else :

    rule bwa_aln :
        input :
            bwt = config["genome"]+".bwt",
            fasta = config["genome"],
            R1 = config["sampleDirectory"]+"/{samples}_R1.fastq.gz",
            R2 = config["sampleDirectory"]+"/{samples}_R2.fastq.gz"
        output :
            sai_R1 = temp("results/{samples}_R1.sai"),
            sai_R2 = temp("results/{samples}_R2.sai")
        version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")
        log : "results/{samples}_"+config["bwa"]+".log"
        threads : 20
        params :
            genome = config["genome"]
        shell :
            "bwa aln -t {threads} {params.genome} {input.R1} -f {output.sai_R1} 2> {log}"
            "&& " 
            "bwa aln -t {threads} {params.genome} {input.R2} -f {output.sai_R2} 2>> {log}"

    rule aln_sampe :
        input :
            R1 = config["sampleDirectory"]+"/{samples}_R1.fastq.gz",
            R2 = config["sampleDirectory"]+"/{samples}_R2.fastq.gz",
            sai_R1 = rules.bwa_aln.output.sai_R1,
            sai_R2 = rules.bwa_aln.output.sai_R2
        output :
            temp("results/{samples}.sam")
        version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")
        log : "results/{samples}_sampe.log"
        params :
            genome = config["genome"]
        shell :
            "bwa sampe {params.genome} {input.sai_R1} {input.sai_R2} {input.R1} {input.R2} -f {output} 2> {log}"

    rule aln_sort :
        input :
            rules.aln_sampe.output
        output :
            "results/{samples}.sorted.bam"
        threads : 20
        shell :
            "samtools sort "
            "-@ {threads} {input} -o {output}"
            " && samtools index "
            "-@ {threads} {output}"

Et le fichier de confi­gu­ra­tion :

sampleDirectory : "samples"
bwa : "mem"
genome : "hg19.fa"

A ce stade vous me direz qu'il n'y a pas grande dif­fé­rence avec un pipe­line clas­sique vu pré­cé­dem­ment et vous aurez rai­son. Nous allons main­te­nant nous occu­per de rendre tout ça un peu plus dyna­mique. Le but étant de repro­duire plus ou moins le com­por­te­ment des options de lan­ce­ment d'un bon vieux script Python (arg­parse ce module qui vous veut du bien). Les valeurs des variables du fichier de confi­gu­ra­tion sont modi­fiables avec la com­mande de lan­ce­ment Sna­ke­make via l'option --config comme dans l'exemple ci-des­sous :

sna­ke­make --cores 30 --config genome=hg38.fa bwa=aln sampleDirectory=DNAseq

Il peut être inté­res­sant de lais­ser des valeurs en dur dans le fichier de confi­gu­ra­tion afin d'avoir un com­por­te­ment par défaut du pipe­line. Par contre, attri­buer une valeur à une variable non pré­sente au sein du fichier de confi­gu­ra­tion (par exemple --config gemome=hg38.fa) la rajoute pour cette ses­sion de lan­ce­ment. Vous voyez venir le pro­blème ? Si vous faites une faute dans la variable que vous vou­lez modi­fier, non seule­ment une nou­velle variable sera créée mais votre pipe­line se lan­ce­ra sans sou­cis en conser­vant la valeur par défaut dans le fichier de confi­gu­ra­tion. Mais pas de panique ! Sna­ke­make pro­pose de vali­der un fichier de confi­gu­ra­tion selon un sché­ma YAML via le code sui­vant :

from snakemake.utils import validate
validate(config, "path/to/schemal.yaml")

Pour le petit fichier de confi­gu­ra­tion uti­li­sé ici, le sché­ma pour­rait res­sem­bler à celui ci-des­sous :

definitions: {}
$schema: http://json-schema.org/draft-07/schema#
$id: http://example.com/root.json
type: object
title: The Root Schema
required:
- sampleDirectory
- bwa
- genome
properties:
  sampleDirectory:
    $id: '#/properties/sampleDirectory'
    type: string
    title: The SampleDirectory Schema
    default: ''
    examples:
    - samples
    pattern: ^(.*)$
  bwa:
    $id: '#/properties/bwa'
    type: string
    title: The Bwa Schema
    default: ''
    examples:
    - mem
    pattern: ^(.*)$
  genome:
    $id: '#/properties/genome'
    type: string
    title: The Genome Schema
    default: ''
    examples:
    - hg19.fa
    pattern: ^(.*)$
additionalProperties: false

La ligne finale spé­ci­fie que seules les variables pré­sentes dans ce sché­ma doivent être pré­sentes dans le fichier de confi­gu­ra­tion (ou via la ligne de com­mande sna­ke­make). Pour aller plus loin dans l'écriture des sché­mas je vous invite à lire la docu­men­ta­tion en ligne (bon cou­rage !)

Rendons tout ça un peu plus général !

Main­te­nant que nous avons un pipe­line bien défi­ni avec fichier de confi­gu­ra­tion et vali­da­tion YAML, occu­pons-nous de rendre ce pipe­line appli­cable au plus grand nombre de cas pos­sible. Vous avez sans doute remar­qué que les fichiers atten­dus en fin de pipe­line sont écrit en dur dans la règle tar­get ce qui oblige d'aller modi­fier des che­mins et des noms de fichiers dans le code du Sna­ke­file ce qui n'est pas très agréable, vous en convien­drez. La solu­tion est d'utiliser des fonc­tions d'input afin d'établir auto­ma­ti­que­ment la liste des fichiers atten­dus en ana­ly­sant par exemple le conte­nu d'un dos­sier (don­né en option) conte­nant nos échan­tillons à ali­gner. Cela per­met­trait de ne chan­ger uni­que­ment le dos­sier d'entrée via la com­mande --config et ne plus tou­cher au code du pipe­line.

Pre­nons l'exemple sui­vant : Joe tra­vaille au sein d'une pla­te­forme de séquen­çage qui enchaîne les pro­jets. Com­ment amé­lio­rer le pipe­line du des­sus afin de pou­voir l'utiliser pour tous les pro­jets pai­red-end ? Dans un pre­mier temps on peut détec­ter auto­ma­ti­que­ment les fichiers d'entrée et de sor­tie comme expli­qué au para­graphe pré­cé­dent mais on peut éga­le­ment adap­ter le nombre de threads alloués à chaque règle en fonc­tion de la taille du pro­jet. Dans cet exemple la pla­te­forme exige que le pipe­line fasse tour­ner le maxi­mum d'échantillons en paral­lèle sur un ser­veur de 32 cœurs et que les fichiers issus d'un séquen­çage Illu­mi­na soient for­ma­tés sous la forme nom_S[numéro_individu]_L00[numéro_lane_de_séquençage]_R[1,2]_001.fastq.gz

Exemple pour un pro­jet X de séquen­çage du rat en pai­red-end :

  • Rattus_projetX_S1_L001_R1_001.fastq.gz
  • Rattus_projetX_S1_L001_R2_001.fastq.gz

Pour com­men­cer, grâce au fichier de config nous pou­vons récu­pé­rer la liste des fichiers pré­sents dans le dos­sier conte­nant ces deux fastq :

import glob
import re

LIST_FILE = glob.glob(config["sampleDirectory"]+"/*.fastq.gz") #récupération de tout les fastq.gz présent dans le dossier fourni dans le config file

LIST_NAME =[] 
# bloc récupérant via regex le prefix correpondant au nom des échantillions (un nom retenu pour deux échantillons en Paired-End par exemple)
for name in LIST_FILE : 
	removeDir = re.sub(config["sampleDirectory"]+"/",'',name)
	removeDir = re.sub(".fastq.gz",'',removeDir)
	if re.sub("_Sw+_R1_001",'', removeDir) != removeDir : 
		name = re.sub("_Sw+_R1_001",'', removeDir)
		LIST_NAME.append(name) 

On peut créer une fonc­tion retour­nant la liste des fichiers atten­dus en fonc­tion du conte­nu du dos­sier d'input en uti­li­sant les infor­ma­tions récu­pé­rées dans LIST_​NAME :

def input_target(names) :
	liste = []
	for name in names :
		liste.append("results/"+name+".sorted.bam")
	return liste

Et donc la règle tar­get s'écrit désor­mais comme cela :

rule target:
	input:
		config["genome"]+".bwt",
		input_target(LIST_NAME)

Et hop ! Vous avez une fonc­tion d'input auto­ma­tique ! Bien évi­dem­ment ce code ne fonc­tionne que pour notre exemple pré­cis, si les fastq ne sont pas for­ma­tés comme deman­dé, le pipe­line ne trou­ve­ra pas vos échan­tillons. A vous d'adapter ce code aux spé­ci­fi­ci­tés de votre labo.

La fonction lambda

Si uti­li­ser des fonc­tions Python clas­siques fonc­tionne par­fai­te­ment dans Sna­ke­make, uti­li­ser les wild­cards de ce der­nier est encore plus effi­cace. Pour conti­nuer sur la lan­cée de la par­tie pré­cé­dente, on peut écrire deux petites fonc­tions qui pren­dront en entrée la valeur de la wild­cards défi­nie par Sna­ke­make afin d'avoir une conti­nui­té dans l'automatisation :

def R1_func(wildcards):
	return(glob.glob(config["sampleDirectory"]+"/"+wildcards + "*_R1_001.fastq.gz"))
def R2_func(wildcards):
	return(glob.glob(config["sampleDirectory"]+"/"+wildcards + "*_R2_001.fastq.gz"))

[...]

rule bwa_aln :
		input :
			bwt = config["genome"]+".bwt",
			fasta = config["genome"],
			R1 = lambda wildcards: R1_func("{samples}".format(samples=wildcards.samples)),
			R2 = lambda wildcards: R2_func("{samples}".format(samples=wildcards.samples))
		output :
			sai_R1 = temp("results/{samples}_R1.sai"),
			sai_R2 = temp("results/{samples}_R2.sai")
		version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")
		log : "results/{samples}_"+config["bwa"]+".log"
		threads : 20
		params :
			genome = config["genome"]
		shell :
			"bwa aln -t {threads} {params.genome} {input.R1} -f {output.sai_R1} 2> {log}"
			"&& " 
			"bwa aln -t {threads} {params.genome} {input.R2} -f {output.sai_R2} 2>> {log}"

Ces deux fonc­tions sélec­tionnent les fichiers fastq R1 et R2 en fonc­tion d'une wild­cards don­née. Cette wild­cards va prendre auto­ma­ti­que­ment comme valeur les noms pré­sents dans LIST_​NAME car c'est à par­tir de la règle tar­get que Sna­ke­make sait quels fichiers doivent être créés et donc quelle valeur attri­buer aux wild­cards.

Gérer l'attribution du nombre de threads

Étran­ge­ment, il n'existe pas de façon simple pour récu­pé­rer la valeur du nombre de cœurs dis­po­nible spé­ci­fiée au lan­ce­ment (--cores ou -j). Mais l'idée est de récu­pé­rer cette valeur afin d'adapter le nombre de threads pour chaque règle en fonc­tion du nombre d'échantillons à ana­ly­ser.

J'ai donc uti­li­sé sys.argv (la liste conte­nant les argu­ments de la com­mande de lan­ce­ment d'un pro­gramme python) afin de récu­pé­rer cette valeur mais le com­por­te­ment de sna­ke­make étant assez par­ti­cu­lier, la récu­pé­ra­tion doit se faire en deux temps. En effet, Sna­ke­make exé­cute le code une pre­mière fois (votre com­mande de lan­ce­ment) et sys.argv contient toute vos options de lan­ce­ment. Ensuite le code s'exécute une seconde fois et Sna­ke­make modi­fie les valeurs de sys.argv par ses propres valeurs. Oui c'est tor­du ! Essayez de print sys.argv a l'exécution d'un pipe­line vous allez voir.

Je vous donne donc la fonc­tion toute faite ce qui vous évi­te­ra cher­cher deux heures comme moi :

def threads_max() :
	th = 0
	it=0
	for op in sys.argv :
#dans la première éxécution, l'option est sous la forme [..."-j","32"...]
		if op == "-j" or op == "--cores": 
			th = int(sys.argv[it+1])
			break
		else :
#Dans la seconde exécution la structure de l'option change ([..."-j32",...] par exemple)
			if re.search("-j(w+)",op) :
				th = int(re.search("-j(w+)",op).group(1))
		it += 1
	return th

Cette fonc­tion vous per­met­tra de récu­pé­rer le nombre de cœurs max dis­po­nible. Vous pour­rez ensuite par exemple déci­der de lan­cer le plus d'analyse pos­sible en paral­lèle quel que soit le nombre d'échantillons. Sna­ke­make per­met de mettre une variable au nombre de threads par règle au lieu d'un nombre fixe. Il est donc inté­res­sant de créer une petite condi­tions qui adap­te­ra ce nombre au nombre d'échantillons.

bwa_threads = 1
if threads_max() > len(LIST_NAME):
    bwa_threads =  threads_max() // len(LIST_NAME)

Au final

Avec toutes ces modi­fi­ca­tions, le pipe­line est désor­mais capable de réa­li­ser l'alignement sur bwa-mem ou aln au choix, en fonc­tion d'un dos­sier d'input et d'optimiser tout seul le nombre de threads en fonc­tion du nombre d'échantillons à trai­ter. Le pipe­line a été tes­té sur une machine locale sous Ubun­tu 19.04 avec 16Gb de Ram et un i5-8250U.

Je vous redonne le code total du pipe­line opti­mi­sé :

import re
import subprocess
import glob
import sys
from snakemake.utils import validate

configfile: "config.yaml"


# Fonctions #

validate(config, "schema.yaml")

LIST_FILE = glob.glob(config["sampleDirectory"]+"/*.fastq.gz")

LIST_NAME = [] 
for name in LIST_FILE: # bloc récupérant via regex le prefix correpondant au nom des échantillions (un nom retenu pour deux échantillons en Pair-End par exemple)
    removeDir = re.sub(config["sampleDirectory"]+"/", '', name)
    removeDir = re.sub(".fastq.gz", '', removeDir)
    if re.sub("_S\w+_R1_001",'', removeDir) != removeDir: 
        name = re.sub("_S\w+_R1_001",'', removeDir)
        LIST_NAME.append(name) 

def R1_func(wildcards):
    return(glob.glob(config["sampleDirectory"]+"/"+wildcards + "*_R1_001.fastq.gz"))

def R2_func(wildcards):
     return(glob.glob(config["sampleDirectory"]+"/"+wildcards + "*_R2_001.fastq.gz"))

def input_target(names):
    liste = []
    for name in names:
        liste.append("results/"+name+".sorted.bam")
    return liste

def threads_max():
    th = 0
    it = 0
    for op in sys.argv:
#dans la première éxécution, l'option est sous la forme [..."-j","32"...]
        if op == "-j" or op == "--cores": 
            th = int(sys.argv[it+1])
            break
        else:
#Dans la seconde exécution la structure de l'option change ([..."-j32",...] par exemple)
            if re.search("-j(w+)",op):
                th = int(re.search("-j(w+)",op).group(1))
        it += 1
    return th

bwa_threads = 1

if threads_max() > len(LIST_NAME):
    bwa_threads = threads_max() // len(LIST_NAME)

# au dessus de 16 threads, le gain de vitesse est négligeable
if bwa_threads > 16:
    bwa_threads = 16

###### Pipeline ######

rule target:
    input:
        config["genome"]+".bwt",
        input_target(LIST_NAME)

 
# Index du génome de référence avant l'alignement des lectures (séquences)
rule bwa_index:
    input:
        config["genome"] 
    output:
        config["genome"] + ".bwt"
    message:
        "Index du génome de référence: " + config["genome"]
    version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")  
    shell: #3) lance la commande
        "bwa index {input}" #"{input}"="config["genome"]"
 
# Alignement des lectures sur le génome de référence

if config["bwa"] == "mem":

    rule bwa_mem:
        input:
            bwt = config["genome"]+".bwt",
            fasta = config["genome"],
            R1 = lambda wildcards: R1_func("{samples}".format(samples=wildcards.samples)),
            R2 = lambda wildcards: R2_func("{samples}".format(samples=wildcards.samples))
        output:
            temp("results/{samples}.sam") 
        threads: bwa_threads #max cpu autorisé
        version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")
        log: "results/{samples}_"+config["bwa"]+".log"
        message:
            "Alignement de {wildcards.samples} sur "+config["genome"]+" avec "+str(bwa_threads)+" threads"
        shell:
            "bwa mem "       # avec Snakemake
            "-t {threads} "  # vous pouvez 
            "{input.fasta} " # commenter
            "{input.R1} "    # tous les 
            "{input.R2} "     # arguments 
            "-o {output} 2> {log}"        # des commandes

    rule mem_sort:
        input:
            rules.bwa_mem.output
        output:
            "results/{samples}.sorted.bam"
        threads: bwa_threads
        shell:
            "samtools sort "
            "-@ {threads} {input} -o {output}"
            " && samtools index "
            "-@ {threads} {output}"

else:

    rule bwa_aln:
        input:
            bwt = config["genome"]+".bwt",
            fasta = config["genome"],
            R1 = lambda wildcards: R1_func("{samples}".format(samples=wildcards.samples)),
            R2 = lambda wildcards: R2_func("{samples}".format(samples=wildcards.samples))
        output:
            sai_R1 = temp("results/{samples}_R1.sai"),
            sai_R2 = temp("results/{samples}_R2.sai")
        version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")
        log: "results/{samples}_"+config["bwa"]+".log"
        threads: bwa_threads
        message: "Alignement de {wildcards.samples} sur "+config["genome"]+" avec "+str(bwa_threads)+" threads"
        params:
            genome = config["genome"]
        shell:
            "bwa aln -t {threads} {params.genome} {input.R1} -f {output.sai_R1} 2> {log}"
            "&& " 
            "bwa aln -t {threads} {params.genome} {input.R2} -f {output.sai_R2} 2>> {log}"

    rule aln_sampe:
        input:
            R1 = lambda wildcards: R1_func("{samples}".format(samples=wildcards.samples)),
            R2 = lambda wildcards: R2_func("{samples}".format(samples=wildcards.samples)),
            sai_R1 = rules.bwa_aln.output.sai_R1,
            sai_R2 = rules.bwa_aln.output.sai_R2
        output:
            temp("results/{samples}.sam")
        version: subprocess.getoutput("bwa 2>&1 | grep Version: | sed -r 's/Version: +//'")
        log: "results/{samples}_sampe.log"
        params:
            genome = config["genome"]
        shell:
            "bwa sampe {params.genome} {input.sai_R1} {input.sai_R2} {input.R1} {input.R2} -f {output} 2> {log}"

    rule aln_sort:
        input:
            rules.aln_sampe.output
        output:
            "results/{samples}.sorted.bam"
        threads: bwa_threads
        shell:
            "samtools sort "
            "-@ {threads} {input} -o {output}"
            " && samtools index "
            "-@ {threads} {output}"

Ce pipe­line peut évi­dem­ment être encore opti­mi­sé notam­ment via l'ajout de mes­sages per­son­na­li­sés expli­ca­tif à l'exécution des règles par exemple. Il est éga­le­ment pos­sible d'imaginer une adap­ta­tion à l'exécution sur clus­ter.

Je remer­cie les relec­teurs de cet article Aki­ra, Lins et lelouar pour leur aide et leur avis pen­dant la rédac­tion de cet article.

Source pho­to : "Trans-Alas­ka Pipe­line"

Vous avez aimé ? Dites-le nous !

Moyenne : 0 /​ 5. Nb de votes : 0

Pas encore de vote pour cet article.

We are sor­ry that this post was not use­ful for you !

Let us improve this post !

Tell us how we can improve this post ?




Commentaires

Laisser un commentaire

Pour insérer du code dans vos commentaires, utilisez les balises <code> et <\code>.