WL
Java Full Stack Developer
Wassim Lagnaoui

Lesson 24: Spring Boot Async Programming and Messaging with Kafka Part 2

Master advanced Kafka patterns, stream processing, exactly-once semantics, and building robust event-driven architectures with Spring Boot for scalable microservices.

Introduction

Building on the fundamentals from the previous lesson, this lesson dives deep into advanced Kafka patterns and techniques that are essential for production-ready event-driven systems. While basic producers and consumers get you started, real-world applications need sophisticated patterns like stream processing, exactly-once delivery guarantees, and robust error handling strategies. You'll learn about Kafka Streams for real-time data transformation, how to implement exactly-once semantics to prevent duplicate processing, and advanced consumer group management for scalable message consumption. We'll explore critical patterns like event sourcing for maintaining complete audit trails, the saga pattern for managing distributed transactions, and the outbox pattern for ensuring data consistency. Additionally, you'll discover how to implement dead letter queues for handling poison messages and monitoring strategies to maintain healthy Kafka deployments. These advanced concepts are what separate basic messaging implementations from enterprise-grade, resilient systems that can handle the complexities of modern distributed applications.


Kafka Streams

Definition

Kafka Streams is a client library for building applications and microservices that process and analyze data stored in Kafka in real-time. Unlike traditional batch processing, Kafka Streams enables continuous processing of streaming data with low latency and high throughput. It provides a high-level DSL (Domain Specific Language) for common stream processing operations like filtering, mapping, grouping, aggregating, and joining streams. Kafka Streams applications are regular Java applications that can be deployed anywhere, and they automatically handle parallelization, fault tolerance, and exactly-once processing semantics. The library treats input data as continuous streams and produces output streams, making it perfect for real-time analytics, data transformation, and event processing.

Analogy

Kafka Streams is like having a sophisticated assembly line in a modern factory that processes items continuously as they arrive on the conveyor belt. Instead of collecting items in batches and processing them all at once (traditional batch processing), each item is inspected, modified, or combined with other items immediately as it passes through different stations on the assembly line. Workers at each station (stream processors) can filter out defective items, transform them by adding components, group similar items together, or merge items from multiple conveyor belts. The assembly line runs 24/7, automatically adjusting the number of workers based on the volume of items, and if one worker gets sick, another can immediately take their place without stopping the entire line. The factory keeps a detailed log of every operation performed on each item, so if something goes wrong, they can trace back exactly what happened and even replay the processing from any point in time.

Examples

Basic stream processing setup:

@Configuration
@EnableKafkaStreams
public class StreamsConfig {

    @Bean
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaStreamsConfiguration(props);
    }
}

Simple stream transformation:

@Component
public class OrderStreamProcessor {

    @Autowired
    public void processOrderStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> orderStream = streamsBuilder.stream("raw-orders");

        orderStream
            .filter((key, value) -> isValidOrder(value))
            .mapValues(this::enrichOrderData)
            .to("processed-orders");
    }
}

Stream aggregation and windowing:

public void calculateOrderStats(StreamsBuilder builder) {
    KStream<String, Order> orders = builder.stream("orders");

    // Count orders per customer in 5-minute windows
    KTable<Windowed<String>, Long> orderCounts = orders
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
        .count();

    orderCounts.toStream().to("order-statistics");
}

Stream joins for complex processing:

public void joinOrdersWithPayments(StreamsBuilder builder) {
    KStream<String, Order> orders = builder.stream("orders");
    KStream<String, Payment> payments = builder.stream("payments");

    // Join orders with payments within 10-minute window
    KStream<String, OrderPayment> enrichedOrders = orders.join(
        payments,
        (order, payment) -> new OrderPayment(order, payment),
        JoinWindows.of(Duration.ofMinutes(10))
    );

    enrichedOrders.to("enriched-orders");
}

Exactly-Once Semantics

Definition

