Qu’est-qu’on veut ? Maintenant ! Et quand est-ce qu’on le veut ? De l’async !

Qu’est-ce que c’est encore que ce machin ?

Si tu fais un peu de Rust ces derniers temps, tu as du voir tout un tas de librairies se convertir à l’async sans forcément comprendre pourquoi. Async/await, c’est juste une manière de faire des choses de manières concurrentes. En gros, au lieu de chercher à exécuter le programme dans l’ordre, on va chercher à faire des actions un peu dans le désordre pour éviter de bloquer le thread principal.

Tu vas alors me dire : mais on a déjà de la programmation parallèle pour ça non ? Et ça, je répondrais oui. Sauf qu’en fait, non. Dans des threads classiques, toutes les actions sont effectuées en parallèle (pense : plusieurs processeurs exécutant des actions complètement différentes). En asynchrone, l’idée est d’exécuter tout ça dans le même thread mais en profitant des temps morts à droite à gauche pour essayer de gagner du temps. L’exemple typique est celui de l’écriture dans un fichier : c’est une opération qui bloque le thread d’exécution pendant longtemps pour rien : le processeur ne glande rien et attend juste le disque, donc autant lui faire exécuter d’autres choses en attendant que ça passe et récupérer l’information sur l’écriture du fichier un peu plus tard.

Bon, ça évidemment, c’est la jolie théorie qu’on décrit un peu partout. Mais concrètement comment ça se passe ?

Mettre des choses dans Async/Await, bah en fait, c’est pas si simple que ça…

Si je reprends l’exemple donné sur le site de tokio à part mettre des await partout, tu fais pas grand-chose d’asynchrone là-dedans. Et en fait, c’est vrai : basiquement, ça ne fait rien d’asynchrone. En fait, async/await en Rust peut effectivement commencer à exécuter des trucs en asynchrone sans qu’on lui demande mais à un moment, on est bien obligé de récupérer le résultat.

Donc si ton programme fait juste des await toutes les 3 lignes, bah en fait, tu gagnes à peu près rien…

Ça commence bien…

Allez, un exemple pour essayer

Comme tu peux éventuellement commencer à t’en douter, j’ai passé un bon moment à me gratter la tête avec ses histoires d’async, sans vraiment comprendre l’intérêt que ça avait (oui, on a déjà du parallèle si besoin donc bon…). Donc, voilà, je te propose un petit exemple qui permet de mettre en lumière certaines choses. Ça n’est pas une réponse universelles à tout, ça n’est peut-être même pas un bon exemple, mais disons que c’est là que je suis arrivé après plusieurs heures de recherche et de tatônnement (j’ai aussi pu constater que les ressources disponibles sur la toile sur le sujet était soit obsolètes, soit pas vraiment claires).

Bon admettons…

Donc, admettons qu’on ait 4 API en entrée (nous l’appellerons API entrante) qu’on cherche à interroger. Ces 4 API vont renvoyer un nombre arbitraire de nombres et on va les envoyer vers une autre API (que nous appellerons API sortante) qui va déterminer si les nombres sont pairs ou impairs (oui, je sais c’est complètement con, mais c’est pour les besoins de l’exercice). Pour bien se rendre compte du fonctionnement du truc, on va admettre que les API mettent énormément de temps à répondre (plusieurs secondes dans certains cas). L’idée est donc de voir si on peut optimiser et paralléliser.

Voici donc le code permettant de demandes des infos à l’API entrante :

async fn poll() -> Vec<u32> {
    let rand_waiter = rand::random::<u8>()/20; // on calcule un nombre aléatoire compris en 0 et 12

    println!("poll: Waiting {}s for poll response…", rand_waiter);

    sleep(Duration::from_secs(rand_waiter.into())).await; // on attend exactement ce nombre

    // on génère autant de nombre sur 32 bits qu’on a attendu de secondes
    let mut polling_res = vec![];

    for _ in 0..rand_waiter {
        let res = rand::random::<u32>();
        polling_res.push(res);
    }

    println!("poll: Finished polling!");

    polling_res
}

Évidemment, nous utilisons ici les primitives Duration et sleep provenant de tokio.

Notre API sortant va du coup ressembler à ça :

async fn treat(n: u32) -> bool {
    let rand_waiter = rand::random::<u8>()/10; // on calcule un nombre aléatoire entre 25

    println!("treat: Treating {}", n);
    println!("treat: Waiting {}s for treatment response…", rand_waiter);

    sleep(Duration::from_secs(rand_waiter.into())).await;

    println!("treat: Finished treating {}", n);
    n%2 == 0
}

Approche naïve

L’approche ultra de base consiste donc à mettre les 4 API entrantes dans un tableau, à les interroger une par une et ensuite pour chaque résultat reçu (tous les résultats arrivent en même temps en l’occurence, on attend simplement que chaque API entrante réponde l’ensemble de ses résultats), on les balance dans l’API sortante. On pourrait donc imaginer un code de ce type :

