Java Full Stack Developer
Wassim Lagnaoui

Messaging with Kafka/RabbitMQ

Master asynchronous messaging patterns with Kafka and RabbitMQ for scalable, decoupled microservices communication.

Introduction

Overview of messaging in microservices

Messaging is a communication pattern where services send messages through intermediary message brokers instead of making direct API calls, enabling asynchronous, decoupled communication between distributed components. In microservices architectures, messaging systems act as the nervous system that connects various services, allowing them to communicate without needing to know each other's exact locations, availability status, or processing capabilities. Messages can carry commands that instruct services to perform actions, events that notify about things that have happened, or data that needs to be shared across service boundaries. This approach creates a more resilient and flexible system architecture compared to synchronous service-to-service communication.

Why async messaging is important for decoupling and scalability

Asynchronous messaging eliminates tight coupling between services by allowing producers to send messages without waiting for consumers to process them, meaning services can operate independently and at their own pace without blocking each other. This decoupling enables better scalability because slow or temporarily unavailable consumers don't affect the performance of message producers, and you can scale producer and consumer services independently based on their specific load patterns. Messaging systems provide built-in buffering capabilities that handle traffic spikes gracefully, allowing producers to continue sending messages even when consumers are overwhelmed, and consumers can process messages when resources become available. Additionally, messaging enables fault tolerance through features like message persistence, retries, and dead letter queues that ensure important messages aren't lost even when individual services fail.


Kafka Basics

Topics: what they are and why they matter

Kafka topics are named channels or categories where messages are published and consumed, serving as the primary organizational unit for message streams in Kafka clusters. Each topic represents a particular type of data or event stream, such as "user-registrations," "order-events," or "payment-notifications," allowing you to logically separate different kinds of messages while maintaining high throughput and parallel processing capabilities. Topics are partitioned across multiple Kafka brokers for scalability and fault tolerance, with each partition maintaining an ordered sequence of messages that can be consumed independently by different consumer instances. The topic-based organization enables multiple producers to write to the same topic simultaneously and multiple consumer groups to read from the same topic with different processing logic, creating flexible pub-sub messaging patterns.
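
Topics can be created by administrators, but applications can also declare them at startup. The sketch below uses Spring Kafka's TopicBuilder to declare the "order-events" topic used throughout this lesson; the partition and replica counts are illustrative assumptions, not production values.

Topic Declaration

@Configuration
public class KafkaTopicConfig {

    // Declares the "order-events" topic on startup if it doesn't already exist
    @Bean
    public NewTopic orderEventsTopic() {
        return TopicBuilder.name("order-events")
            .partitions(3)  // up to 3 consumers in one group can read in parallel
            .replicas(1)    // use 3 or more in production for fault tolerance
            .build();
    }
}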

Producer implementation example

Kafka producers are responsible for publishing messages to specific topics, and Spring Boot provides excellent integration through the KafkaTemplate class that simplifies message production with features like automatic serialization, error handling, and delivery confirmations. The producer configuration includes settings for Kafka broker connections, serialization formats for keys and values, and performance tuning parameters like batch size and compression. Producers can send messages synchronously for guaranteed delivery confirmation or asynchronously for higher throughput, and they automatically handle partitioning, retries, and connection management to ensure reliable message delivery to Kafka brokers.

Maven Dependencies

<!-- pom.xml dependencies -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Producer Implementation

@Service
@Slf4j
public class OrderEventProducer {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public OrderEventProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderCreated(OrderEvent orderEvent) {
        try {
            kafkaTemplate.send("order-events", orderEvent.getOrderId(), orderEvent)
                .addCallback(
                    result -> log.info("Order event sent successfully: {}", orderEvent.getOrderId()),
                    failure -> log.error("Failed to send order event: {}", orderEvent.getOrderId(), failure)
                );
        } catch (Exception e) {
            log.error("Error publishing order event: {}", orderEvent.getOrderId(), e);
        }
    }

    public void publishOrderStatusChanged(String orderId, OrderStatus newStatus) {
        OrderEvent event = OrderEvent.builder()
            .orderId(orderId)
            .eventType("ORDER_STATUS_CHANGED")
            .status(newStatus)
            .timestamp(Instant.now())
            .build();

        kafkaTemplate.send("order-events", orderId, event);
    }
}

Consumer implementation example

Kafka consumers subscribe to topics and process messages in real time, with Spring Boot's @KafkaListener annotation providing a declarative way to create message consumers that automatically handle deserialization, offset management, and error handling. Consumer groups allow multiple consumer instances to share the workload of processing messages from a topic, with Kafka automatically distributing partitions among available consumers for parallel processing and load balancing. Consumers can be configured for different processing guarantees, from at-least-once delivery with automatic offset commits to stronger guarantees that combine manual offset management with idempotent or transactional processing, depending on your application's consistency requirements.

Consumer Implementation

@Component
@Slf4j
public class OrderEventConsumer {

    private final InventoryService inventoryService;
    private final NotificationService notificationService;

    public OrderEventConsumer(InventoryService inventoryService,
                             NotificationService notificationService) {
        this.inventoryService = inventoryService;
        this.notificationService = notificationService;
    }

    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderEvent(OrderEvent orderEvent,
                                @Header("kafka_receivedMessageKey") String key,
                                @Header("kafka_receivedPartition") int partition,
                                @Header("kafka_offset") long offset) {

        log.info("Received order event: {} from partition: {} offset: {}",
                orderEvent.getOrderId(), partition, offset);

        try {
            switch (orderEvent.getEventType()) {
                case "ORDER_CREATED":
                    inventoryService.reserveItems(orderEvent.getItems());
                    log.info("Inventory reserved for order: {}", orderEvent.getOrderId());
                    break;
                case "ORDER_CANCELLED":
                    inventoryService.releaseItems(orderEvent.getItems());
                    log.info("Inventory released for order: {}", orderEvent.getOrderId());
                    break;
                default:
                    log.warn("Unknown event type: {}", orderEvent.getEventType());
            }
        } catch (Exception e) {
            log.error("Error processing order event: {}", orderEvent.getOrderId(), e);
            throw e; // Will trigger retry mechanism
        }
    }

    @KafkaListener(topics = "order-events", groupId = "notification-service")
    public void handleOrderNotification(OrderEvent orderEvent) {
        try {
            if ("ORDER_CREATED".equals(orderEvent.getEventType())) {
                notificationService.sendOrderConfirmation(orderEvent);
            }
        } catch (Exception e) {
            log.error("Failed to send notification for order: {}", orderEvent.getOrderId(), e);
        }
    }
}

Handling retries and Dead Letter Queues (DLQs)

Kafka retry mechanisms handle transient failures by automatically reprocessing failed messages a configured number of times with exponential backoff delays, helping overcome temporary issues like network glitches or service unavailability without losing messages. When messages exceed the maximum retry attempts, they can be sent to Dead Letter Topics (DLT) for manual investigation and reprocessing, preventing poison messages from blocking the processing of subsequent valid messages. Spring Kafka provides comprehensive retry configuration including custom retry policies, backoff strategies, and DLT routing that can be tailored to different types of failures and business requirements. Proper DLT handling includes alerting mechanisms, message inspection tools, and reprocessing workflows that allow operations teams to investigate failures and recover from exceptional scenarios.

Kafka Configuration

# application.yml - Kafka Configuration
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        spring.json.type.mapping: "orderEvent:com.example.events.OrderEvent"
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.events"
        spring.json.type.mapping: "orderEvent:com.example.events.OrderEvent"
      enable-auto-commit: false
      auto-offset-reset: earliest

Retry and Error Handling Configuration

@Configuration
public class KafkaRetryConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
            DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());
        factory.setCommonErrorHandler(errorHandler);
        factory.setConcurrency(3);
        // Commit the offset after each record; the listeners here don't acknowledge manually
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);

        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        // Retry 3 times with exponential backoff
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
        backOff.setInitialInterval(1000L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(10000L);

        // After retries are exhausted, the recoverer publishes the record to "<topic>.DLT"
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);

        // Don't retry for certain exceptions
        errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);

        return errorHandler;
    }

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");

        return new DefaultKafkaConsumerFactory<>(configProps);
    }
}

RabbitMQ Basics

Queues: purpose and usage

RabbitMQ queues are message storage containers that hold messages until they can be consumed by applications, providing reliable message delivery through features like persistence, acknowledgments, and various routing mechanisms. Unlike Kafka topics, RabbitMQ queues are designed for point-to-point messaging where each message is typically consumed by only one consumer, making them ideal for work distribution, task queues, and request-response patterns. Queues can be configured as durable (survive broker restarts), exclusive (used by only one connection), or auto-delete (removed when no longer used), and they support various message routing patterns through exchanges that determine how messages reach specific queues. RabbitMQ's queue-based architecture excels at traditional messaging scenarios where you need guaranteed delivery, complex routing logic, and fine-grained control over message processing workflows.
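
To make those queue properties concrete, the following sketch declares one queue of each kind with Spring AMQP's QueueBuilder; the queue names are illustrative assumptions.

Queue Type Examples

@Configuration
public class QueueTypesConfig {

    @Bean
    public Queue durableQueue() {
        // Survives broker restarts; persistent messages are kept across restarts too
        return QueueBuilder.durable("orders.work.queue").build();
    }

    @Bean
    public Queue exclusiveQueue() {
        // Usable only by the declaring connection; deleted when that connection closes
        return QueueBuilder.nonDurable("session.replies.queue").exclusive().build();
    }

    @Bean
    public Queue autoDeleteQueue() {
        // Removed automatically once the last consumer unsubscribes
        return QueueBuilder.nonDurable("temp.updates.queue").autoDelete().build();
    }
}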

Producer example

RabbitMQ producers send messages to exchanges, which then route messages to appropriate queues based on routing keys and binding configurations, providing flexible message distribution patterns. Spring Boot's RabbitTemplate simplifies message production with automatic serialization, connection management, and integration with Spring's transaction management for reliable message publishing. Producers can use different exchange types (direct, topic, fanout, headers) to implement various messaging patterns, from simple point-to-point communication to complex pub-sub scenarios with sophisticated routing rules.

Maven Dependencies

<!-- pom.xml dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Producer Implementation

@Service
@Slf4j
public class EmailNotificationProducer {

    private final RabbitTemplate rabbitTemplate;

    public EmailNotificationProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendWelcomeEmail(String userId, String email) {
        EmailNotification notification = EmailNotification.builder()
            .userId(userId)
            .email(email)
            .templateType("WELCOME")
            .priority("HIGH")
            .timestamp(Instant.now())
            .build();

        try {
            rabbitTemplate.convertAndSend(
                "notification.exchange",
                "email.welcome",
                notification
            );
            log.info("Welcome email notification sent for user: {}", userId);
        } catch (Exception e) {
            log.error("Failed to send welcome email notification: {}", userId, e);
        }
    }

    public void sendOrderConfirmation(String orderId, String customerEmail) {
        EmailNotification notification = EmailNotification.builder()
            .orderId(orderId)
            .email(customerEmail)
            .templateType("ORDER_CONFIRMATION")
            .priority("MEDIUM")
            .timestamp(Instant.now())
            .build();

        rabbitTemplate.convertAndSend(
            "notification.exchange",
            "email.order",
            notification
        );
    }

    public void sendBulkPromotionalEmail(List<String> emailList, String campaignId) {
        BulkEmailNotification bulkNotification = BulkEmailNotification.builder()
            .emails(emailList)
            .campaignId(campaignId)
            .templateType("PROMOTIONAL")
            .priority("LOW")
            .build();

        rabbitTemplate.convertAndSend(
            "notification.exchange",
            "email.bulk",
            bulkNotification
        );
    }
}

Consumer example

RabbitMQ consumers use the @RabbitListener annotation to declaratively process messages from specific queues, with Spring Boot handling message deserialization, acknowledgment, and error scenarios automatically. Consumers can be configured for different acknowledgment modes, from automatic acknowledgment for fire-and-forget scenarios to manual acknowledgment for guaranteed processing, ensuring messages aren't lost when processing fails. Multiple consumers can listen to the same queue for load balancing, or different consumers can listen to different queues bound to the same exchange for implementing complex workflow patterns.

Consumer Implementation

@Component
@Slf4j
public class EmailNotificationConsumer {

    private final EmailService emailService;
    private final NotificationRepository notificationRepository;

    public EmailNotificationConsumer(EmailService emailService,
                                   NotificationRepository notificationRepository) {
        this.emailService = emailService;
        this.notificationRepository = notificationRepository;
    }

    @RabbitListener(queues = "email.welcome.queue")
    public void processWelcomeEmail(EmailNotification notification) {
        try {
            log.info("Processing welcome email for user: {}", notification.getUserId());

            emailService.sendWelcomeEmail(
                notification.getEmail(),
                notification.getUserId()
            );

            // Save notification record
            notificationRepository.save(NotificationRecord.builder()
                .userId(notification.getUserId())
                .type("WELCOME_EMAIL")
                .status("SENT")
                .sentAt(Instant.now())
                .build());

            log.info("Welcome email sent successfully to: {}", notification.getEmail());

        } catch (Exception e) {
            log.error("Failed to process welcome email: {}", notification.getUserId(), e);
            throw new AmqpRejectAndRequeueException("Email processing failed", e);
        }
    }

    @RabbitListener(queues = "email.order.queue")
    public void processOrderConfirmation(EmailNotification notification) {
        try {
            emailService.sendOrderConfirmation(
                notification.getEmail(),
                notification.getOrderId()
            );
            log.info("Order confirmation sent for order: {}", notification.getOrderId());
        } catch (Exception e) {
            log.error("Failed to send order confirmation: {}", notification.getOrderId(), e);
            throw new AmqpRejectAndRequeueException("Order confirmation failed", e);
        }
    }

    @RabbitListener(queues = "email.bulk.queue", concurrency = "3-5")
    public void processBulkEmails(BulkEmailNotification bulkNotification) {
        try {
            for (String email : bulkNotification.getEmails()) {
                emailService.sendPromotionalEmail(email, bulkNotification.getCampaignId());
                Thread.sleep(100); // Rate limiting for bulk emails
            }
            log.info("Bulk emails processed for campaign: {}", bulkNotification.getCampaignId());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error("Bulk email processing interrupted: {}", bulkNotification.getCampaignId(), e);
        } catch (Exception e) {
            log.error("Failed to process bulk emails: {}", bulkNotification.getCampaignId(), e);
        }
    }
}
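
The listeners above rely on automatic acknowledgment. For guaranteed processing, a listener can instead acknowledge manually through the underlying channel. A minimal sketch follows; the queue name is hypothetical, and it assumes a listener container configured with AcknowledgeMode.MANUAL.

Manual Acknowledgment Example

@Component
@Slf4j
public class ManualAckConsumer {

    // Requires a listener container factory configured with AcknowledgeMode.MANUAL
    @RabbitListener(queues = "email.manual.queue")
    public void process(EmailNotification notification,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        try {
            // ... process the message ...
            channel.basicAck(deliveryTag, false);   // acknowledge this single message
        } catch (Exception e) {
            log.error("Processing failed, rejecting without requeue", e);
            channel.basicNack(deliveryTag, false, false); // routed to the DLQ via the DLX
        }
    }
}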

Handling retries and DLQs

RabbitMQ provides sophisticated retry and dead letter queue mechanisms through message TTL (time-to-live), exchange configurations, and Spring Boot's retry templates that can handle various failure scenarios with different retry strategies. When messages fail processing, they can be automatically retried with exponential backoff delays, and after exceeding retry limits, they're routed to dead letter exchanges and queues for manual investigation and reprocessing. Spring AMQP integrates retry logic with RabbitMQ's native features like message republishing, TTL-based delays, and custom error handling strategies that ensure robust message processing even in the face of systematic failures. Dead letter queues serve as safety nets that preserve failed messages with full context information, enabling operations teams to analyze failure patterns and implement fixes without losing critical business data.

RabbitMQ Configuration

# application.yml - RabbitMQ Configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000ms
          max-attempts: 3
          multiplier: 2
        default-requeue-rejected: false
        acknowledge-mode: auto
    template:
      retry:
        enabled: true
        initial-interval: 1000ms
        max-attempts: 3

Queue and Exchange Configuration

@Configuration
public class RabbitMQConfig {

    @Bean
    public TopicExchange notificationExchange() {
        return new TopicExchange("notification.exchange", true, false);
    }

    @Bean
    public Queue emailWelcomeQueue() {
        return QueueBuilder.durable("email.welcome.queue")
            .withArgument("x-dead-letter-exchange", "notification.dlx")
            .withArgument("x-dead-letter-routing-key", "email.welcome.failed")
            .withArgument("x-message-ttl", 60000) // 1 minute TTL
            .build();
    }

    @Bean
    public Queue emailOrderQueue() {
        return QueueBuilder.durable("email.order.queue")
            .withArgument("x-dead-letter-exchange", "notification.dlx")
            .withArgument("x-dead-letter-routing-key", "email.order.failed")
            .build();
    }

    @Bean
    public Queue emailBulkQueue() {
        return QueueBuilder.durable("email.bulk.queue")
            .withArgument("x-dead-letter-exchange", "notification.dlx")
            .withArgument("x-dead-letter-routing-key", "email.bulk.failed")
            .build();
    }

    @Bean
    public Queue emailWelcomeDLQ() {
        return QueueBuilder.durable("email.welcome.dlq").build();
    }

    @Bean
    public Queue emailOrderDLQ() {
        return QueueBuilder.durable("email.order.dlq").build();
    }

    @Bean
    public Queue emailBulkDLQ() {
        return QueueBuilder.durable("email.bulk.dlq").build();
    }

    @Bean
    public TopicExchange deadLetterExchange() {
        return new TopicExchange("notification.dlx", true, false);
    }

    // Bindings for main queues
    @Bean
    public Binding emailWelcomeBinding() {
        return BindingBuilder
            .bind(emailWelcomeQueue())
            .to(notificationExchange())
            .with("email.welcome");
    }

    @Bean
    public Binding emailOrderBinding() {
        return BindingBuilder
            .bind(emailOrderQueue())
            .to(notificationExchange())
            .with("email.order");
    }

    @Bean
    public Binding emailBulkBinding() {
        return BindingBuilder
            .bind(emailBulkQueue())
            .to(notificationExchange())
            .with("email.bulk");
    }

    // Dead letter queue bindings
    @Bean
    public Binding emailWelcomeDLQBinding() {
        return BindingBuilder
            .bind(emailWelcomeDLQ())
            .to(deadLetterExchange())
            .with("email.welcome.failed");
    }

    @Bean
    public Binding emailOrderDLQBinding() {
        return BindingBuilder
            .bind(emailOrderDLQ())
            .to(deadLetterExchange())
            .with("email.order.failed");
    }

    @Bean
    public Binding emailBulkDLQBinding() {
        return BindingBuilder
            .bind(emailBulkDLQ())
            .to(deadLetterExchange())
            .with("email.bulk.failed");
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDefaultRequeueRejected(false);
        factory.setErrorHandler(new ConditionalRejectingErrorHandler());
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}

Patterns & Best Practices

Pub/Sub pattern and when to use it

The Publish/Subscribe pattern enables one-to-many communication where publishers send messages to topics or exchanges without knowing which subscribers will receive them, creating loose coupling and enabling dynamic system composition. This pattern is ideal for broadcasting events like user registrations, order completions, or system notifications where multiple services need to react differently to the same event without the publisher needing to know about all consumers. Pub/Sub excels in scenarios where you want to add new functionality by simply subscribing to existing events, such as adding analytics, audit logging, or notification services without modifying existing business logic. The pattern also supports fan-out scenarios where a single event triggers multiple workflows, like order processing that simultaneously updates inventory, sends confirmations, and logs analytics data.
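
In RabbitMQ terms, the purest form of pub/sub is a fanout exchange, which copies every message to all bound queues regardless of routing key. A minimal sketch, with illustrative exchange and queue names:

Fanout Exchange Example

@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange userEventsExchange() {
        return new FanoutExchange("user.events.fanout", true, false);
    }

    @Bean
    public Queue analyticsQueue() {
        return QueueBuilder.durable("user.events.analytics").build();
    }

    @Bean
    public Queue auditQueue() {
        return QueueBuilder.durable("user.events.audit").build();
    }

    // Every message published to the exchange reaches both queues
    @Bean
    public Binding analyticsBinding() {
        return BindingBuilder.bind(analyticsQueue()).to(userEventsExchange());
    }

    @Bean
    public Binding auditBinding() {
        return BindingBuilder.bind(auditQueue()).to(userEventsExchange());
    }
}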

Event-driven microservices patterns

Event-driven microservices communicate primarily through domain events that represent meaningful business occurrences, creating architectures where services react to changes rather than being directly invoked by other services. This approach promotes temporal decoupling where services don't need to be available simultaneously, spatial decoupling where services don't need to know each other's locations, and logical decoupling where services react to business events rather than implementation details. Common patterns include event sourcing where state changes are stored as events, CQRS (Command Query Responsibility Segregation) where read and write models are separated, and saga patterns for managing distributed transactions across multiple services. Event-driven architectures enable better scalability, fault tolerance, and system evolution because services can be added, modified, or removed without affecting other services as long as they respect the event contracts.
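
To make the saga idea concrete, the sketch below reacts to the PAYMENT_FAILED event produced by the payment service in the complete example later in this lesson and compensates by cancelling the order. It is a hedged illustration of a compensating action, not a full saga implementation.

Saga Compensation Example

@Component
@Slf4j
public class OrderCompensationHandler {

    private final OrderService orderService;

    public OrderCompensationHandler(OrderService orderService) {
        this.orderService = orderService;
    }

    // Compensating action: a failed payment rolls the order back to CANCELLED
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void handlePaymentEvent(PaymentEvent paymentEvent) {
        if ("PAYMENT_FAILED".equals(paymentEvent.getEventType())) {
            log.warn("Payment failed, cancelling order: {}", paymentEvent.getOrderId());
            orderService.updateOrderStatus(paymentEvent.getOrderId(), OrderStatus.CANCELLED);
        }
    }
}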

Tips for reliable messaging (idempotency, retries, DLQs)

Idempotency ensures that processing the same message multiple times produces the same result, which is critical for reliable messaging systems because messages can be delivered more than once due to network issues, retries, or system failures. Implement idempotency by using unique message IDs, database constraints, or application-level deduplication logic that can safely ignore duplicate message processing attempts. Design retry strategies with exponential backoff and maximum retry limits to handle transient failures without overwhelming struggling services, and use circuit breaker patterns to prevent cascading failures when downstream services become unavailable. Dead Letter Queues provide essential safety nets for messages that cannot be processed successfully, enabling manual investigation, system debugging, and message reprocessing after fixing underlying issues, while preventing poison messages from blocking the processing of valid messages in the system.
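
A minimal idempotency sketch: before processing, check a store of already-handled event IDs and skip duplicates. The ProcessedEventRepository, the ProcessedEvent entity, and the derived event ID are hypothetical additions for illustration.

Idempotent Consumer Example

@Component
@Slf4j
public class IdempotentOrderConsumer {

    private final ProcessedEventRepository processedEvents; // hypothetical JPA repository
    private final InventoryService inventoryService;

    public IdempotentOrderConsumer(ProcessedEventRepository processedEvents,
                                   InventoryService inventoryService) {
        this.processedEvents = processedEvents;
        this.inventoryService = inventoryService;
    }

    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    @Transactional
    public void handle(OrderEvent event) {
        // Skip messages we've already processed (redelivery, retry, rebalance)
        String eventId = event.getOrderId() + ":" + event.getEventType();
        if (processedEvents.existsById(eventId)) {
            log.info("Duplicate event ignored: {}", eventId);
            return;
        }

        inventoryService.reserveItems(event.getItems());

        // Recording the ID in the same transaction makes reprocessing safe
        processedEvents.save(new ProcessedEvent(eventId, Instant.now()));
    }
}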

Complete Event-Driven Example

Event Classes

// Event Classes
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String userId;
    private String eventType;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private OrderStatus status;
    private Instant timestamp;
}

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PaymentEvent {
    private String paymentId;
    private String orderId;
    private String eventType;
    private BigDecimal amount;
    private PaymentStatus status;
    private Instant timestamp;
}

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderItem {
    private String productId;
    private String productName;
    private Integer quantity;
    private BigDecimal price;
}

public enum OrderStatus {
    PENDING, CONFIRMED, CANCELLED, SHIPPED, DELIVERED
}

public enum PaymentStatus {
    PENDING, COMPLETED, FAILED, REFUNDED
}

Order Service - Event Publisher

@Service
@Transactional
@Slf4j
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public OrderService(OrderRepository orderRepository,
                       KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.orderRepository = orderRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    public Order createOrder(CreateOrderRequest request) {
        // Create order
        Order order = Order.builder()
            .userId(request.getUserId())
            .items(request.getItems())
            .totalAmount(calculateTotal(request.getItems()))
            .status(OrderStatus.PENDING)
            .createdAt(Instant.now())
            .build();

        Order savedOrder = orderRepository.save(order);
        log.info("Order created: {}", savedOrder.getId());

        // Publish event (sent even if the surrounding transaction later rolls back;
        // a transactional outbox would make persistence and publishing atomic)
        OrderEvent event = OrderEvent.builder()
            .orderId(savedOrder.getId())
            .userId(savedOrder.getUserId())
            .eventType("ORDER_CREATED")
            .items(savedOrder.getItems())
            .totalAmount(savedOrder.getTotalAmount())
            .status(savedOrder.getStatus())
            .timestamp(Instant.now())
            .build();

        kafkaTemplate.send("order-events", savedOrder.getId(), event)
            .addCallback(
                result -> log.info("Order event published successfully: {}", savedOrder.getId()),
                failure -> log.error("Failed to publish order event: {}", savedOrder.getId(), failure)
            );

        return savedOrder;
    }

    public Order updateOrderStatus(String orderId, OrderStatus newStatus) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));

        order.setStatus(newStatus);
        order.setUpdatedAt(Instant.now());
        Order updatedOrder = orderRepository.save(order);

        // Publish status change event
        OrderEvent event = OrderEvent.builder()
            .orderId(orderId)
            .userId(order.getUserId())
            .eventType("ORDER_STATUS_CHANGED")
            .status(newStatus)
            .timestamp(Instant.now())
            .build();

        kafkaTemplate.send("order-events", orderId, event);
        return updatedOrder;
    }

    private BigDecimal calculateTotal(List<OrderItem> items) {
        return items.stream()
            .map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);
    }
}

Payment Service - Event Consumer & Publisher

@Component
@Slf4j
public class PaymentEventHandler {

    private final PaymentService paymentService;
    private final KafkaTemplate<String, PaymentEvent> kafkaTemplate;

    public PaymentEventHandler(PaymentService paymentService,
                              KafkaTemplate<String, PaymentEvent> kafkaTemplate) {
        this.paymentService = paymentService;
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    // @Retryable requires @EnableRetry on a configuration class to take effect
    @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000, multiplier = 2))
    public void handleOrderCreated(OrderEvent orderEvent) {
        if ("ORDER_CREATED".equals(orderEvent.getEventType())) {
            log.info("Processing payment for order: {}", orderEvent.getOrderId());

            try {
                PaymentResult result = paymentService.processPayment(
                    orderEvent.getOrderId(),
                    orderEvent.getTotalAmount(),
                    orderEvent.getUserId()
                );

                PaymentEvent paymentEvent = PaymentEvent.builder()
                    .paymentId(result.getPaymentId())
                    .orderId(orderEvent.getOrderId())
                    .eventType(result.isSuccessful() ? "PAYMENT_COMPLETED" : "PAYMENT_FAILED")
                    .amount(orderEvent.getTotalAmount())
                    .status(result.getStatus())
                    .timestamp(Instant.now())
                    .build();

                kafkaTemplate.send("payment-events", orderEvent.getOrderId(), paymentEvent)
                    .addCallback(
                        success -> log.info("Payment event published: {}", paymentEvent.getEventType()),
                        failure -> log.error("Failed to publish payment event", failure)
                    );

            } catch (PaymentException e) {
                log.error("Payment processing failed for order: {}", orderEvent.getOrderId(), e);

                // Publish payment failed event
                PaymentEvent failedEvent = PaymentEvent.builder()
                    .orderId(orderEvent.getOrderId())
                    .eventType("PAYMENT_FAILED")
                    .amount(orderEvent.getTotalAmount())
                    .status(PaymentStatus.FAILED)
                    .timestamp(Instant.now())
                    .build();

                kafkaTemplate.send("payment-events", orderEvent.getOrderId(), failedEvent);
                throw e; // Will trigger retry
            }
        }
    }
}

Lesson Summary

In this lesson, we explored asynchronous messaging patterns with Kafka and RabbitMQ for microservices communication. Here's a comprehensive summary of all the concepts and implementation approaches covered:

Messaging Fundamentals

  • Purpose: Communication pattern using intermediary message brokers for asynchronous, decoupled service interaction
  • Benefits: Eliminates tight coupling, enables better scalability, and provides built-in buffering for traffic spikes
  • Message types: Commands (instruct actions), events (notify occurrences), and data sharing across service boundaries
  • Resilience: Fault tolerance through message persistence, retries, and dead letter queues

Kafka Architecture and Concepts

  • Topics: Named channels for organizing message streams with partitioning for scalability and parallel processing
  • Partitions: Ordered sequences of messages distributed across brokers for high throughput and fault tolerance
  • Producers: Applications that publish messages to topics with automatic serialization and delivery confirmation
  • Consumers: Applications that subscribe to topics and process messages with group-based load distribution

Kafka Implementation

  • Producer setup: KafkaTemplate for message publishing with async callbacks and error handling
  • Consumer setup: @KafkaListener annotation for declarative message consumption with automatic deserialization
  • Configuration: Bootstrap servers, serializers/deserializers, acknowledgment settings, and auto-offset management
  • Performance: Batch processing, compression, and concurrent consumers for high-throughput scenarios

Kafka Reliability Patterns

  • Retry mechanisms: Exponential backoff for transient failures with configurable maximum attempts
  • Dead Letter Topics: Capture failed messages for investigation and manual reprocessing
  • Error handling: DefaultErrorHandler with custom retry policies and exception classification
  • Offset management: Manual vs automatic commit strategies for different consistency requirements

RabbitMQ Architecture and Concepts

  • Queues: Message storage containers for point-to-point messaging with persistence and acknowledgment features
  • Exchanges: Message routing components (direct, topic, fanout, headers) for flexible distribution patterns
  • Routing keys: Message attributes used by exchanges to determine queue destinations
  • Bindings: Connections between exchanges and queues with routing rule specifications

RabbitMQ Implementation

  • Producer setup: RabbitTemplate for message publishing with exchange and routing key specifications
  • Consumer setup: @RabbitListener annotation for queue-specific message processing with concurrency control
  • Queue configuration: Durable queues, dead letter exchanges, and TTL settings for reliable messaging
  • Connection management: Connection factories, retry policies, and acknowledgment modes

RabbitMQ Reliability Patterns

  • Dead Letter Queues: Automatic routing of failed messages to designated queues for investigation
  • Message TTL: Time-to-live settings for automatic message expiration and cleanup
  • Acknowledgments: Manual vs automatic acknowledgment for guaranteed message processing
  • Retry logic: Spring AMQP retry templates with exponential backoff and circuit breaker integration

Messaging Patterns

  • Pub/Sub pattern: One-to-many communication for broadcasting events to multiple interested subscribers
  • Event-driven architecture: Services react to domain events rather than being directly invoked
  • Command patterns: Request-response messaging for directing specific actions to target services
  • Event sourcing: Storing state changes as events for audit trails and system reconstruction

Event-Driven Microservices

  • Temporal decoupling: Services don't need to be available simultaneously for communication
  • Spatial decoupling: Services don't need to know each other's locations or endpoints
  • Logical decoupling: Services react to business events rather than implementation details
  • System evolution: Services can be added, modified, or removed without affecting other services

Reliability Best Practices

  • Idempotency: Ensure processing same message multiple times produces same result
  • Retry strategies: Exponential backoff with maximum limits and circuit breaker patterns
  • Dead letter handling: Safety nets for poison messages with investigation and reprocessing capabilities
  • Message deduplication: Unique message IDs and application-level deduplication logic

Complete Event-Driven Example

  • Order processing: Event-driven workflow spanning order, payment, and inventory services
  • Event publishing: Events published right after persisting state; a transactional outbox pattern would make the two steps atomic
  • Event consumption: Multiple services reacting to same events with different business logic
  • Compensation patterns: Handling failures through compensating events and saga patterns

Monitoring and Operations

  • Message metrics: Throughput, latency, error rates, and queue depths for operational visibility
  • Consumer lag: Monitoring processing delays and scaling consumer instances accordingly
  • Dead letter monitoring: Alerting on failed messages and automated reprocessing workflows
  • Performance tuning: Batch sizes, concurrency levels, and resource allocation optimization

Key Takeaways

  • Asynchronous messaging enables scalable, resilient microservices through decoupled communication patterns
  • Kafka excels at high-throughput event streaming with per-partition ordering guarantees and horizontal scalability
  • RabbitMQ provides sophisticated routing and traditional queuing patterns with complex workflow support
  • Event-driven architectures promote loose coupling and enable system evolution through domain event communication
  • Proper error handling, idempotency, and monitoring are essential for reliable production messaging systems