Exactly-once semantics in Kafka ensures that each message is processed exactly once, even in the presence of failures, network issues, or application restarts. This is crucial for financial transactions, inventory management, and any scenario where duplicate processing could cause serious problems. Kafka achieves this through a combination of idempotent producers (preventing duplicate sends), transactional messaging (atomic writes across multiple partitions), and consumer offset management (tracking what's been processed). Exactly-once processing is more complex than at-least-once or at-most-once delivery but provides the strongest guarantees for critical business operations.

Analogy

Exactly-once semantics is like having a foolproof bank transfer system that ensures money is moved from one account to another exactly once, no matter what technical problems occur. When you initiate a transfer, the system assigns a unique transaction ID and keeps a detailed record of every step. If the network fails during the transfer, the system can check its records and continue from where it left off without starting over or creating a duplicate transaction. The system has multiple safeguards: it won't deduct money from your account twice (idempotent producer), it ensures that either both the deduction and deposit happen or neither does (transactional semantics), and it keeps perfect records of what's been completed (offset management). Even if the bank's computer crashes in the middle of the transfer, when it restarts, it can examine its transaction log and complete only what wasn't finished, never duplicating work that was already done.

Examples

Producer configuration for exactly-once:

@Bean
public ProducerFactory<String, String> exactlyOnceProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-producer");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    return new DefaultKafkaProducerFactory<>(props);
}

Transactional message production:

@Service
@Transactional
public class PaymentService {

    @Autowired
    private KafkaTransactionManager transactionManager;

    @KafkaTransactional
    public void processPayment(PaymentRequest request) {
        // Database operation
        Payment payment = paymentRepository.save(new Payment(request));

        // Kafka message - both succeed or both fail
        kafkaTemplate.send("payment-completed", payment.getId(), payment);
        kafkaTemplate.send("audit-events", "payment-processed", payment.getId());
    }
}

Consumer configuration for exactly-once:

@Bean
public ConsumerFactory<String, String> exactlyOnceConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}

Manual offset management for exactly-once:

@KafkaListener(topics = "payment-events")
public void handlePayment(ConsumerRecord<String, String> record,
                         Acknowledgment acknowledgment) {
    try {
        // Process the payment
        paymentProcessor.process(record.value());

        // Only acknowledge after successful processing
        acknowledgment.acknowledge();
    } catch (Exception e) {
        logger.error("Payment processing failed", e);
        // Don't acknowledge - message will be redelivered
    }
}

Consumer Groups Advanced

Definition

Advanced consumer group management involves sophisticated strategies for scaling message consumption, handling rebalancing, managing consumer lag, and optimizing throughput across multiple consumer instances. Consumer groups automatically distribute partitions among group members, but understanding partition assignment strategies, rebalancing triggers, and lag monitoring is crucial for building resilient, high-performance systems. Advanced topics include sticky partition assignment for better performance, cooperative rebalancing to minimize downtime, consumer lag monitoring for capacity planning, and strategies for handling slow consumers that could impact the entire group's performance.

Analogy

Advanced consumer group management is like managing a large team of specialized workers in a busy sorting facility that processes packages arriving on multiple conveyor belts. The facility manager (Kafka coordinator) intelligently assigns each worker to specific conveyor belts based on their current workload and expertise. When new workers join the shift or others take breaks, the manager quickly redistributes the conveyor belt assignments to maintain optimal processing speed without stopping the entire operation. The manager continuously monitors how fast each worker is processing packages compared to the arrival rate, identifying bottlenecks and redistributing work accordingly. If one conveyor belt gets backed up because a worker is slower, the system can temporarily assign additional workers to that belt or implement strategies to prevent the backup from affecting other parts of the operation. The facility keeps detailed performance metrics, tracking processing rates, backlogs, and worker efficiency to optimize the entire operation.

Examples

Advanced consumer group configuration:

@Bean
public ConsumerFactory<String, String> advancedConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              StickyAssignor.class.getName());
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
    return new DefaultKafkaConsumerFactory<>(props);
}

Consumer with partition-specific processing:

