Contribution Libre Elixir Développement 2019 | 05 | 15

Elixir amqp 1.2 et confirm handler

RabbitMq permet de recevoir des confirmations de ventilation des messages par l’exchange. Le client Elixir, amqp (> 1.2) permet d’attendre de manière bloquante ces confirmations, mais ne propose pas d’API pour enregistrer un handler. Ce qui m’a donné l’opportunité de faire cette contribution, qui fait maintenant partie de la version 1.2.

Cet article résume l’histoire de cette contribution à des fins didactiques.

Le contexte

La fonctionnalité de publisher confirm proposée par RabbitMq, permet en sacrifiant un peu de performances, d’obtenir une confirmation que le message publié à bien été accepté par les queues destinataires.

Imagine que tu publie un message dans le bus AMQP, ce que tu obtiens de manière synchrone lors d’un publish, c’est une confirmation que l’exchange à bien reçu le message. Cela ne garanti pas que le message est bien ventilé vers les queues enregistrées comme destinataires.

Une option serait d’utiliser les transactions du protocol AMQP 0-9-1, mais elle ont un énorme impacte sur le débit de messages.

L’autre option est d’utiliser une extension du protocole proposée par RabbitMq: publisher confirm. Cette extension te permet de d’activer le mode confirmation pour un channel.

Ce qui a pour effet de reproduire le sémantique de l’acquittement de message, au niveau de l’exchange. Quand celui-ci parvient a router un message, il retourne un message d’acquittement.

Le paquet Hex amqp < 1.2 expose une fonction AMQP.Confirm.select/1 qui permet de d’activer les confirmations sur le channel. Et une suite de fonctions wait_for_confirms*(/1|/2) qui permettent d’attendre de manière synchrone la confirmation des derniers messages envoyés.

Dans mon système je conserve un état des messages à envoyer, et des messages envoyés avec succès. Mais je ne souhaite pas bloquer l’acteur en charge de publier les messages pour valider la confirmation de l’envoi de ceux-ci. J’aimerais plutôt exploiter au mieux l’outillage en déléguant la confirmation de l’envoi à un autre acteur.

En me documentant sur le fonctionnement du publisher confirm et des recommandations pour RabbitMq, et en lisant le code source d’autres clients, je me suis rendu compte que plusieurs clients implémentaient un fonction register_confirm_handler et next_publish_seqno.

Ces fonctions permettent respectivement de déclarer un handler pour traiter les confirmations et d’obtenir l’identifiant du message publié pour un channel.

C’est justement ce dont j’avais besoin.

Le client Elixir s’appuyant sur le client Erlang, qui lui implémement ces fonctions, il n’y avait plus qu’à se pencher pour récolter, permettant de fermer quelque issues au passage.

Le développement

Je vais ré-exposer les fonctions proposée par Erlang, dans la continuité de la codebase actuelle. Habituellement je consulte donc avec la plus grande précaution les consignes de contribution. Mais dans le cas présent elles brillaient par leur absence.

Dans ce cas, je m’imprègne des pratiques des mainteneurs précédents en explorant le code source.

Je trouve que c’est une étape indispensable, open source, ou pas. Cela contribue à rendre le code homogène et compréhensible.

Ensuite j’écris un test simple. Je n’argumente plus sur la nécéssité d’écrire les tests en premier.

Dans un premier temps le setup.

Il me faut :

Puisque toute les opérations se font au niveau du channel, je l’ajoute au contexte qui sera passé en argument du test.

// test/confirm.exs

defmodule ConfirmTest do
  use ExUnit.Case

  alias AMQP.Connection
  alias AMQP.Channel
  alias AMQP.Confirm

  setup do
    {:ok, conn} = Connection.open
    {:ok, chan} = Channel.open(conn)
    :ok = Confirm.select(chan)
    on_exit fn ->
      :ok = Connection.close(conn)
    end
    {:ok, chan: chan}
  end
  
  // ...

Ensuite le premier test, simple, récupérer le numéro de séquence du message.

En consultant la documentation de RabbitMq et du client Erlang, je comprend que le premier numéro de séquence d’un channel vide est 1 et que la fonction Erlang attend l’identifiant du process channel en argument.

Ensuite je décide du nom et module de la fonction, en m’inspirant des pratiques du code source actuel, qui est de reprendre celui de la fonction Erlang.

Voci le test:

  describe "next_publish_seqno" do
    test "returns 1 whe no messages where sent", ctx do
      assert 1 == Confirm.next_publish_seqno(ctx[:chan])
    end
  end

