Les messages en rafale sont le cauchemar de nombreuses applications de streaming. Ils sont comme cet ami qui débarque à l'improviste avec 50 personnes pour dîner. Vous n'êtes pas préparé, vous êtes submergé, et vous ne passez certainement pas un bon moment.

Entrez Kafka Streams et Quarkus

Alors, pourquoi choisir Kafka Streams et Quarkus pour cette tâche herculéenne ? C'est comme demander pourquoi vous choisiriez une Ferrari pour une course. Kafka Streams est conçu pour le traitement d'événements à haut débit, tandis que Quarkus apporte ses pouvoirs Java supersoniques et subatomiques.

  • Kafka Streams : Distribué, évolutif et tolérant aux pannes. Parfait pour gérer des flux massifs de données.
  • Quarkus : Léger, démarrage rapide et faible empreinte mémoire. Idéal pour les environnements cloud-native.

Ensemble, ils sont le Batman et Robin du traitement des messages en rafale. Voyons comment nous pouvons exploiter leurs pouvoirs.

Concevoir pour la Rafale

Avant de plonger dans le code, comprenons comment Kafka Streams traite les données. Tout est question de topologie, mon ami !


StreamsBuilder builder = new StreamsBuilder();
KStream inputStream = builder.stream("input-topic");

KStream processedStream = inputStream
    .filter((key, value) -> value != null)
    .mapValues(value -> value.toUpperCase());

processedStream.to("output-topic");

Topology topology = builder.build();

Cette simple topologie lit à partir d'un sujet d'entrée, filtre les valeurs nulles, convertit les messages en majuscules et écrit dans un sujet de sortie. Mais comment la rendre résistante aux rafales ?

Univers Parallèle : Configurer la Concurrence

La clé pour gérer les messages en rafale est le parallélisme. Ajustons notre configuration Quarkus pour libérer toute la puissance de Kafka Streams :


# application.properties
kafka-streams.num.stream.threads=4
kafka-streams.max.poll.records=500
quarkus.kafka-streams.topics=input-topic,output-topic

Voici ce qui se passe :

  • num.stream.threads : Nous demandons à Kafka Streams d'utiliser 4 threads pour le traitement. Ajustez cela en fonction de vos cœurs CPU.
  • max.poll.records : Cela limite le nombre d'enregistrements traités en un seul cycle de sondage, empêchant notre application de mordre plus qu'elle ne peut mâcher.

Débordement de Buffer : Gérer le Flux de Données

Lorsqu'il s'agit de messages en rafale, le buffering est votre meilleur ami. C'est comme avoir une salle d'attente pour vos messages. Configurons quelques propriétés liées au buffer :


kafka-streams.buffer.memory=67108864
kafka-streams.batch.size=16384
kafka-streams.linger.ms=100

Ces paramètres aident à gérer le flux de données :

  • buffer.memory : Nombre total d'octets de mémoire que le producteur peut utiliser pour mettre en tampon les enregistrements.
  • batch.size : Taille maximale d'une requête en octets.
  • linger.ms : Combien de temps attendre avant d'envoyer un lot s'il n'est pas plein.

Contre-pression : L'Art de Dire "Ralentis"

La contre-pression est cruciale lorsqu'il s'agit de messages en rafale. C'est comme dire à votre ami bavard, "Attends, j'ai besoin d'une minute pour traiter ce que tu viens de dire." Dans Kafka Streams, nous pouvons implémenter la contre-pression en utilisant la classe Produced :


processedStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String())
    .withStreamPartitioner((topic, key, value, numPartitions) -> {
        // Logique de partitionnement personnalisé pour distribuer la charge
        return Math.abs(key.hashCode()) % numPartitions;
    }));

Ce partitionneur personnalisé aide à distribuer la charge entre les partitions, empêchant une seule partition de devenir un goulot d'étranglement.

État d'Esprit : Optimiser les Magasins d'État

Les magasins d'état dans Kafka Streams peuvent être un goulot d'étranglement de performance lors du traitement des rafales. Optimisons-les :


kafka-streams.state.dir=/path/to/state/dir
kafka-streams.commit.interval.ms=100
kafka-streams.cache.max.bytes.buffering=10485760

Ces paramètres aident à gérer l'état plus efficacement :

  • state.dir : Où stocker l'état. Utilisez un SSD rapide pour de meilleures performances.
  • commit.interval.ms : Fréquence de sauvegarde de la progression du traitement.
  • cache.max.bytes.buffering : Mémoire maximale pour mettre en tampon les enregistrements avant de les valider.

Compresser pour Impressionner : Compression des Messages

Lorsqu'il s'agit de messages en rafale, chaque octet compte. Activons la compression :


kafka-streams.compression.type=lz4

LZ4 offre un bon équilibre entre le taux de compression et la vitesse, parfait pour gérer les rafales.

Faites Confiance, mais Vérifiez : Tests et Surveillance

Maintenant que nous avons optimisé notre application, comment savoir si elle peut gérer la rafale ? Entrez les tests de résistance et la surveillance.