@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void processOrder(ConsumerRecord<String, Order> record,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {

    logger.info("Processing order from partition {}: {}", partition, record.key());

    // Partition-specific processing logic
    if (partition % 2 == 0) {
        processHighPriorityOrder(record.value());
    } else {
        processStandardOrder(record.value());
    }
}

Consumer lag monitoring:

@Component
public class ConsumerLagMonitor {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Scheduled(fixedRate = 30000)
    public void monitorConsumerLag() {
        AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());

        try {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                adminClient.listConsumerGroupOffsets("order-processing-group")
                          .partitionsToOffsetAndMetadata().get();

            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                long lag = calculateLag(entry.getKey(), entry.getValue());
                if (lag > 1000) {
                    logger.warn("High consumer lag detected: {} messages", lag);
                }
            }
        } catch (Exception e) {
            logger.error("Failed to monitor consumer lag", e);
        }
    }
}

Graceful consumer shutdown:

@Component
public class OrderConsumer {

    private volatile boolean running = true;

    @EventListener
    public void handleShutdown(ContextClosedEvent event) {
        running = false;
    }

    @KafkaListener(topics = "orders")
    public void processOrder(Order order) {
        if (!running) {
            logger.info("Shutdown in progress, skipping message processing");
            return;
        }

        orderService.processOrder(order);
    }
}

Dead Letter Queues

Definition

Dead Letter Queues (DLQ) are special topics used to store messages that cannot be processed successfully after multiple retry attempts, preventing poison messages from blocking normal message processing. When a consumer repeatedly fails to process a message due to data corruption, format issues, or business logic errors, the message is sent to a DLQ for manual investigation and handling. This pattern ensures that one problematic message doesn't stop the processing of other valid messages. DLQs typically include metadata about the original failure, retry count, and error details to help with debugging and eventual reprocessing once the issue is resolved.

Analogy

Dead Letter Queues are like the quality control reject bin in a manufacturing plant where defective items that can't be fixed on the production line are set aside for special handling. When an item comes down the assembly line and workers try multiple times to fix or process it but keep failing, instead of letting it jam up the entire production line, they place it in a special reject bin with a detailed tag explaining what went wrong, how many times they tried to fix it, and when it failed. The production line continues operating smoothly while quality control specialists later examine the rejected items to understand why they failed - whether it's a design flaw, corrupted materials, or a process issue. Once they identify and fix the root cause, they can often reprocess the rejected items successfully and update the production line to prevent similar failures in the future.

Examples

DLQ configuration with retry policy:

@Bean
public RetryTopicConfiguration retryTopicConfiguration() {
    return RetryTopicConfigurationBuilder
        .newInstance()
        .fixedBackOff(1000)
        .maxAttempts(3)
        .includeTopics("order-events")
        .dltStrategy(DltStrategy.FAIL_ON_ERROR)
        .build();
}

Consumer with DLQ handling:

@Component
public class OrderEventConsumer {

    @KafkaListener(topics = "order-events")
    public void handleOrderEvent(Order order) {
        try {
            orderService.processOrder(order);
        } catch (ValidationException e) {
            logger.error("Order validation failed: {}", order.getId(), e);
            throw e; // Will be retried and eventually sent to DLQ
        } catch (Exception e) {
            logger.error("Unexpected error processing order: {}", order.getId(), e);
            throw e;
        }
    }

    @DltHandler
    public void handleDltOrder(Order order, @Header KafkaHeaders.EXCEPTION_MESSAGE String error) {
        logger.error("Order sent to DLT: {} due to: {}", order.getId(), error);

        // Send alert or create ticket for manual review
        alertService.sendDltAlert(order, error);

        // Store in database for later analysis
        dltRepository.save(new DltRecord(order, error, Instant.now()));
    }
}

Manual DLQ message reprocessing:

@Service
public class DltReprocessingService {

