Logo Spiria

Emballé pour le multithreading

8 octobre 2020.
Photo John K Thorne, dp.

L’un des défis de l’écriture de code multithread est qu’il est souvent nécessaire de partager des données qui n’ont pas été conçues pour être utilisées par plusieurs threads en même temps. Dans cet article de blogue, je veux montrer deux approches pour protéger ces données : la duplication et l’emballage (“wrapping”).

La duplication

Cette première approche est la plus simple. Il suffit de dupliquer les données pour chaque thread d’exécution. Pour que cela fonctionne, les données doivent répondre à quelques critères :

  • être faciles à identifier,
  • ne pas avoir de parties cachées,
  • être faciles à reproduire,
  • ne pas avoir d’exigences essentielles pour être partagées en continu.

Si les données répondent à tous ces critères, la duplication est alors la solution la plus rapide et la plus sûre. Habituellement, les données qui peuvent être utilisées de cette manière sont essentiellement un groupe de valeurs, comme une structure pure en C++, contenant des valeurs simples.

L’emballage

Si vos données ne répondent pas au critère de duplication, l’approche de l’emballage des données peut être utilisée. Un cas courant est celui où on a une interface qui devrait être partagée entre plusieurs threads d’exécution. Voici les étapes de la création d’un emballage :

  • Identifier l’interface qui doit être isolée.
  • Écrire un simple protecteur multithread sur l’interface.
  • Écrire une implémentation simple de l’interface pour chaque thread.

Pour illustrer la technique, je vais vous montrer un exemple d’emballage que j’ai récemment fait en C++. Le code fait partie de l’application Tantrix Solver que j’ai écrite. L’élément particulier que je devais convertir pour une utilisation multithread était l’interface du rapport d’avancement.

Le code pour cette application est disponible sur GitHub.

Identifier l’interface

La première étape consiste à identifier ce qui sera utilisé par les threads. Cela peut nécessiter un certain remaniement dans le cas d’un groupe disparate d’éléments sans cohésion. Dans l’exemple de code, il s’agissait déjà d’une interface appelée progress_t. À noter qu’il n’y a qu’une seule fonction virtuelle qui doit être protégée : update_progress().

   // Report progress of work.
   //
   // Not thread safe. Wrap in a multi_thread_progress_t if needed.

   struct progress_t
   {
      // Create a progress reporter.
      progress_t() = default;

      // Force to report the progress tally.
      void flush_progress();

      // Clear the progress.
      void clear_progress();

      // Update the progress with an additional count.
      void progress(size_t a_done_count);

      size_t total_count_so_far() const;

   protected:
      // Update the total progress so far to the actual implementation.
      virtual void update_progress(size_t a_total_count_so_far) = 0;
   };

Protecteur multithread

La deuxième étape consiste à créer un protecteur multithread. La conception de tous les protecteurs est toujours la même :

  • Ne pas dériver de l’interface à protéger.
  • Conserver l’implémentation originale de l’interface non protégée.
  • Fournir une protection multithread, généralement avec un mutex.
  • Fournir un accès interne à l’implémentation par thread.

La raison de ne pas implémenter l’interface souhaitée est que le protecteur multithread n’est pas destiné à être utilisé directement. En le rendant non compatible, il ne peut pas être utilisé accidentellement.

Sa mise en œuvre imitera toujours très fidèlement l’interface. La différence est que chaque fonction correspondante verrouillera le mutex et appellera l’interface originale, non sécurisée pour les threads. C’est ainsi qu’il est protégé contre le multithread.