Tests de Résistance avec JMeter

Créez un plan de test JMeter pour simuler une rafale de 50 000 messages :


<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="5.0" jmeter="5.4.1">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Kafka Burst Test" enabled="true">
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Kafka Producers" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <stringProp name="LoopController.loops">50000</stringProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">10</stringProp>
        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
        <boolProp name="ThreadGroup.scheduler">false</boolProp>
        <stringProp name="ThreadGroup.duration"></stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
        <boolProp name="ThreadGroup.same_user_on_next_iteration">true</boolProp>
      </ThreadGroup>
      <hashTree>
        <JavaSampler guiclass="JavaTestSamplerGui" testclass="JavaSampler" testname="Java Request" enabled="true">
          <elementProp name="arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" enabled="true">
            <collectionProp name="Arguments.arguments">
              <elementProp name="kafka.topic" elementType="Argument">
                <stringProp name="Argument.name">kafka.topic</stringProp>
                <stringProp name="Argument.value">input-topic</stringProp>
                <stringProp name="Argument.metadata">=</stringProp>
              </elementProp>
              <elementProp name="kafka.key" elementType="Argument">
                <stringProp name="Argument.name">kafka.key</stringProp>
                <stringProp name="Argument.value">${__UUID()}</stringProp>
                <stringProp name="Argument.metadata">=</stringProp>
              </elementProp>
              <elementProp name="kafka.message" elementType="Argument">
                <stringProp name="Argument.name">kafka.message</stringProp>
                <stringProp name="Argument.value">Test message ${__threadNum}</stringProp>
                <stringProp name="Argument.metadata">=</stringProp>
              </elementProp>
            </collectionProp>
          </elementProp>
          <stringProp name="classname">com.example.KafkaProducerSampler</stringProp>
        </JavaSampler>
        <hashTree/>
      </hashTree>
    </hashTree>
  </hashTree>
</jmeterTestPlan>

Ce plan de test simule 10 threads envoyant chacun 5 000 messages, totalisant 50 000 messages en rafale.

Surveillance avec Prometheus et Grafana

Configurez Prometheus et Grafana pour surveiller votre application Quarkus. Ajoutez ce qui suit à votre application.properties :


quarkus.micrometer.export.prometheus.enabled=true
quarkus.micrometer.binder.kafka.enabled=true

Créez un tableau de bord Grafana pour visualiser des métriques comme le débit des messages, le temps de traitement et l'utilisation des ressources.

Le Grand Final : Tout Mettre Ensemble

Maintenant que nous avons optimisé, configuré et testé notre application Kafka Streams sur Quarkus, voyons-la en action :


@ApplicationScoped
public class BurstMessageProcessor {

    @Inject
    StreamsBuilder streamsBuilder;

    @Produces
    @ApplicationScoped
    public Topology buildTopology() {
        KStream inputStream = streamsBuilder.stream("input-topic");

        KStream processedStream = inputStream
            .filter((key, value) -> value != null)
            .mapValues(value -> value.toUpperCase())
            .peek((key, value) -> System.out.println("Processing: " + value));

        processedStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String())
            .withStreamPartitioner((topic, key, value, numPartitions) -> {
                return Math.abs(key.hashCode()) % numPartitions;
            }));

        return streamsBuilder.build();
    }
}

Cette application Kafka Streams propulsée par Quarkus est maintenant prête à gérer ces 50 000 messages en rafale comme un champion !

Conclusion : Leçons Apprises

Gérer les messages en rafale dans Kafka Streams sur Quarkus n'est pas une mince affaire, mais avec les bonnes techniques, c'est tout à fait gérable. Voici ce que nous avons appris :

  • Le parallélisme est essentiel : Utilisez plusieurs threads et partitions pour répartir la charge.
  • Buffer intelligemment : Configurez vos buffers pour lisser la rafale.
  • Implémentez la contre-pression : Ne laissez pas votre application mordre plus qu'elle ne peut mâcher.
  • Optimisez les magasins d'état : Une gestion d'état rapide et efficace est cruciale pour un traitement à haut débit.
  • Compressez les messages : Économisez de la bande passante et de la puissance de traitement avec une compression intelligente.
  • Testez et surveillez : Vérifiez toujours vos optimisations et gardez un œil sur les performances.

Rappelez-vous, gérer les messages en rafale est autant un art qu'une science. Continuez à expérimenter, tester et optimiser. Votre application Kafka Streams vous en remerciera, et vos utilisateurs aussi lorsqu'ils feront l'expérience d'un traitement ultra-rapide même pendant les périodes les plus chargées.

Maintenant, allez de l'avant et domptez ces rafales de messages comme le super-héros du streaming que vous êtes !

"Dans le monde du traitement de flux, il ne s'agit pas de savoir à quel point vous pouvez frapper. Il s'agit de savoir à quel point vous pouvez être frappé et continuer à avancer." - Rocky Balboa (s'il était ingénieur en données)

Bon streaming !