    @KafkaListener(topics = "order-events.DLT")
    public void processDltMessages(Order order,
                                  @Header KafkaHeaders.ORIGINAL_TOPIC String originalTopic) {

        logger.info("Reprocessing DLT message from topic: {}", originalTopic);

        try {
            // Apply fixes or updated logic
            Order correctedOrder = orderCorrectionService.fix(order);

            // Send back to original topic for reprocessing
            kafkaTemplate.send(originalTopic, correctedOrder);

        } catch (Exception e) {
            logger.error("Failed to reprocess DLT message", e);
        }
    }
}

DLQ monitoring and alerting:

@Component
public class DltMonitor {

    @Scheduled(fixedRate = 60000)
    public void monitorDltQueues() {
        List<String> dltTopics = List.of("order-events.DLT", "payment-events.DLT");

        for (String topic : dltTopics) {
            long messageCount = getDltMessageCount(topic);
            if (messageCount > 10) {
                alertService.sendAlert("High DLT volume in " + topic + ": " + messageCount);
            }
        }
    }
}

Event Sourcing

Definition

Event sourcing is an architectural pattern where application state is determined by a sequence of events rather than storing current state directly. Instead of updating records in place, every change to application state is captured as an immutable event and stored in an event store (often Kafka). The current state is reconstructed by replaying all events from the beginning. This provides a complete audit trail, enables temporal queries (what was the state at any point in time), supports complex business workflows, and allows for easy debugging and testing. Event sourcing is particularly powerful when combined with CQRS (Command Query Responsibility Segregation) for separating read and write models.

Analogy

Event sourcing is like maintaining a complete, detailed diary of everything that happens in your life instead of just keeping a summary of your current situation. Rather than having a single document that says "John has $1000 in his bank account," event sourcing keeps a chronological record of every transaction: "John deposited $500 on Monday, spent $20 on coffee Tuesday, received $520 salary on Friday." To know John's current balance, you read through all the transactions and calculate the total. This approach means you can answer questions like "What was John's balance last Wednesday?" or "How much did John spend on coffee this month?" You can also detect patterns, audit every change, and if you discover an error in your calculation logic, you can replay all the events with the corrected logic to get the right current state. It's like having a perfect, immutable historical record that never loses information.

Examples

Event store implementation:

@Entity
public class EventStore {
    private String aggregateId;
    private String eventType;
    private String eventData;
    private LocalDateTime timestamp;
    private Long version;

    // Event sourcing ensures immutability
    // No update methods, only insert
}

Domain event definition:

public abstract class DomainEvent {
    private final String aggregateId;
    private final LocalDateTime occurredOn;

    public DomainEvent(String aggregateId) {
        this.aggregateId = aggregateId;
        this.occurredOn = LocalDateTime.now();
    }
}

public class AccountDebitedEvent extends DomainEvent {
    private final BigDecimal amount;
    private final String reason;

    public AccountDebitedEvent(String accountId, BigDecimal amount, String reason) {
        super(accountId);
        this.amount = amount;
        this.reason = reason;
    }
}

Aggregate with event sourcing:

public class Account {
    private String id;
    private BigDecimal balance;
    private List<DomainEvent> uncommittedEvents = new ArrayList<>();

    public void debit(BigDecimal amount, String reason) {
        if (balance.compareTo(amount) < 0) {
            throw new InsufficientFundsException();
        }

        AccountDebitedEvent event = new AccountDebitedEvent(id, amount, reason);
        apply(event);
        uncommittedEvents.add(event);
    }

    private void apply(AccountDebitedEvent event) {
        this.balance = this.balance.subtract(event.getAmount());
    }

    // Reconstruct state from events
    public static Account fromEvents(List<DomainEvent> events) {
        Account account = new Account();
        events.forEach(account::apply);
        return account;
    }
}

Event sourcing with Kafka:

@Service
public class AccountEventStore {

    @Autowired
    private KafkaTemplate<String, DomainEvent> kafkaTemplate;

    public void saveEvents(String aggregateId, List<DomainEvent> events) {
        for (DomainEvent event : events) {
            kafkaTemplate.send("account-events", aggregateId, event);
        }
    }