async fn main() {
    let apis = vec!["A","B","C","D"];

    for api in apis {
        let numbers = poll().await;

        for number in numbers {
            let res = treat(number).await;

            if res {
                println!("main: {} was a good one, {}!", res, api);
            } else {
                println!("main: nah {} no good, better luck next time, {}", res, api);
            }
        }
    }
}

Et quand on exécute, on constate effectivement que tout est parfaitement synchrone (et donc complètement con) :

poll: Waiting 3s for poll response…
poll: Finished polling!
treat: Treating 4097894384
treat: Waiting 7s for treatment response…
treat: Finished treating 4097894384
main: true was a good one, A!
treat: Treating 4056821172
treat: Waiting 14s for treatment response…
treat: Finished treating 4056821172
main: true was a good one, A!
[…]
poll: Waiting 2s for poll response…
poll: Finished polling!
treat: Treating 4156474264
treat: Waiting 20s for treatment response…
treat: Finished treating 4156474264
main: true was a good one, D!
treat: Treating 1467559148
treat: Waiting 13s for treatment response…
treat: Finished treating 1467559148
main: true was a good one, D!

Ça asynchrone donc pas des masses :/. En fait, le souci vient du fait qu’on attend le résultat de chaque opération. Pour la première opération, on ne peut pas vraiment faire autrement (toutes les réponses arrivent en même temps de la partie de l’API entrante). Par contre, pour le traitement, on pourrait imaginer de le faire en retirant le if derrière. Ça donnerait donc un truc de ce style :

for number in numbers {
	treat(number);
}

Et là, tu te rends compte de la principale particularité de l’implémentation d’async dans Rust : si tu ne lui demandes rien, un future (donc une fonction async ici) ne s’exécute pas. En gros, si tu l’attends pas, il fait rien ce con.

Bordel, mais comment on va résoudre ça ???

Il va falloir mettre notre traitement dans une autre fonction qui va se charger de le faire avancer pendant qu’on fait autre chose. On ne peut pas le coller juste dans un block async, il faut le coller dans une fonction qui va le faire avancer (en gros). Donc on peut utiliser tokio::spawn dans la librairie tokio pour ce faire :

tokio::spawn(async move {
	treat(number).await;
});

Là, ce n’est plus gênant de déclencher l’attente du résultat parce que finalement, le résultat est attendu dans une sorte de thread détaché du thread principal (mais pas vraiment un thread puisque c’est pas du parallèle…). Et donc, là, ça fonctionne nettement mieux :

poll: Waiting 7s for poll response…
poll: Finished polling!
poll: Waiting 1s for poll response…
treat: Treating 1725139708
treat: Waiting 14s for treatment response…
treat: Treating 2961643129
treat: Waiting 22s for treatment response…
treat: Treating 3490534924
treat: Waiting 7s for treatment response…
treat: Treating 481316326
treat: Waiting 23s for treatment response…
treat: Treating 3335438155
treat: Waiting 9s for treatment response…
treat: Treating 2314014323
treat: Waiting 14s for treatment response…
treat: Treating 4156517114
treat: Waiting 0s for treatment response…
treat: Finished treating 4156517114
poll: Finished polling!
poll: Waiting 10s for poll response…
[…]
poll: Finished polling!
treat: Treating 279666034
treat: Waiting 22s for treatment response…
treat: Treating 945860350
treat: Waiting 23s for treatment response…
treat: Treating 1248492005
treat: Waiting 24s for treatment response…

Oups… On traite donc bien en parallèle, mais le souci c’est qu’on attend pas vraiment que le traitement soit fini. En gros, non seulement il faut le lancer dans un morceau de code à part, mais à un moment, il faut se poser la question de comment on attend que le morceau de code en question ait fini d’exécuter. Bon, en plus, en l’occurence, on ne traite pas vraiment comme on le devrait. En fait, idéalement, il faudrait lancer la consultation de l’API entrante en parallèle aussi (parce que là, ce n’est pas fait), puis lancer le traitement de chaque résultat en parallèle, puis attendre que l’ensemble ait fini avant de sortir complètement.

craquement de doigts

Une première approche consiste donc à lancer en parallèle l’ensemble des traitement initiaux sur l’API entrante, de récupérer ses résultats et de les transmettre au thread principal pour qu’il puisse faire les traitements. De cette manière, ça ira un peu plus vite dans le sens où l’on peut déjà mettre en attente notre volontairement-super-lente API entrante. Pour cela, il va falloir insérer un composant de communication entre ses différents thread. Dans les faits, ça marche exactement de la même manière que les threads classiques de Rust avec des transmetteurs, un récepteur et des messages (dans le cas présent, il existe bien sûr d’autres types de messages et de mode de transmission). Si l’on veut pouvoir préciser à quelle API on parle, il va aussi falloir créer une nouvelle structure qui va permettre de transmettre le nom de l’API entrante en même temps que son résultat.

Ce dernier point est juste cosmétique, on pourrait très bien faire sans, mais je trouve ça plus zoli avec…

