Les événements envoyés par le serveur peuvent sembler être un simple mot à la mode, mais c'est une technologie qui révolutionne discrètement la communication en temps réel. Contrairement aux WebSockets, qui établissent une connexion bidirectionnelle, SSE crée un canal unidirectionnel du serveur vers le client. Cette simplicité est sa superpuissance.

Voici pourquoi SSE dans Quarkus mérite votre attention :

  • Léger et facile à mettre en œuvre
  • Fonctionne sur HTTP standard
  • Gestion automatique de la reconnexion
  • Compatible avec l'infrastructure web existante
  • Parfait pour les scénarios où vous n'avez pas besoin de communication bidirectionnelle

Implémentation de SSE dans Quarkus : Guide de démarrage rapide

Passons à la pratique avec un peu de code. Voici comment vous pouvez implémenter un point de terminaison SSE de base dans Quarkus :


@Path("/events")
public class SSEResource {

    @Inject
    @Channel("news-channel") 
    Emitter<String> emitter;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<String> stream() {
        return Multi.createFrom().emitter(emitter::send);
    }

    @POST
    @Path("/push")
    public void push(String news) {
        emitter.send(news);
    }
}

Ce simple exemple configure un point de terminaison SSE qui émet des mises à jour d'actualités. Les clients peuvent se connecter au point de terminaison /events pour recevoir des mises à jour, et vous pouvez envoyer de nouveaux événements via le point de terminaison /events/push.

Mise à l'échelle de SSE : Maîtriser la bête de la concurrence

Lors de l'implémentation de SSE dans des systèmes à grande échelle, le contrôle de la concurrence des clients devient crucial. Voici quelques stratégies pour maintenir votre système en bon état de fonctionnement :

1. Utiliser un pool de connexions

Implémentez un pool de connexions pour gérer les connexions SSE. Cela aide à prévenir l'épuisement des ressources lorsque vous traitez un grand nombre de clients simultanés.


@ApplicationScoped
public class SSEConnectionPool {
    private final ConcurrentHashMap<String, SseEventSink> connections = new ConcurrentHashMap<>();

    public void addConnection(String clientId, SseEventSink sink) {
        connections.put(clientId, sink);
    }

    public void removeConnection(String clientId) {
        connections.remove(clientId);
    }

    public void broadcast(String message) {
        connections.values().forEach(sink -> sink.send(sse.newEvent(message)));
    }
}

2. Implémenter la contre-pression

Utilisez les flux réactifs pour implémenter la contre-pression, empêchant les clients surchargés de causer des problèmes :


@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createFrom().emitter(emitter::send)
        .onOverflow().drop()
        .onItem().transform(item -> {
            // Traiter l'élément
            return item;
        });
}

3. Limitation côté client

Implémentez une limitation côté client pour contrôler le taux auquel les événements sont traités :


const eventSource = new EventSource('/events');
const queue = [];
let processing = false;

eventSource.onmessage = (event) => {
    queue.push(event.data);
    if (!processing) {
        processQueue();
    }
};

function processQueue() {
    if (queue.length === 0) {
        processing = false;
        return;
    }
    processing = true;
    const item = queue.shift();
    // Traiter l'élément
    setTimeout(processQueue, 100); // Limiter à 10 éléments par seconde
}

Stratégies de secours : Quand SSE ne suffit pas

Bien que SSE soit excellent, ce n'est pas toujours la solution parfaite. Voici quelques stratégies de secours :

1. Long Polling

Si SSE n'est pas pris en charge ou échoue, revenez au long polling :


function longPoll() {
    fetch('/events/poll')
        .then(response => response.json())
        .then(data => {
            // Traiter les données
            longPoll(); // Démarrer immédiatement la prochaine requête
        })
        .catch(error => {
            console.error('Erreur de long polling :', error);
            setTimeout(longPoll, 5000); // Réessayer après 5 secondes
        });
}

2. Secours WebSocket

Pour les scénarios nécessitant une communication bidirectionnelle, implémentez un secours WebSocket :


@ServerEndpoint("/websocket")
public class FallbackWebSocket {
    @OnOpen
    public void onOpen(Session session) {
        // Gérer une nouvelle connexion
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        // Gérer le message entrant
    }
}

Maintenir la connexion : Intervalles de battement de cœur

Pour maintenir les connexions SSE et détecter les déconnexions, implémentez des intervalles de battement de cœur :


@Scheduled(every="30s")
void sendHeartbeat() {
    emitter.send("heartbeat");
}

Côté client :


let lastHeartbeat = Date.now();

eventSource.onmessage = (event) => {
    if (event.data === 'heartbeat') {
        lastHeartbeat = Date.now();
        return;
    }
    // Traiter les événements réguliers
};

setInterval(() => {
    if (Date.now() - lastHeartbeat > 60000) {
        // Pas de battement de cœur pendant 60 secondes, reconnecter
        eventSource.close();
        connectSSE();
    }
}, 5000);

Débogage des problèmes de connexion à grande échelle

Lorsqu'on traite SSE à grande échelle, le débogage peut être difficile. Voici quelques conseils pour vous faciliter la vie :

1. Implémenter une journalisation détaillée

Utilisez les capacités de journalisation de Quarkus pour suivre les connexions et événements SSE :


@Inject
Logger logger;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream(@Context SecurityContext ctx) {
    String clientId = ctx.getUserPrincipal().getName();
    logger.infof("Connexion SSE établie pour le client : %s", clientId);
    return Multi.createFrom().emitter(emitter::send)
        .onTermination().invoke(() -> {
            logger.infof("Connexion SSE terminée pour le client : %s", clientId);
        });
}

2. Implémenter des métriques

Utilisez Micrometer dans Quarkus pour suivre les métriques importantes :


@Inject
MeterRegistry registry;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    registry.counter("sse.connections").increment();
    return Multi.createFrom().emitter(emitter::send)
        .onTermination().invoke(() -> {
            registry.counter("sse.disconnections").increment();
        });
}

3. Utiliser le traçage distribué

Implémentez le traçage distribué pour suivre les événements SSE dans votre système :


@Inject
Tracer tracer;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    Span span = tracer.buildSpan("sse-stream").start();
    return Multi.createFrom().emitter(emitter::send)
        .onItem().invoke(item -> {
            tracer.buildSpan("sse-event")
                .asChildOf(span)
                .start()
                .finish();
        })
        .onTermination().invoke(span::finish);
}

Conclusion : La puissance de SSE dans Quarkus

Les événements envoyés par le serveur dans Quarkus offrent une alternative puissante et légère pour la communication en temps réel dans les systèmes à grande échelle. En implémentant un contrôle de la concurrence approprié, des stratégies de secours, des mécanismes de battement de cœur et des pratiques de débogage robustes, vous pouvez exploiter tout le potentiel de SSE.

Rappelez-vous, bien que les WebSockets puissent sembler être le choix évident, SSE peut souvent fournir la simplicité et l'évolutivité dont vous avez besoin. Alors, la prochaine fois que vous concevez un système en temps réel, donnez à SSE une chance de briller. Votre futur vous (et votre équipe d'exploitation) vous en remerciera !

"La simplicité est la sophistication ultime." - Léonard de Vinci

Maintenant, allez de l'avant et construisez des systèmes en temps réel impressionnants et évolutifs avec SSE et Quarkus !