    @KafkaListener(topics = "account-events")
    public void handleAccountEvent(DomainEvent event) {
        // Update read model or projections
        projectionService.updateProjection(event);
    }
}

Saga Pattern

Definition

The Saga pattern manages distributed transactions across multiple microservices by breaking them into a series of local transactions, each with a corresponding compensating action for rollback. Since distributed transactions are complex and can hurt performance, sagas provide eventual consistency by coordinating multiple services through event-driven choreography or centralized orchestration. If any step fails, the saga executes compensating transactions to undo previous steps, ensuring data consistency across services. There are two main approaches: choreography (services coordinate through events) and orchestration (a central coordinator manages the workflow).

Analogy

The Saga pattern is like planning a complex international trip with multiple bookings where each step depends on the previous one, but you can't use a single payment that covers everything. You book flights, then hotels, then car rentals, then activities - each with a separate, non-refundable payment. However, you have a detailed plan: if the hotel booking fails, you know exactly how to cancel the flight and get credit; if the car rental falls through, you know how to cancel both the hotel and get flight credit; and so on. You either complete the entire trip successfully, or you systematically undo each completed booking in reverse order until you're back to where you started. You might coordinate this yourself by watching for confirmation emails and acting accordingly (choreography), or you might hire a travel agent who manages the entire process and handles cancellations if something goes wrong (orchestration).

Examples

Saga orchestrator pattern:

@Service
public class OrderSagaOrchestrator {

    @SagaOrchestrationStart
    public void processOrder(OrderCreatedEvent event) {
        // Step 1: Reserve inventory
        sagaManager.choreography()
            .step("reserve-inventory")
            .invokeParticipant("inventory-service", new ReserveInventoryCommand(event))
            .onRevert("release-inventory", new ReleaseInventoryCommand(event))

            // Step 2: Process payment
            .step("process-payment")
            .invokeParticipant("payment-service", new ProcessPaymentCommand(event))
            .onRevert("refund-payment", new RefundPaymentCommand(event))

            // Step 3: Ship order
            .step("ship-order")
            .invokeParticipant("shipping-service", new ShipOrderCommand(event))
            .onRevert("cancel-shipment", new CancelShipmentCommand(event));
    }
}

Saga participant implementation:

@Component
public class InventoryServiceParticipant {

    @SagaParticipant
    @KafkaListener(topics = "inventory-commands")
    public void handleReserveInventory(ReserveInventoryCommand command) {
        try {
            inventoryService.reserve(command.getProductId(), command.getQuantity());
            sagaManager.reportSuccess(command.getSagaId(), "inventory-reserved");
        } catch (InsufficientInventoryException e) {
            sagaManager.reportFailure(command.getSagaId(), "inventory-unavailable");
        }
    }

    @SagaCompensation
    @KafkaListener(topics = "inventory-compensations")
    public void handleReleaseInventory(ReleaseInventoryCommand command) {
        inventoryService.release(command.getProductId(), command.getQuantity());
        sagaManager.reportCompensationCompleted(command.getSagaId());
    }
}

Choreography-based saga:

// Order Service
@EventHandler
public void on(OrderCreatedEvent event) {
    eventPublisher.publish(new ReserveInventoryEvent(event.getOrderId()));
}

// Inventory Service
@EventHandler
public void on(ReserveInventoryEvent event) {
    try {
        inventoryService.reserve(event.getProductId());
        eventPublisher.publish(new InventoryReservedEvent(event.getOrderId()));
    } catch (Exception e) {
        eventPublisher.publish(new InventoryReservationFailedEvent(event.getOrderId()));
    }
}

// Payment Service
@EventHandler
public void on(InventoryReservedEvent event) {
    eventPublisher.publish(new ProcessPaymentEvent(event.getOrderId()));
}

Saga state management:

@Entity
public class SagaInstance {
    private String sagaId;
    private String sagaType;
    private SagaStatus status;
    private String currentStep;
    private Map<String, Object> sagaData;
    private List<CompensationAction> compensations;