Voici un exemple pour l’interface progress_t :

    // Wrap a non-thread-safe progress in a multi-thread-safe progress.
    //
    // The progress can only be reported by a per-thread-progress referencing
    // this multi-thread progress.

    struct multi_thread_progress_t
    {
      // Wrap a non-threas safe progress.
      multi_thread_progress_t() = default;
      multi_thread_progress_t(progress_t& a_non_thread_safe_progress)
         : my_non_thread_safe_progress(&a_non_thread_safe_progress), my_report_every(a_non_thread_safe_progress.my_report_every) {}

      // Report the final progress tally when destroyed.
      ~multi_thread_progress_t();

      // Force to report the progress tally.
      void flush_progress() { report_to_non_thread_safe_progress(my_total_count_so_far); }

      // Clear the progress.
      void clear_progress() { my_total_count_so_far = 0; }

    protected:
      // Receive progress from a per-thread progress. (see below)
      void update_progress_from_thread(size_t a_count_from_thread);

      // Propagate the progress to the non-thread-safe progress.
      void report_to_non_thread_safe_progress(size_t a_count);

    private:
      progress_t*          my_non_thread_safe_progress = nullptr;
      size_t               my_report_every = 100 * 1000;
      std::atomic  my_total_count_so_far = 0;
      std::mutex           my_mutex;

      friend struct per_thread_progress_t;
    };

Les fonctions importantes sont : update_progress_from_thread() et report_to_non_thread_safe_progress(). La première reçoit les progrès de chaque thread, dont le code sera présenté plus tard. La fonction accumule le total dans une variable multithread-safe et ne l’envoie que lorsque ce total franchit un seuil donné. La deuxième fonction envoie les progrès réalisés à la version originale de l’interface, sous la protection d’un mutex. Voici le code pour les deux fonctions :

    void multi_thread_progress_t::update_progress_from_thread(size_t a_count_from_thread)
    {
      if (!my_non_thread_safe_progress)
         return;

      const size_t pre_count = my_total_count_so_far.fetch_add(a_count_from_thread);
      const size_t post_count = pre_count + a_count_from_thread;

      if ((pre_count / my_report_every) != (post_count / my_report_every))
      {
         report_to_non_thread_safe_progress(post_count);
      }
    }

    void multi_thread_progress_t::report_to_non_thread_safe_progress(size_t a_count)
    {
      std::lock_guard lock(my_mutex);
      my_non_thread_safe_progress->update_progress(a_count);
    }

Code pour chaque thread

La dernière partie du modèle est le code de l’interface originale pour chaque thread. Dans ce cas, nous voulons dériver de l’interface. Cela permettra de remplacer la version originale de l’interface qui ne supportait pas le multithreading ! Ce code par thread est destiné à être utilisé par un seul thread. La protection multithread se fait dans le protecteur multithread que nous avons montré auparavant.

Cette division du travail entre le protecteur et les parties par thread simplifie grandement le raisonnement sur le code et simplifie le code lui-même.

Voici le code de progress_t par thread de notre exemple :

    // Report the progress of work from one thread to a multi-thread progress.
    //
    // Create one instance in each thread. It caches the thread progress and
    // only report from time to time to the multi-thread progress to avoid
    // accessing the shared atomic variable too often.

    struct per_thread_progress_t : progress_t
    {
      // Create a per-thread progress that report to the given multi-thread progress.
      per_thread_progress_t() = default;
      per_thread_progress_t(multi_thread_progress_t& a_mt_progress)
         : progress_t(a_mt_progress.my_report_every / 10), my_mt_progress(&a_mt_progress) {}

      per_thread_progress_t(const per_thread_progress_t& an_other)
         : progress_t(an_other), my_mt_progress(an_other.my_mt_progress) { clear_progress(); }

      per_thread_progress_t& operator=(const per_thread_progress_t& an_other)
      {
         progress_t::operator=(an_other);
         // Avoid copying the per-thread progress accumulated.
         clear_progress();
         return *this;
      }

      // Report the final progress tally when destroyed.
      ~per_thread_progress_t();

    protected:
      // Propagate the progress to the multi-thread progress.
      void update_progress(size_t a_total_count_so_far) override
      {
         if (!my_mt_progress)
            return;

         my_mt_progress->update_progress_from_thread(a_total_count_so_far);
         clear_progress();
      }

    private:
      multi_thread_progress_t*   my_mt_progress = nullptr;
    };

Conclusion

J’ai utilisé ce design pour résoudre plusieurs fois des problèmes multithread. Cela m’a toujours été utile. N’hésitez pas à réutiliser ce design là où vous en avez besoin !

L’exemple particulier utilisé ici se trouve dans la bibliothèque “utility” du projet Tantrix Solver disponible sur GitHub.