WL
Java Full Stack Developer
Wassim Lagnaoui

Lesson 23: Spring Boot Async Programming and Messaging with Kafka Part 1

Master asynchronous programming in Spring Boot and get started with Apache Kafka: async processing, Kafka introduction, setup, and producing/consuming messages for scalable applications.

Introduction

Modern applications need to handle multiple tasks simultaneously without making users wait for slow operations to complete. Asynchronous programming allows your application to start a task and continue with other work while that task runs in the background, much like how you can start your washing machine and continue with other activities while it runs. Apache Kafka takes this concept further by enabling different parts of your application, or even different applications, to communicate through messages without being directly connected. This lesson introduces you to Spring Boot's asynchronous programming capabilities using @Async and CompletableFuture, then explores Apache Kafka as a powerful messaging system for building scalable, event-driven applications. You'll learn how to set up Kafka, produce messages to topics, and consume those messages in your Spring Boot applications. These skills are essential for building responsive applications that can handle high loads and complex workflows efficiently.


Async Programming Basics

Definition

Asynchronous programming allows your application to execute operations without blocking the main thread, enabling better resource utilization and improved user experience. Instead of waiting for a slow operation to complete before moving to the next task, async programming starts the operation and immediately returns control to the caller. The operation continues running in the background, and the result is made available when the operation completes. This approach is particularly valuable for I/O operations like database queries, web service calls, file operations, and email sending that typically involve waiting for external systems.

Analogy

Asynchronous programming is like being an efficient restaurant manager during a busy dinner rush. Instead of taking an order, walking to the kitchen, waiting for the food to be prepared, bringing it back to the table, and only then taking the next order (synchronous approach), a good manager takes the first order, gives it to the kitchen, immediately moves to take the second order while the first meal is being prepared, then the third order, and so on. The manager keeps track of all orders and delivers meals as they become ready, without ever standing idle waiting for the kitchen. This way, the restaurant serves many more customers in the same amount of time, and customers don't have to wait as long because multiple orders are being processed simultaneously. The manager (main thread) stays responsive and productive, while the kitchen staff (background threads) handle the time-consuming food preparation tasks.

Examples

Synchronous vs Asynchronous email sending:

// Synchronous - blocks until email is sent
public void registerUser(User user) {
    userRepository.save(user);
    emailService.sendWelcomeEmail(user); // Waits here
    logger.info("User registered: " + user.getEmail());
}
// Asynchronous - continues immediately
public void registerUser(User user) {
    userRepository.save(user);
    emailService.sendWelcomeEmailAsync(user); // Doesn't wait
    logger.info("User registered: " + user.getEmail());
}

Async method with @Async annotation:

@Service
public class EmailService {

    @Async
    public CompletableFuture<Void> sendWelcomeEmailAsync(User user) {
        try {
            Thread.sleep(2000); // Simulating email sending delay
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        emailClient.send(user.getEmail(), "Welcome!");
        return CompletableFuture.completedFuture(null);
    }
}

Benefits of async processing:

// Multiple async operations can run simultaneously
CompletableFuture<Void> emailTask = emailService.sendEmail(user);
CompletableFuture<Void> smsTask = smsService.sendSms(user);
CompletableFuture<Void> auditTask = auditService.logRegistration(user);

// All three operations run concurrently
CompletableFuture.allOf(emailTask, smsTask, auditTask).join();

Spring @Async Annotation

Definition

The @Async annotation in Spring Boot enables methods to run asynchronously in separate threads, allowing the calling thread to continue execution without waiting for the method to complete. When you annotate a method with @Async, Spring creates a proxy that intercepts the method call and executes it in a background thread from a thread pool. The method can return void for fire-and-forget operations, or CompletableFuture for operations where you need to handle the result later. To use @Async, you must enable async processing with @EnableAsync in your configuration class.

Analogy

Using @Async is like having a team of assistants in an office who can handle tasks independently. When you (the main thread) need to send a document for review, instead of walking to the reviewer's office, waiting for them to read it, and bringing back their feedback (synchronous), you simply hand the document to one of your assistants (@Async method) and continue with your other work. The assistant takes the document to the reviewer, waits for the feedback, and either files it away (void return) or brings it back to your desk when you're ready to look at it (CompletableFuture return). You can give multiple documents to different assistants simultaneously, and each assistant works independently. The office manager (@EnableAsync) coordinates this system and ensures there are enough assistants available to handle the workload efficiently.

Examples

Enable async processing:

@Configuration
@EnableAsync
public class AsyncConfig {
    // Async processing is now enabled
}

Simple async method (fire-and-forget):

@Service
public class NotificationService {

    @Async
    public void sendNotification(String message, String recipient) {
        // This runs in a background thread
        logger.info("Sending notification to: " + recipient);
        notificationClient.send(message, recipient);
        logger.info("Notification sent successfully");
    }
}

Async method returning CompletableFuture:

@Service
public class UserService {

    @Async
    public CompletableFuture<User> findUserAsync(Long userId) {
        User user = userRepository.findById(userId).orElseThrow();
        return CompletableFuture.completedFuture(user);
    }
}

Using async methods in controller:

@RestController
public class UserController {

    @PostMapping("/users")
    public ResponseEntity<User> createUser(@RequestBody User user) {
        User savedUser = userService.save(user);

        // These run asynchronously - doesn't block response
        notificationService.sendNotification("Welcome!", user.getEmail());
        auditService.logUserCreation(user.getId());

        return ResponseEntity.ok(savedUser);
    }
}
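
One caveat: Spring applies @Async through a proxy, so calling an @Async method from another method of the same class bypasses the proxy and runs synchronously. A minimal sketch of the pitfall (class and method names are illustrative):

@Service
public class ReportService {

    @Async
    public void generateAsync() {
        // Runs in a background thread - but only when invoked through the proxy
    }

    public void generateAll() {
        generateAsync(); // Self-invocation: proxy bypassed, runs synchronously!
    }
}

The usual fix is to move the @Async method into a separate bean and inject it, so every call goes through Spring's proxy.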

CompletableFuture

Definition

CompletableFuture is Java's powerful API for asynchronous programming that allows you to write non-blocking code, chain operations, combine multiple async operations, and handle results or exceptions when they become available. Unlike simple async methods that you fire and forget, CompletableFuture gives you control over the async operation's lifecycle. You can chain transformations, combine multiple futures, add callbacks for completion or error handling, and even set timeouts. CompletableFuture bridges the gap between starting an async operation and processing its result, making complex async workflows manageable and readable.

Analogy

CompletableFuture is like having a smart courier service with tracking and automated workflows. When you send a package (start an async operation), you get a tracking number (CompletableFuture) that you can use to monitor the delivery status. You can set up automated actions: when the package is delivered, automatically send a thank-you email to the recipient; if delivery fails, automatically schedule a retry; when multiple packages arrive at the same destination, combine them into one notification. You can also chain deliveries: when Package A is delivered, automatically send Package B, and when Package B arrives, send Package C. The courier service handles all the complexity of timing, routing, and coordination, while you just define what should happen when each step completes. You can even set up conditional logic: if the package arrives before noon, send it to the office; if it arrives later, send it to the home address.

Examples

Creating and completing a CompletableFuture:

// Create a future that completes immediately
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");

// Create a future with async computation
CompletableFuture<String> asyncFuture = CompletableFuture.supplyAsync(() -> {
    return "Computed result";
});
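
Once you hold a future, you can block for its value or attach a non-blocking callback; both are part of the standard CompletableFuture API:

// Blocking retrieval - waits until the computation finishes
String value = asyncFuture.join();

// Non-blocking - the callback runs whenever the result becomes available
asyncFuture.thenAccept(result -> logger.info("Got: {}", result));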

Chaining operations with thenApply:

CompletableFuture<String> result = userService.findUserAsync(1L)
    .thenApply(user -> user.getEmail())
    .thenApply(email -> email.toUpperCase())
    .thenApply(email -> "Welcome " + email);

Combining multiple futures:

CompletableFuture<User> userFuture = userService.findUserAsync(1L);
CompletableFuture<List<Order>> ordersFuture = orderService.findOrdersAsync(1L);

CompletableFuture<String> combined = userFuture.thenCombine(ordersFuture,
    (user, orders) -> user.getName() + " has " + orders.size() + " orders");

Error handling with CompletableFuture:

CompletableFuture<String> future = userService.findUserAsync(1L)
    .thenApply(user -> user.getEmail())
    .exceptionally(throwable -> {
        logger.error("Failed to get user email", throwable);
        return "default@example.com";
    });
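
The definition also mentions timeouts; since Java 9, CompletableFuture supports them directly (the fallback address here is illustrative):

CompletableFuture<String> withTimeout = userService.findUserAsync(1L)
    .thenApply(User::getEmail)
    .orTimeout(2, TimeUnit.SECONDS)              // fail with TimeoutException after 2s
    .exceptionally(ex -> "default@example.com"); // fall back on timeout or any error

// Alternative: substitute a default value instead of failing
// .completeOnTimeout("default@example.com", 2, TimeUnit.SECONDS)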

Thread Pools

Definition

Thread pools manage a collection of reusable threads that execute async tasks, providing better resource management and performance control compared to creating new threads for each task. Spring Boot provides default thread pools for @Async methods, but you can customize them to optimize for your specific workload. Thread pool configuration includes core pool size (minimum threads always available), maximum pool size (maximum threads that can be created), queue capacity (how many tasks can wait), and keep-alive time (how long idle threads stay alive). Proper thread pool configuration prevents resource exhaustion and ensures optimal performance under different load conditions.

Analogy

A thread pool is like the staff management system at a busy restaurant. Instead of hiring a new waiter every time a customer arrives and firing them when the customer leaves (creating new threads), the restaurant maintains a core team of waiters who are always on duty (core pool size) and can hire additional temporary staff during peak hours up to a maximum capacity (maximum pool size). When it's busy, customer requests wait in a queue to be served by the next available waiter (task queue). During slow periods, some temporary staff might be sent home after a certain idle time (keep-alive time), but the core staff always remains. This system ensures consistent service, manages labor costs, and handles varying customer loads efficiently without the overhead of constantly hiring and training new staff.

Examples

Custom thread pool configuration:

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "emailExecutor")
    public Executor emailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("email-");
        executor.initialize();
        return executor;
    }
}

Using specific thread pool:

@Service
public class EmailService {

    @Async("emailExecutor")
    public CompletableFuture<Void> sendEmailAsync(String to, String message) {
        // Uses the emailExecutor thread pool
        emailClient.send(to, message);
        return CompletableFuture.completedFuture(null);
    }
}

Multiple thread pools for different tasks:

@Bean(name = "reportExecutor")
public Executor reportExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);  // CPU-intensive tasks
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(50);
    return executor;
}

@Async("reportExecutor")
public CompletableFuture<Report> generateReportAsync(Long userId) {
    return CompletableFuture.completedFuture(reportService.generate(userId));
}
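
When both the queue and the pool are full, ThreadPoolTaskExecutor rejects new tasks with a TaskRejectedException. If that is not what you want, set a different policy; for example, CallerRunsPolicy makes the submitting thread run the task itself, which naturally throttles producers:

// In the executor configuration above (standard java.util.concurrent policy)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());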

Thread pool monitoring:

@Component
public class ThreadPoolMonitor {

    @Autowired
    @Qualifier("emailExecutor")
    private ThreadPoolTaskExecutor emailExecutor;

    @Scheduled(fixedRate = 30000)
    public void monitorThreadPool() {
        logger.info("Email executor - Active: {}, Pool size: {}, Queue size: {}",
            emailExecutor.getActiveCount(),
            emailExecutor.getPoolSize(),
            emailExecutor.getQueueSize());
    }
}

Kafka Introduction

Definition

Apache Kafka is a distributed streaming platform that allows applications to publish and subscribe to streams of messages in a fault-tolerant, scalable way. Kafka organizes messages into topics (categories), which are split into partitions for parallel processing and replicated across multiple servers for reliability. Producers send messages to topics, while consumers read messages from topics, enabling loose coupling between different parts of your system. Kafka excels at handling high-throughput, real-time data streams and is commonly used for event sourcing, log aggregation, metrics collection, and building event-driven microservices architectures.

Analogy

Kafka is like a massive, organized newsstand system in a busy city that handles news distribution for the entire metropolitan area. The newsstand has different sections (topics) for different types of news: sports, business, local events, weather, and entertainment. Each section has multiple identical newsstands (partitions) distributed across the city for faster access and redundancy. Newspapers and magazines (producers) deliver their stories to the appropriate sections, while readers (consumers) can subscribe to specific sections they're interested in. The system keeps copies of each publication at multiple locations (replication) so that if one newsstand is damaged, the news is still available elsewhere. Readers can start reading from any point in the publication history, and new readers can catch up by reading older issues. The system can handle millions of readers and thousands of publications simultaneously, making sure everyone gets the news they want when they want it.

Examples

Kafka key concepts:

// Topic: A category of messages (like "user-events")
// Partition: A subdivision of a topic for parallel processing
// Producer: Application that sends messages to topics
// Consumer: Application that reads messages from topics
// Consumer Group: Multiple consumers working together

Common Kafka use cases:

// Event-driven architecture
userService.createUser(user);
// Publishes "user-created" event to Kafka

// Other services can react to this event:
// - Email service sends welcome email
// - Analytics service tracks user registration
// - Recommendation service prepares personalized content

Kafka benefits for microservices:

// Loose coupling - services don't need to know about each other
// Scalability - can handle millions of messages per second
// Reliability - messages are persisted and replicated
// Real-time processing - low latency message delivery
// Event sourcing - complete audit trail of all events

Kafka vs traditional messaging:

// Traditional: Direct service calls (tight coupling)
orderService.createOrder(order);
emailService.sendConfirmation(order); // Direct dependency

// Kafka: Event-driven (loose coupling)
orderService.createOrder(order);
// Publishes event, doesn't know who consumes it
kafkaTemplate.send("order-events", orderCreatedEvent);

Kafka Setup

Definition

Setting up Kafka with Spring Boot involves adding the necessary dependencies, configuring connection properties, and creating the required beans for producers and consumers. Spring Boot provides excellent integration with Kafka through Spring Kafka, which simplifies configuration and provides useful abstractions like KafkaTemplate for sending messages and @KafkaListener for consuming messages. The setup includes configuring bootstrap servers (Kafka broker addresses), serializers for converting objects to bytes, deserializers for converting bytes back to objects, and various other properties for security, error handling, and performance tuning.

Analogy

Setting up Kafka is like establishing a comprehensive postal system for a new city. First, you need to set up the post office infrastructure (Kafka brokers) with multiple locations throughout the city for reliability and accessibility. Then you configure the addressing system (topics and partitions) so mail can be properly sorted and delivered. You establish protocols for how letters should be packaged (serializers) and how they should be unpacked when delivered (deserializers). You set up mail trucks (producers) with routes to collect and deliver mail to the post offices, and mail carriers (consumers) who pick up mail from specific post office boxes and deliver it to the intended recipients. You also establish security procedures, backup systems, and quality standards to ensure mail is delivered reliably and securely throughout the city.

Examples

Maven dependency for Spring Kafka:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Basic Kafka configuration:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=my-app-group

Kafka configuration class:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}
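
You can also let Spring create topics at startup by declaring NewTopic beans, which Spring Boot's auto-configured KafkaAdmin applies to the broker (topic name and sizing below are illustrative):

@Bean
public NewTopic userEventsTopic() {
    // 3 partitions for parallelism, replication factor 1 for local development
    return TopicBuilder.name("user-events")
            .partitions(3)
            .replicas(1)
            .build();
}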

Docker setup for local development:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
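
With the containers running, you can sanity-check the broker using the console tools bundled in the Confluent image:

# Start the stack
docker compose up -d

# List topics to confirm the broker is reachable
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list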

Producing Messages

Definition

Producing messages in Kafka involves sending data to specific topics using a KafkaTemplate, which is Spring's abstraction for interacting with Kafka producers. Producers can send messages synchronously (waiting for confirmation) or asynchronously (fire-and-forget or with callbacks). Messages consist of a key (optional, used for partitioning), a value (the actual data), and headers (metadata). Kafka producers automatically handle partitioning, batching, compression, and retry logic. You can send simple strings, JSON objects, or custom serialized data, and Spring Boot makes it easy to integrate message production into your business logic.

Analogy

Producing Kafka messages is like being a journalist who submits articles to different newspaper sections. When you finish writing an article (create a message), you decide which section it belongs to (choose a topic) - sports, business, or local news. You can either hand-deliver the article to the editor and wait for confirmation that it will be published (synchronous send), or drop it in the editor's inbox and continue with your next article (asynchronous send). Some articles are time-sensitive breaking news that must be published immediately (priority messages), while others are feature stories that can wait. The newspaper's publishing system (Kafka) automatically distributes your article to the appropriate printing facilities (partitions) based on the topic and ensures it reaches all subscribers who are interested in that section. You can also include notes for the editor (headers) with special instructions about how the article should be handled.

Examples

Simple message production:

@Service
public class EventPublisher {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void publishUserEvent(String userId, String event) {
        kafkaTemplate.send("user-events", userId, event);
    }
}

Async message production with callback:

public void publishOrderEvent(Order order) throws JsonProcessingException {
    String orderJson = objectMapper.writeValueAsString(order);

    // send() returns a CompletableFuture (Spring Kafka 3.x and later)
    kafkaTemplate.send("order-events", order.getId().toString(), orderJson)
        .whenComplete((result, failure) -> {
            if (failure == null) {
                logger.info("Order event sent successfully: {}", order.getId());
            } else {
                logger.error("Failed to send order event: {}", order.getId(), failure);
            }
        });
}
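
The definition mentions headers; one way to attach them is to build a ProducerRecord yourself and hand it to the template (the header name here is illustrative):

ProducerRecord<String, String> record =
    new ProducerRecord<>("order-events", order.getId().toString(), orderJson);
record.headers().add("event-type", "ORDER_CREATED".getBytes(StandardCharsets.UTF_8));

kafkaTemplate.send(record); // KafkaTemplate accepts a prebuilt record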

Publishing domain events from service:

@Service
public class UserService {

    @Autowired
    private EventPublisher eventPublisher;

    public User createUser(User user) {
        User savedUser = userRepository.save(user);

        // Publish event for other services to react
        UserCreatedEvent event = new UserCreatedEvent(
            savedUser.getId(),
            savedUser.getEmail(),
            Instant.now()
        );

        eventPublisher.publishUserCreated(event);
        return savedUser;
    }
}

Transactional message production:

// Note: Kafka joins the transaction only when producer transactions are enabled
// (see the spring.kafka.producer.transaction-id-prefix property below)
@Transactional
public void processOrderWithEvents(Order order) {
    // Save to database
    orderRepository.save(order);

    // Send event - committed only if the surrounding transaction succeeds
    kafkaTemplate.send("order-events", orderCreatedEvent);

    // If an exception occurs, both the database write and the Kafka send roll back
}
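
Kafka only takes part in that transaction when producer transactions are enabled; with Spring Boot this is a single property (the prefix value is illustrative):

spring.kafka.producer.transaction-id-prefix=tx-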

Consuming Messages

Definition

Consuming messages in Kafka involves reading data from topics using consumers that can process messages individually or in batches. Spring Boot simplifies consumption with the @KafkaListener annotation, which automatically handles the complexities of consumer group management, offset tracking, and error handling. Consumers can be part of consumer groups where multiple instances share the workload by processing different partitions. Key concepts include offset management (tracking which messages have been processed), consumer group coordination (distributing partitions among group members), and error handling strategies for dealing with processing failures. Consumers can process messages synchronously or asynchronously depending on your use case.

Analogy

Consuming Kafka messages is like having a team of specialized librarians who monitor different sections of a massive library and process new books as they arrive. Each librarian (consumer) is responsible for monitoring specific shelves (partitions) within their assigned sections (topics). When new books arrive, the librarians follow their specific procedures: the history librarian catalogs historical books, the science librarian processes scientific publications, and the fiction librarian handles novels. If there are multiple librarians working on the same section (consumer group), they divide the shelves among themselves so no book is processed twice. Each librarian keeps a bookmark (offset) to track exactly where they left off, so if they take a break or if a new librarian joins the team, work can continue seamlessly from the right position. If a librarian encounters a damaged book (processing error), they follow established procedures to either repair it, set it aside for special handling, or escalate to a supervisor.

Examples

Simple message consumer:

@Component
public class UserEventConsumer {

    @KafkaListener(topics = "user-events", groupId = "user-service-group")
    public void handleUserEvent(String message) {
        logger.info("Received user event: {}", message);
        // Process the event
        processUserEvent(message);
    }
}

Consumer with key and value:

@KafkaListener(topics = "order-events", groupId = "notification-service")
public void handleOrderEvent(
    @Payload String orderData,
    @Header(KafkaHeaders.RECEIVED_KEY) String orderId) throws JsonProcessingException {

    logger.info("Processing order {} with data: {}", orderId, orderData);

    Order order = objectMapper.readValue(orderData, Order.class);
    emailService.sendOrderConfirmation(order);
}

Batch message consumption:

@KafkaListener(topics = "analytics-events",
               groupId = "analytics-batch-processor",
               containerFactory = "batchListenerContainerFactory")
public void handleAnalyticsBatch(List<String> events) {
    logger.info("Processing batch of {} analytics events", events.size());

    // Process events in batch for better performance
    analyticsService.processBatch(events);
}
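
The batchListenerContainerFactory referenced above is not auto-configured; here is a minimal sketch of how it could be declared:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // deliver a List of records per poll
    return factory;
}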

Error handling in consumers:

@KafkaListener(topics = "payment-events", groupId = "payment-processor")
public void handlePaymentEvent(String paymentData) {
    try {
        PaymentEvent event = objectMapper.readValue(paymentData, PaymentEvent.class);
        paymentService.processPayment(event);
    } catch (Exception e) {
        logger.error("Failed to process payment event: {}", paymentData, e);
        // Send to dead letter topic or retry queue
        kafkaTemplate.send("payment-events-dlq", paymentData);
    }
}
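
Rather than hand-rolling dead letter publishing in every listener, Spring Kafka (2.8+) can do it for you with a DefaultErrorHandler; a sketch, assuming a String-based KafkaTemplate is available as a bean:

@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, String> template) {
    // After retries are exhausted, publish the failed record to <topic>.DLT
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // Retry 3 times with a 1-second pause between attempts
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}

Spring Boot wires a CommonErrorHandler bean like this into its auto-configured listener container factory, so plain @KafkaListener methods pick it up automatically.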

Summary

You've now learned the fundamentals of asynchronous programming in Spring Boot and messaging with Apache Kafka. Async programming with @Async and CompletableFuture allows your applications to handle multiple tasks simultaneously without blocking, improving performance and user experience. Thread pools provide efficient resource management for concurrent operations. Apache Kafka enables building scalable, event-driven architectures where services communicate through messages rather than direct calls, promoting loose coupling and resilience. You understand how to set up Kafka, produce messages to topics, and consume messages using Spring Boot's excellent integration support. These skills form the foundation for building reactive, scalable applications that can handle high loads and complex workflows efficiently. In the next lesson, you'll explore advanced Kafka patterns including stream processing, exactly-once semantics, and building robust event-driven systems that can handle real-world complexity.

Programming Challenge

Challenge: Build an Async E-commerce Order Processing System

Task: Create an order processing system that demonstrates async programming and Kafka messaging patterns.

Requirements:

  1. Order Service: Creates orders and publishes events to Kafka
  2. Inventory Service: Consumes order events and checks/reserves inventory
  3. Payment Service: Processes payments asynchronously
  4. Email Service: Sends confirmation emails using @Async
  5. Analytics Service: Consumes events for reporting (batch processing)

Async features to implement:

  • Use @Async for email sending and audit logging
  • Implement CompletableFuture for payment processing
  • Configure custom thread pools for different tasks
  • Create async methods that return CompletableFuture
  • Chain multiple async operations together

Kafka integration:

  • Set up Kafka with Docker Compose
  • Create topics: order-events, inventory-events, payment-events
  • Implement producers using KafkaTemplate
  • Create consumers with @KafkaListener
  • Handle both individual and batch message processing
  • Add error handling and dead letter topics

Workflow:

  1. User creates order → Order Service saves and publishes order-created event
  2. Inventory Service consumes event → checks stock → publishes inventory-reserved event
  3. Payment Service consumes event → processes payment asynchronously → publishes payment-completed event
  4. Email Service consumes payment event → sends confirmation email asynchronously
  5. Analytics Service consumes all events → processes in batches for reporting

Learning Goals: Practice async programming patterns, understand Kafka producer/consumer implementation, learn event-driven architecture design, and gain experience with Spring Boot's async and messaging capabilities.