    public void addCompensation(CompensationAction action) {
        compensations.add(0, action); // Add to beginning for reverse order
    }

    public void executeCompensations() {
        compensations.forEach(CompensationAction::execute);
    }
}

Outbox Pattern

Definition

The Outbox pattern ensures reliable publication of events by storing them in the same database transaction as the business data, then publishing them to the message broker in a separate process. This solves the dual-write problem where you need to update your database and send a message atomically - if either operation fails, both should be rolled back. The pattern works by storing events in an "outbox" table within the same database transaction as your business logic, then using a separate publisher process to read from the outbox and publish events to Kafka. This guarantees that events are published exactly once and only for committed database transactions.

Analogy

The Outbox pattern is like a reliable mail system in a busy office where important documents must be both filed in the office records and sent to external partners. Instead of trying to file the document and mail it simultaneously (which could fail if the postal service is down), the office uses a two-step process: first, they file the document in their records along with a note in their "outgoing mail" tray about what needs to be sent (all in one atomic filing action). Then, a dedicated mail clerk regularly checks the outgoing mail tray and sends out all the noted correspondence. If the filing fails, nothing goes in the outgoing mail tray. If the mailing fails, the clerk will try again later since the note remains in the tray. This ensures that every filed document results in exactly one external communication, and every external communication corresponds to a properly filed document, even if there are temporary postal service outages or filing system problems.

Examples

Outbox table definition:

@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
    @Id
    private String id;
    private String aggregateId;
    private String eventType;
    private String eventData;
    private LocalDateTime createdAt;
    private boolean processed;

    // No-args constructor and getters
}

Service with outbox pattern:

@Service
@Transactional
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private OutboxEventRepository outboxRepository;

    public Order createOrder(CreateOrderRequest request) {
        // Business logic - save order
        Order order = new Order(request);
        Order savedOrder = orderRepository.save(order);

        // Store event in outbox - same transaction
        OrderCreatedEvent event = new OrderCreatedEvent(savedOrder);
        OutboxEvent outboxEvent = new OutboxEvent(
            UUID.randomUUID().toString(),
            savedOrder.getId(),
            "OrderCreated",
            objectMapper.writeValueAsString(event)
        );

        outboxEventRepository.save(outboxEvent);

        return savedOrder;
        // Transaction commits - both order and outbox event are saved atomically
    }
}

Outbox event publisher:

@Component
public class OutboxEventPublisher {

    @Autowired
    private OutboxEventRepository outboxRepository;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedDelay = 5000)
    @Transactional
    public void publishOutboxEvents() {
        List<OutboxEvent> unpublishedEvents = outboxRepository.findByProcessedFalse();

        for (OutboxEvent event : unpublishedEvents) {
            try {
                // Publish to Kafka
                kafkaTemplate.send(getTopicName(event.getEventType()),
                                 event.getAggregateId(),
                                 event.getEventData());

                // Mark as processed
                event.setProcessed(true);
                outboxRepository.save(event);

            } catch (Exception e) {
                logger.error("Failed to publish outbox event: {}", event.getId(), e);
                // Will retry on next scheduled run
            }
        }
    }
}

Change Data Capture (CDC) approach:

// Alternative: Use Debezium or similar CDC tool
// to automatically publish database changes to Kafka

@Configuration
public class DebeziumConfig {

    @Bean
    public DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine() {
        return DebeziumEngine.create(Connect.class)
            .using(getDebeziumProperties())
            .notifying(this::handleChangeEvent)
            .build();
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> event) {
        // Automatically publish database changes to Kafka
        if (isOutboxTable(event)) {
            publishEventToKafka(event);
        }
    }
}

Outbox cleanup:

@Component
public class OutboxCleaner {

    @Scheduled(cron = "0 0 2 * * ?") // Run at 2 AM daily
    @Transactional
    public void cleanupProcessedEvents() {
        LocalDateTime cutoff = LocalDateTime.now().minusDays(7);

        int deletedCount = outboxRepository.deleteByProcessedTrueAndCreatedAtBefore(cutoff);
        logger.info("Cleaned up {} processed outbox events", deletedCount);
    }
}