Voilà ce que ça va donner :

#[derive(Debug, Clone)]
struct ResAPI {
    api: String,
    id: u32,
}

#[tokio::main]
async fn main() {
    let apis = vec!["A","B","C","D"];

    let api_len = apis.len();

    // on crée un canal de communication
    let (tx, mut rx) = mpsc::channel(api_len);

    for api in apis {
        // qu’on clone pour chaque API entrante
        let tx = tx.clone();
        tokio::spawn(async move {
            let vu32 = poll().await;

            for v in vu32 {
                let ra = ResAPI {
                    api: api.to_string(),
                    id: v,
                };

                // on envoie le résultat formé
                tx.send(ra).await.unwrap();
            }
        });
    }

    // on détruit explicitement le canal de communication restant (celui qui n’a jamais été cloné
    // en fait)
    drop(tx);

    while let Some(m) = rx.recv().await {
        println!("main: Receiving {} from {}", m.id, &m.api);

        let res = treat(m.id).await;

        if res {
            println!("main: {} was a good one, {}!", res, &m.api);
        } else {
            println!("main: nah {} no good, better luck next time, {}", res, &m.api);
        }
    }
}

Quand on exécute, ça fonctionne effectivement bien mieux. On constate bien que les 4 API entrantes sont consultées en même temps. On constate également que la première API qui a terminé va déclencher la suite des opérations. Mais… mais on a toujours un souci : on attend systématiquement le traitement de l’API sortante et on perd du coup beaucoup de temps. Idéalement, il faudrait également rendre asynchrone le traitement des résultats.

Où l’on se rend compte que les grands principes de la science sont constants

Les mêmes causes produisant les mêmes effets, si l’on ne fait que spawn des traitements de l’API sortante, ça va effectivement paralléliser les traitements, mais on va en perdre au passage (tout traitement non terminé lorsque le thread principal meurt, mourra avec lui). On pourrait reprendre la même stratégie que précédemment mais d’abord, ce ne serait pas drôle et ensuite, on est quand même obligé de déterminer la capacité d’un canal de communication avant de l’ouvrir. Dans notre cas, ça pourrait fonctionner puisqu’on connait à l’avance le nombre maximum de résultalts, mais on pourrait imaginer des cas où l’on en est pas capable (ou en tout cas, pas suffisamment précisément).

Donc, on va essayer une autre stratégie : lancer l’ensemble des threads de traitement dans un coin, stocker leur représentation dans un vecteur et simplement attendre jusqu’à ce que l’ensemble du vecteur ait terminé. Malheureusement tokio ne propose pas une telle fonction, il va donc falloir aller taper dans une autre librairie futures qui elle nous offre join_all permettant de faire exactement ce que l’on veut. Voyons les modifs de code à partir de la boucle de réception :

    // on crée un vecteur permettant de stocker tout ce petit monde
    let mut tasks: Vec<_> = vec![];

    while let Some(m) = rx.recv().await {
        println!("main: Receiving {} from {}", m.id, &m.api);

        // on lance des tâches comme précédemment
        let task = tokio::spawn(async move {
            println!("main: Beginning treatment for {} from {}", m.id, &m.api);

            let t = treat(m.id).await;

            if t {
                println!("main: {}, that was a good one, {}!", m.id, &m.api);
            } else {
                println!("main: nah {}, better luck next time, {}!", m.id, &m.api);
            }
        });

        // sauf qu’on met le résultat de cette tâche (le JoinHandle en fait) dans un tableau
        tasks.push(task);
        println!("main: Finished receiving");
    }

    println!("main: Back to main thread and we’re waiting for everyone now…");

    // mainteant qu’on a tout reçu, on attend simplement que chaque thread se termine
    join_all(tasks).await;

Et le tour est joué ! Les traitement entrants sont bien asynchrones, les traitements sortants également, et le programme principal attend sagement que tout le monde ait terminé avant de lui-même s’éteindre bien gentiment.

Conclusage

Ce n’est certaiment pas le plus optimal et ce n’est peut-être pas un modèle à suivre. Je suis certain par exemple qu’une fonction comme join_all doit poser des problèmes dans certains cas, ou ne peut pas accepter plus d’un certain nombre de tâches, etc… Néanmoins, ça démontre bien un truc : faire de l’asynchrone, c’est bien joli, mais à moins d’avoir un cas assez spécifique ou de savoir ce que l’on fait, ça ne servira clairement pas à tout le monde et clairement pas tout le temps.

Du coup, Messieurs les développeurs, essayer de proposer une alternative bloquante à vos librairies : ça m’intéresse beaucoup d’avoir de l’asynchrone (vraiment) mais les cas où je vais réellement en avoir besoin et réellement m’en servir, ce ne sera clairement pas tous les jours, surtout vu la complexité du truc.

Ah et dernière chose : quand je dis bloquant, c’est vraiment bloquant. Oui, je te regarde reqwest::blocking qui en fait planque du tokio dedans…