Résoudre les problèmes des “futures” en C++
Avec les processeurs modernes à grand nombre de cœurs, il est souhaitable d’accélérer les programmes grâce à l’utilisation d’algorithmes parallèles. Pour y parvenir, l’asynchronisation et les valeurs futures ne sont pas une solution fiable.
De nombreux langages de programmation modernes vous permettent de parvenir à cette accélération grâce au code asynchrone et aux valeurs futures. Le principe de base de l’asynchronisation et des valeurs futures est que les fonctions appelées sont exécutées sur un autre fil, et les valeurs de retour sont converties en ce que l’on appelle une valeur future. De telles valeurs futures ne possèdent pas de valeur réelle tant que la fonction asynchrone n’est pas terminée. La fonction s’exécute simultanément et, lorsqu’elle finit par retourner une valeur, la variable de la valeur future est mise à jour en arrière-plan pour conserver cette valeur de retour. Il n’est pas nécessaire d’utiliser des mutex ou un système de messages inter-fil (“interthread”) explicite : toute la synchronisation entre l’appel initial et le deuxième fil d’exécution est effectuée en arrière-plan. Lorsque le fil initial accède à la valeur future, il est automatiquement mis en pause jusqu’à ce que la valeur soit prête.
Le principal avantage de ce système est qu’il est très facile d’avoir un fonctionnement asynchrone. Bien sûr, le programmeur doit s’assurer que la fonction peut réellement être exécutée dans un autre fil en toute sécurité, qu’il n’y a pas de course aux données avec d’autres fils d’exécution. Les fonctions asynchrones et les valeurs futures ne font que créer des fils d’exécution pour générer un résultat.
Les problèmes écartés
Mon but ici n’est pas de discuter de la manière de concevoir des algorithmes sans course entre fils, ni de la manière d’arranger des données pour faciliter l’exécution d’algorithmes multi-fils. Je me contenterai de mentionner qu’un moyen possible d’y parvenir consiste à éviter toute valeur globale et à transmettre toutes les données à la fonction asynchrone par valeur. De cette façon, rien n’est partagé entre les fils et donc aucune concurrence ne peut se produire.
Les problèmes abordés
Si l’asynchronisme et les valeurs futures permettent de transformer facilement une fonction en un fil conducteur, c’est cette simplicité même qui pose problème. Elle signifie une absence totale de contrôle. Plus précisément, vous n’avez aucun contrôle sur :
- combien de fonctions asynchrones sont exécutées,
- combien de fils sont créés pour exécuter ces fonctions,
- combien de fils attendent des résultats.
Cela nécessite un équilibre délicat entre le désir de maximisation de l’utilisation du processeur et le maintien d’un certain contrôle. D’une part, vous voulez qu’un maximum de fonctions asynchrones soit exécuté, afin que le processeur soit toujours entièrement occupé ; d’autre part, vous ne voulez pas arriver à une surcharge du processeur avec trop de fils.
La solution
La meilleure solution à ce problème est d’introduire une certaine complexité. La complexité ajoutée vous permet de reprendre le contrôle de tous les éléments énumérés ci-dessus.
Première étape : le pool de fils
La première étape consiste à renoncer aux fonctions asynchrones et aux valeurs futures pour maximiser l’utilisation du processeur. Ceux-ci peuvent toujours être utilisés pour exécuter des algorithmes parallèles, mais pas pour créer des fils multiples. Il est préférable d’utiliser un pool de fils (“thread pool”).
Un pool de fils vous permet de contrôler le nombre de fils créés pour exécuter des algorithmes parallèles parallèles. Vous pouvez créer exactement autant de fils qu’il y a de cœurs dans le processeur, ce qui garantit un débit maximal exact sans surcharge du processeur.
Deuxième étape : la file d’attente des travaux
Si le pool de fils contrôle le nombre de fils utilisés, il ne contrôle pas la manière dont les fonctions sont gérées par ces fils. C’est le travail de la file d’attente des travaux. Les fonctions à exécuter de manière asynchrone sont ajoutées à la file, et le pool prend des fonctions de cette file pour les exécuter et produire des résultats.
Troisième étape : les résultats
Alors que la file d’attente des travaux s’occupe de l’entrée des algorithmes parallèles, nous avons besoin d’une autre fonction pour gérer l’attente des résultats. Alors qu’on aurait pu utiliser une file d’attente de résultats, nous avons une meilleure option : les valeurs futures ! La synchronisation entre le producteur d’un résultat et son consommateur est exactement l’utilité des valeurs futures. La principale différence par rapport au problème initial est que les résultats sont ici créés par le pool de fils.
Quatrième étape : le vol de fils
Un problème est alors de savoir ce qui se passe si l’algorithme parallèle soumet des sous-algorithmes à la file d’attente des travaux et attend leurs résultats. Nous pourrions nous retrouver à court de fils ! Chaque fil pourrait attendre que les résultats soient produits alors qu’aucun fil n’est disponible pour produire ces résultats.
La solution à ce problème est le concept de vol de fil en attente d’un résultat. Essentiellement, vous fournissez une fonction qui peut travailler sur la file d’attente des travaux tant qu’un résultat précis n’est pas disponible. Nous évitons donc d’accéder directement aux valeurs futures produites, car accéder à une valeur future non disponible bloquerait le fil. Au lieu de cela, nous passons la valeur future à la file d’attente, qui peut alors exécuter des travaux en attendant que la valeur soit prête.
Exemple de code concret
J’ai mis en place un tel système à plusieurs reprises dans le passé. Je l’ai récemment réimplémenté dans une application open-source, écrite en C++. Cette application s’appelle Tantrix Solver et comme son nome l’indique, elle solutionne les puzzles Tantrix. Le code de l’application est disponible sur GitHub et contient plusieurs branches Git :
- Une branche utilise des fonctions asynchrones et les valeurs futures.
- Une autre branche montre le même algorithme en utilisant le design suggéré.
Le repo sur GitHub est disponible ici.
Version asynchrone avec valeurs futures
La branche Git contenant la version asynchrone avec valeurs futures est appelée “thread-by-futures”.
Le design du code dans cette branche est simple. C’est son principal avantage. Il utilise la fonction C++ std::async
avec le mode std::launch::async
pour créer des fils. Cependant, les problèmes que nous avons mentionnés se manifestent comme prévu, avec un nombre incontrôlé de fils. Un puzzle Tantrix simple peut engendrer la création de quelques dizaines de fils, ce qui est probablement trop, mais reste encore gérable. En revanche, un puzzle Tantrix complexe peut créer plusieurs centaines de fils, ce qui peut saturer la plupart des ordinateurs.
Pool de fils et file d’attente des travaux
La branche Git contenant la version avec pool de fils et file d’attente est appelée “thread-pool”. Je vais décrire plus en détail la conception du code, car elle est plus complexe, bien que j’aie essayé de la garder aussi simple que possible.
Conception du code : les parties faciles
Dans cette section, je présenterai les éléments les plus simples du design.
Le premier élément du design est la classe du pool de fils. Il suffit de lui donner un “fournisseur de travail” et le nombre de fils à créer :
// A pool of threads of execution.
struct thread_pool_t
{
// Create a thread pool with a given number of threads
// that will take its work from the given work provider.
thread_pool_t(work_provider_t& a_work_provider, size_t a_thread_count = 0);
// Wait for all threads to end.
~thread_pool_t();
private:
// The internal function that execute queued functions in a loop.
static void execution_loop(thread_pool_t* self);
};
Le fournisseur de travail (“work provider”) indique aux fils ce qu’ils doivent faire. Il contrôle l’arrêt et l’exécution des algorithmes. Il dispose d’une fonction d’attente et d’exécution qui encapsule entièrement l’exécution d’un travail ou l’attente d’une fonction à exécuter. Les détails de la cette fonction seront présentés ultérieurement, dans son implémentation concrète. Pour l’instant, voici la conception du fournisseur :
// The provider of work for the pool.
struct work_provider_t
{
// Request that the threads stop.
virtual void stop() = 0;
// Check if stop was requested.
virtual bool is_stopped() const = 0;
// The wait-or-execute implementation, called in a loop
// by the threads in the thread =s pool.
virtual void wait_or_execute() = 0;
};
Ces deux classes précédentes sont cachées dans la file d’attente de travail. Pour cette raison, elles peuvent en fait être totalement ignorées par les utilisateurs de la conception. C’est pourquoi nous ne les aborderons pas plus avant.
Conception du code : les parties intermédiaires
La queue de travail est la pièce la plus complexe. Sa mise en œuvre est un template C++ pour faciliter l’utilisation d’un algorithme donné qui produit un type spécifique de résultats.
Comme il s’agit de la partie centrale du design, je la présenterai dans son intégralité. Je diviserai la présentation de la classe en plusieurs parties pour la rendre plus facile à comprendre.
La première partie de la conception est constituée des paramètres du template C++ :
template <class WORK_ITEM, class RESULT>
struct threaded_work_t : work_provider_t
{
using result_t = typename RESULT;
using work_item_t = typename WORK_ITEM;
using function_t = typename std::function<result_t(work_item_t, size_t)>;
Le work_item_t
(WORK_ITEM) est la donnée d’entrée de l’algorithme. Le result_t
(RESULT) est la sortie de l’algorithme. La function_t
est l’algorithme. En utilisant une fonction, nous pouvons prendre en charge une famille d’algorithmes ayant les mêmes types en entrée et en sortie. Lorsqu’un work_item_t
est ajouté, l’appelant fournit également la fonction à appliquer.
La deuxième partie de la conception de la file d’attente des travaux est l’ensemble des types de données de mise en œuvre et les variables membres. Les voici :
using task_t = std::packaged_task<result_t(work_item_t, size_t)>;
// How the function, work item and recursion depth is kept internally.
struct work_t
{
task_t task;
work_item_t item;
};
std::mutex my_mutex;
std::condition_variable my_cond;
std::atomic<bool> my_stop = false;
std::vector<work_t> my_work_items;
const size_t my_max_recursion;
// Note: the thread pool must be the last variable so that it gets
// destroyed first while the mutex, etc are still valid.
thread_pool_t my_thread_pool;
Le type task_t
contient la fonction dans un type C++ spécial qui peut l’appeler tout en produisant un std::future
en C++. C’est ainsi que les valeurs futures sont créées. Le work_t
est l’unité de travail qui peut être exécutée par un fil.
Les deux premières variables membres de la file d’attente sont le mutex et la variable de condition qui servent toutes deux à protéger les données partagées entre les fils d’exécution et le fil principal.
La variable atomique my_stop
est utilisée pour signaler que toute exécution doit s’arrêter (quelle surprise !). Le vecteur de work_t
contient les unités de travail à exécuter. Il s’agit donc de la file d’attente elle-même. La variable max_recursion
est un détail de l’implémentation utilisé pour éviter la récursion trop profonde due au vol de fils. Ce sera expliqué plus en détail plus tard. Le thread_pool
est évidemment l’endroit où les fils d’exécution sont gardés.
La troisième partie du design est la création de la file d’attente des travaux et l’implémentation de l’interface work_provider_t
. Tout cela est relativement simple. Nous créons le pool de fils internes avec le nombre exact de cœurs du processeur. Nous faisons également passer la file d’attente de travail elle-même en tant que “fournisseur de travail” (work_provider_t
) du pool de fils.
// Create a threaded work using the given thread pool.
threaded_work_t(size_t a_max_recursion = 3)
: my_max_recursion(a_max_recursion), my_thread_pool(*this, std::thread::hardware_concurrency()) {}
~threaded_work_t() { stop(); }
// Stop all waiters.
void stop() override
{
my_stop = true;
my_cond.notify_all();
}
// Check if it is stopped.
bool is_stopped() const override { return my_stop; }
// Wait for something to execute or execute something already in queue.
void wait_or_execute() override
{
std::unique_lock lock(my_mutex);
return internal_wait_or_execute(lock, 0);
}
La mise en œuvre des fonctions de destruction et d’arrêt (stop
) utilise simplement le drapeau d’arrêt (my_stop
) et la variable de condition pour signaler l’arrêt à tous les fils d’exécution. La fonction wait_or_execute
appelle simplement une autre fonction interne, plus complexe, qui sera décrite dans la section suivante.
Conception du code : les parties complexes
Dans cette section, nous arrivons enfin au cœur du design, les détails les plus complexes du code.
Tout d’abord, la fonction d’attente d’un résultat donné. Cette partie est encore facile à comprendre. Tant que la valeur future n’est pas prête, nous continuons à attendre l’arrivée de nouveaux résultats ou de nouveaux travaux à exécuter. C’est ici que nous travaillons pour d’autres algorithmes en file d’attente au lieu de dormir bêtement et de perdre un fil d’exécution. Si nous recevons le signal de tout arrêter, nous sortons rapidement avec un résultat vide.
// Wait for a particular result, execute work while waiting.
result_t wait_for(std::future<result_t>& a_token, size_t a_recusion_depth)
{
while (!is_stopped())
{
std::unique_lock lock(my_mutex);
if (a_token.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
return a_token.get();
internal_wait_or_execute(lock, a_recusion_depth);
}
return {};
}
Deuxièmement, la fonction qui exécute réellement l’unité de travail. Elle attend quand il n’y a rien à exécuter. En revanche, lorsqu’il y a au moins une unité de travail en file d’attente, elle exécute son algorithme, qui produira un nouveau résultat.
private:
// Wait for something to execute or execute something already in queue.
void internal_wait_or_execute(std::unique_lock<std::mutex>& a_lock, size_t a_recursion_depth)
{
if (my_stop)
return;
if (my_work_items.size() <= 0)
{
my_cond.wait(a_lock);
return;
}
work_t work = std::move(my_work_items.back());
my_work_items.pop_back();
a_lock.unlock();
work.task(work.item, a_recursion_depth + 1);
my_cond.notify_all();
}
La seule chose subtile à observer, c’est que si la fonction était en attente et est réveillée, alors elle retourne immédiatement au lieu d’essayer d’exécuter un travail quelconque. Il y a une bonne raison pour un retour immédiat : le réveil peut être dû à un résultat devenant disponible ou à une unité de travail ajoutée. Comme nous ne savons pas de quel cas il s’agit et comme l’appelant pourrait être intéressé par ces nouveaux résultats, nous revenons à l’appelant pour qu’il puisse vérifier. Peut-être que la valeur future qu’il attendait est prête !
Enfin, voici la fonction de soumission des travaux à exécuter :
// Queue the the given function and work item to be executed in a thread.
std::future<result_t> add_work(work_item_t a_work_item, size_t a_recusion_depth, function_t a_function)
{
if (my_stop)
return {};
// Only queue the work item if we've recursed into the threaded work only a few times.
// Otherwise, we can end-up with too-deep stack recursion and crash.
if (a_recusion_depth < my_max_recursion)
{
// Shallow: queue the function to be called by any thread.
work_t work;
work.task = std::move(task_t(a_function));
work.item = std::move(a_work_item);
auto result = work.task.get_future();
{
std::unique_lock lock(my_mutex);
my_work_items.emplace_back(std::move(work));
}
my_cond.notify_all();
return result;
}
else
{
// Too deep: call the function directly instead.
std::promise<result_t> result;
result.set_value(a_function(a_work_item, a_recusion_depth + 1));
return result.get_future();
}
}
La principale chose inattendue à remarquer est la vérification de la profondeur de récursivité. Le problème subtil que ceci vise à éviter est lié à la mise en œuvre des fonctions wait_for()
et wait_or_execute()
. Comme l’attente peut provoquer l’exécution d’une autre unité de travail et cette unité de travail pourrait à son tour finir par attendre et exécuter une autre unité de travail, cela pourrait faire boule de neige et devenir une récursion très profonde.
Malheureusement, nous ne pouvons pas refuser d’exécuter un travail, car cela pourrait signifier que tous les fils cesseraient d’exécuter du travail. Le système cesserait de travailler et s’arrêterait ! Ainsi, au lieu de cela, lorsque le maximum de profondeur est atteint à l’intérieur d’un fils d’exécution, tout travail supplémentaire mis en file d’attente par ce fils est exécuté immédiatement.
Bien que cela semble équivalent à mettre en file d’attente le travail à accomplir, ce n’est pas le cas. Comme vous voyez, la quantité de travail nécessaire pour évaluer une branche d’un algorithme est limitée. En contraste, le nombre d’unités de travail qui peuvent être mises en attente en raison de toutes les branches des algorithmes peut être extrêmement grand. En effet, on peut supposer que l’algorithme a été conçu de manière à ce qu’une branche ne cause pas de récursion excessive. Nous ne pouvons pas supposer la même chose sur le total de tous les travaux mis en attente par plusieurs branches indépendantes de l’algorithme.
Pour cette raison, il est également judicieux de vérifier la profondeur de récurrence dans l’algorithme lui-même et ne pas même mettre en file d’attente ces éléments de travail, une fois la profondeur maximale de récursion atteinte. Il devrait plutôt appeler leur fonction directement dans l’algorithme, plutôt que passer par la file d’attente, pour rendre le tout plus efficace.
En dehors de cette subtilité, le reste du code ne fait que mettre en file d’attente l’unité de travail et réveiller tout fil qui voulait exécuter un travail.
Conclusion
Comme on le voit, cette mise en place d’une file d’attente de travail remplace les fonctions asynchrones et les valeurs futures par une réserve de fils. Pour l’appelant, il suffit de connaître deux fonctions : add_work()
et wait_for()
. L’interface est donc assez simple à utiliser, mais il donne qu’un contrôle supplémentaire sur le multithreading pour éviter de surcharger le processeur.
J’espère qu’un jour, le standard C++ sera doté d’un design intégré pour les files d’attente et les pools de fils, afin de ne pas avoir à les faire à la main comme ici. D’ici là, n’hésitez pas à réutiliser mon design.