Monitoring Kafka

Definition

Monitoring Kafka involves tracking metrics across producers, consumers, brokers, and topics to ensure optimal performance, identify bottlenecks, and prevent issues before they impact applications. Key metrics include throughput (messages per second), latency (end-to-end message delivery time), consumer lag (how far behind consumers are), partition distribution, broker health, and error rates. Effective monitoring combines JMX metrics, application-level metrics, and log analysis to provide comprehensive visibility into the Kafka ecosystem. Tools like Prometheus, Grafana, and specialized Kafka monitoring solutions help visualize these metrics and alert on anomalies.

Analogy

Monitoring Kafka is like running a comprehensive health and performance monitoring system for a busy metropolitan postal service that handles millions of letters and packages daily. You track how many items are processed per hour at each sorting facility (throughput), how long it takes mail to travel from sender to recipient (latency), how backed up each postal route is (consumer lag), and whether delivery trucks are evenly distributed across districts (partition balance). You monitor the health of sorting machines (broker health), track error rates like lost or damaged packages, and watch for unusual patterns that might indicate problems. Your monitoring dashboard shows real-time statistics from every post office, alerts you when delivery times exceed acceptable limits, warns about equipment failures before they cause major disruptions, and helps you optimize routes and staffing. Just as the postal service needs this visibility to ensure reliable mail delivery, Kafka systems need comprehensive monitoring to ensure reliable message delivery at scale.

Examples

Producer metrics monitoring:

@Component
public class KafkaProducerMetrics {

    private final MeterRegistry meterRegistry;
    private final Counter successCounter;
    private final Counter errorCounter;
    private final Timer sendTimer;

    public KafkaProducerMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.successCounter = Counter.builder("kafka.producer.success")
            .register(meterRegistry);
        this.errorCounter = Counter.builder("kafka.producer.error")
            .register(meterRegistry);
        this.sendTimer = Timer.builder("kafka.producer.send.duration")
            .register(meterRegistry);
    }

    public void recordSuccess() {
        successCounter.increment();
    }

    public void recordError() {
        errorCounter.increment();
    }
}

Consumer lag monitoring:

@Component
public class ConsumerLagMonitor {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Gauge(name = "kafka.consumer.lag", description = "Consumer lag per partition")
    public double getConsumerLag(@GaugeParameter("topic") String topic,
                                @GaugeParameter("partition") int partition,
                                @GaugeParameter("group") String groupId) {

        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {

            TopicPartition topicPartition = new TopicPartition(topic, partition);

            // Get current offset
            OffsetAndMetadata committed = adminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get()
                .get(topicPartition);

            // Get latest offset
            Map<TopicPartition, OffsetSpec> latestOffsets =
                Map.of(topicPartition, OffsetSpec.latest());
            Long latestOffset = adminClient
                .listOffsets(latestOffsets)
                .partitionResult(topicPartition)
                .get()
                .offset();

            return latestOffset - committed.offset();

        } catch (Exception e) {
            logger.error("Failed to calculate consumer lag", e);
            return -1;
        }
    }
}

Health check endpoint:

@Component
public class KafkaHealthIndicator implements HealthIndicator {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Override
    public Health health() {
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {

            // Check cluster health
            Collection<Node> nodes = adminClient.describeCluster().nodes().get();

            if (nodes.isEmpty()) {
                return Health.down()
                    .withDetail("error", "No Kafka brokers available")
                    .build();
            }

            return Health.up()
                .withDetail("brokers", nodes.size())
                .withDetail("cluster-id", adminClient.describeCluster().clusterId().get())
                .build();

        } catch (Exception e) {
            return Health.down()
                .withDetail("error", e.getMessage())
                .build();
        }
    }
}

Custom metrics dashboard configuration:

@Configuration
public class KafkaMetricsConfig {

    @Bean
    public MeterFilter kafkaMetricsFilter() {
        return MeterFilter.denyNameStartsWith("kafka.consumer.fetch");
    }