Il ne reste plus qu’a le lancer pour le regarder échouer, puis écrire le code pour le faire passer.

Dans le module AMQP.Confirm, j’ajoute une fonction qui:

# lib/amqp/confirm.ex

  @doc """
  On channel with confirm activated, return the next message sequence number.
  To use in combination with `register_handler/2`
  """
  @spec next_publish_seqno(Channel.t) :: non_neg_integer
  def next_publish_seqno(%Channel{pid: pid}) do
    :amqp_channel.next_publish_seqno(pid**
  end

Ensuite je regarde le test passer, easy :) Deux points très importants ici:

L’étape suivante c’est d’ajouter la fonction qui permet d’enregistrer un handler. Tous ce que qui a été dit avant reste vrai (tests, documentation, conventions …).

La fonction Erlang register_confirm_handler attend en argument les identifiants du channel et du handler et retourne un ok. La documentation précise également que le handler recevra une structure en guise de message #basic.ack{} ou #basic.nack{}.

Ce ne sont pas des structures valides pour Elixir, ce sont des Erlang Records. Comme la sémantique de confirmation est la même que celle de l’acquittement de messages par le consommateur, la transposition des Records Erlang vers Elixir existe déjà.

En explorant le fichier core.ex de la bibliothèque j’apprends que le process recevra un message :basic_ack ou :basic_nack. Il sera accompagné d’un ou plusieurs numéros de séquence. Si il y en a plusieurs, le troisième argument du message vaudra true.

Cette fois-ci le test consiste à:

  describe "register_handler" do
    test "handler receive confirm with message seqno", ctx do
      seq_no = Confirm.next_publish_seqno(ctx[:chan])
      :ok = AMQP.Basic.publish(ctx[:chan], "", "", "foo")
      :ok = Confirm.register_handler(ctx[:chan], self())
      assert_receive {:basic_ack, seq_no, false}
      :ok = Confirm.unregister_handler(ctx[:chan])
    end
  end

Faire échouer les tests et passer à l’implémentation.

La fonction register_handler prend en argument le channel et l’identifiant du handler.

Elle spawn un process qui va servir d’adapter entre les messages de Erlang vers Elixir. Ce process est supervisé ainsi que celui du handler pour réagir à la mort des process enfants avec un message {:DOWN}.

Puis appelle la fonction privée handle_confirm qui attend la réception d’un message Erlang et se charge de le transposer et de l’envoyer au handler. Pour finir elle utilise la récursion pour rester à l’écoute.

  @doc """
  Register a handler for confirms on channel.
  The handler will receive either:
  * `{:basic_ack, seqno, multiple}`
  * `{:basic_nack, seqno, multiple}`
  The `seqno` (delivery_tag) is an integer, the sequence number of the message.
  `multiple` is a boolean, when `true` means multiple messages confirm, upto `seqno`.
  see https://www.rabbitmq.com/confirms.html
  """
  @spec register_handler(Channel.t, pid) :: :ok
  def register_handler(%Channel{pid: pid}, handler_pid) do
    adapter_pid = spawn fn ->
      Process.flag(:trap_exit, true)
      Process.monitor(handler_pid)
      Process.monitor(pid)
      handle_confirm(handler_pid)
    end
    :amqp_channel.register_confirm_handler(pid, adapter_pid)
  end
  
   defp handle_confirm(handler_pid) do
    receive do
      basic_ack(delivery_tag: delivery_tag, multiple: multiple) ->
        send(handler_pid, {:basic_ack, delivery_tag, multiple})
        handle_confirm(handler_pid)

      basic_nack(delivery_tag: delivery_tag, multiple: multiple) ->
        send(handler_pid, {:basic_nack, delivery_tag, multiple})
        handle_confirm(handler_pid)

      {:DOWN, _ref, :process, _pid, reason} ->
          exit(reason)
    end
  end

Les tests passent, il ne reste qu’à ajouter la fonction unregister_hander/1. Je ne la commenterais pas, car tu n’y apprendra rien de plus ce qui a été dit plus haut.

Tu trouvera la commit dans son intégralité sur github.

Pour finir

La PR a été intégrée et fait partie des features de la version 1.2.

Le fait d’avoir adopté les conventions des mainteneurs du projet leur ont permis d’enchaîner un refactoring sans en souffrir.

Et finalement je peux traiter les confirmations de manière asynchrone sans avoir à utiliser l’API du client Erlang dans mon application.