    @EventListener
    public void onProducerMetric(KafkaProducerMetricEvent event) {
        Metrics.gauge("kafka.producer.batch.size", event.getBatchSize());
        Metrics.gauge("kafka.producer.record.queue.time", event.getQueueTime());
    }

    @Scheduled(fixedRate = 30000)
    public void publishCustomMetrics() {
        // Publish business-specific metrics
        Metrics.gauge("kafka.topics.count", getTopicCount());
        Metrics.gauge("kafka.partitions.under.replicated", getUnderReplicatedPartitions());
    }
}

Summary

You've now mastered advanced Kafka patterns and techniques essential for building production-ready, event-driven systems. Kafka Streams enables real-time data processing and transformation with powerful windowing and join capabilities. Exactly-once semantics provides the strongest delivery guarantees for critical business operations through idempotent producers and transactional messaging. Advanced consumer group management techniques help you build scalable, resilient message processing systems with proper lag monitoring and rebalancing strategies. Dead letter queues ensure poison messages don't disrupt normal processing while providing mechanisms for error analysis and reprocessing. You understand sophisticated architectural patterns like event sourcing for complete audit trails, the saga pattern for managing distributed transactions, and the outbox pattern for reliable event publishing. Finally, comprehensive monitoring strategies help you maintain healthy Kafka deployments with proper visibility into performance and potential issues. These advanced concepts enable you to build enterprise-grade, event-driven architectures that can handle the complexity and scale requirements of modern distributed systems while maintaining reliability and consistency.

Programming Challenge

Challenge: Build an Advanced Event-Driven E-commerce Platform

Task: Create a comprehensive e-commerce system implementing advanced Kafka patterns and enterprise-grade event-driven architecture.

Core Services:

  1. Order Service: Manages orders with event sourcing and outbox pattern
  2. Inventory Service: Handles stock management with exactly-once processing
  3. Payment Service: Processes payments with saga pattern coordination
  4. Shipping Service: Manages shipments with dead letter queue handling
  5. Analytics Service: Real-time processing with Kafka Streams
  6. Notification Service: Handles alerts with consumer groups

Advanced Patterns to Implement:

  • Event Sourcing: Order aggregate with complete event history
  • Saga Pattern: Order fulfillment saga with compensation logic
  • Outbox Pattern: Reliable event publishing from order service
  • Exactly-Once Semantics: Payment processing with transactional guarantees
  • Dead Letter Queues: Error handling for all services
  • Stream Processing: Real-time analytics and fraud detection

Kafka Streams Features:

  • Real-time order statistics (count, total value) in sliding windows
  • Customer behavior analysis joining orders with user events
  • Fraud detection using pattern matching and thresholds
  • Inventory level monitoring with automated reorder triggers
  • Revenue dashboard with real-time aggregations

Monitoring and Observability:

  • Custom metrics for each service (throughput, latency, errors)
  • Consumer lag monitoring with alerting
  • Health check endpoints for all Kafka integrations
  • DLQ monitoring with automatic alert generation
  • Saga state tracking and visualization

Technical Requirements:

  • Exactly-once configuration for critical paths
  • Sticky partition assignment for optimal performance
  • Graceful shutdown handling for all consumers
  • Event schema evolution support
  • Comprehensive error handling and retry policies
  • Performance optimization (batching, compression)

Workflow Example:

  1. Customer places order → Order service creates order with event sourcing
  2. Order saga begins → Coordinates inventory, payment, and shipping
  3. Kafka Streams processes order event → Updates real-time analytics
  4. Fraud detection stream flags suspicious patterns → Triggers alerts
  5. If any saga step fails → Compensation actions restore consistency
  6. Success path → Customer receives notifications at each step
  7. Analytics dashboard shows real-time business metrics

Learning Goals: Master enterprise Kafka patterns, implement robust error handling strategies, understand event-driven architecture trade-offs, gain experience with production monitoring and observability, and build systems that handle real-world complexity and scale requirements.