Spring Cloud Stream and Apache Kafka


Hi, I am Malathi Boggavarapu. I work at Volvo Group and live in Gothenburg, Sweden. I have been working with Java for several years and have experience across a range of technologies.

Apache Kafka is a distributed streaming platform that provides scalable, high-throughput messaging and is often used in place of traditional messaging systems such as JMS. It was originally developed at LinkedIn. This post demonstrates only a simple application that uses Kafka as its streaming platform; I will explain the Kafka architecture in detail in a separate post.

In this post we look at how to build a streaming application using Spring Boot, Spring Cloud Stream and Apache Kafka. Spring Cloud Stream is a framework built on Spring Boot for building message-driven microservices.

I have also uploaded the exercise files to my GitHub. You can find them at

/SpringCloudKafka


So let's get started now.

First we need to download and install Apache Kafka. Download Kafka from here and untar it. Once it is installed, start ZooKeeper and then the Kafka server; ZooKeeper must be up and running before the Kafka server starts. Follow the steps below to start both.

- Open a Windows command prompt and navigate to the Kafka installation root directory.
- Run the following to start ZooKeeper
  bin\windows\zookeeper-server-start.bat config\zookeeper.properties
- After ZooKeeper has started, run the following to start the Kafka server
  bin\windows\kafka-server-start.bat config\server.properties

If Kafka fails to start for some reason, delete the kafka-logs directory in the tmp folder (at the root of the drive on which Kafka is installed) and then start Kafka again.
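Before moving on, it can help to confirm that both servers are actually accepting connections. Below is a minimal sketch of such a check, assuming the default ports 2181 for ZooKeeper and 9092 for the Kafka broker (9092 is also the broker address used later in this post). The class name PortCheck and the method isListening are my own hypothetical names; they are not part of Kafka or Spring.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka or Spring: checks whether something
// is accepting TCP connections on the given host and port.
class PortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports: 2181 (ZooKeeper) and 9092 (Kafka broker).
        System.out.println("ZooKeeper up: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka up:     " + isListening("localhost", 9092, 1000));
    }
}
```

A successful TCP connection only tells us that a process is listening on the port, not that the broker is healthy, so treat it as a quick sanity check.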

After the servers are started, it's time to build a project. Go to https://start.spring.io to create a Maven Project.


Follow the below steps to generate the project.

- Add Group and ArtifactId.
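- Add the following dependencies under the Dependencies section
  Cloud Stream
  Kafka
  DevTools
  Actuator
  Web (the REST controller later in this post uses Spring Web annotations)
  Lombok (the service and listener classes use @Slf4j)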
- Click on Generate Project button
- Extract the zip file and import the project into the IDE.


Define Kafka streams


Now it's time to define the streams. Our application needs to communicate with Kafka, so we define an outbound stream to write messages to a Kafka topic and an inbound stream to read messages from a Kafka topic. Spring Cloud Stream provides an easy way to define them using an interface, as the class below illustrates. GreetingsStream is an interface with two methods: inboundGreetings, which reads messages from the Kafka topic, and outboundGreetings, which writes messages to it.

package com.example.kafkademo;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface GreetingsStream {
    // Binding names; they match the keys under spring.cloud.stream.bindings.
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";

    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}



Our next step is to configure Spring Cloud Stream by binding it to our GreetingsStream interface, as shown below.



package com.example.kafkademo;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(GreetingsStream.class)
public class StreamsConfig {

}

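We bind the streams using the @EnableBinding annotation, passing the GreetingsStream interface to it. Note that this annotation-based model (@EnableBinding, @Input, @Output, @StreamListener) was deprecated in Spring Cloud Stream 3.1 in favor of the functional programming model, so the code in this post applies to earlier versions.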

Configuration properties for Kafka

By default, the configuration properties are stored in application.properties. A YAML file (application.yaml) can be used instead, which is what I prefer. In the file below, greetings is the Kafka topic.

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json

The next step is to create a simple POJO that represents the message object we read from and write to the greetings Kafka topic. Below is the Greetings POJO.


package com.example.kafkademo;

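public class Greetings {
    private long timestamp;
    private String message;

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    // Readable representation so that logging a Greetings object shows its contents.
    @Override
    public String toString() {
        return "Greetings{timestamp=" + timestamp + ", message='" + message + "'}";
    }
}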
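Because the bindings use the application/json content type, a Greetings object travels through the topic as JSON. To give a rough idea of what that payload looks like, here is a hand-rolled sketch. In the real application Spring Cloud Stream performs the conversion for us, so nothing like this needs to be written; the class GreetingsJsonSketch and its methods are hypothetical names used only for illustration, the message is assumed to be non-null, and the field order in the real payload may differ.

```java
// Illustration only: approximates the JSON form of a Greetings payload.
// The real conversion is done by the framework, not by code like this.
class GreetingsJsonSketch {

    // Escapes only backslashes and double quotes; a real JSON library
    // also handles control characters and other edge cases.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Assumes message is not null.
    static String toJson(long timestamp, String message) {
        return "{\"timestamp\":" + timestamp
                + ",\"message\":\"" + escape(message) + "\"}";
    }

    public static void main(String[] args) {
        // prints {"timestamp":1514764800000,"message":"hello"}
        System.out.println(toJson(1514764800000L, "hello"));
    }
}
```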

Create a Service Layer to Write to Kafka

Now we write a service class that writes a Greetings object to the greetings Kafka topic. In the sendGreeting method, we send the Greetings payload to the Kafka stream.

package com.example.kafkademo;

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class GreetingsService {
   
    private final GreetingsStream greetingsStreams;

    public GreetingsService(GreetingsStream greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greetings greetings) {
        log.info("Sending greetings {}", greetings);

        // Look up the outbound channel bound to the greetings topic
        // and send the payload with a JSON content type header.
        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

Create a REST API

Now we create a REST API endpoint for end users to call. When a user submits a message, a Greetings object (the payload) is created and sent to the Kafka stream. The controller class below handles the user request.

package com.example.kafkademo;

import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GreetingsController {
    private final GreetingsService greetingsService;

    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }

    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
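        Greetings greetings = new Greetings();
        greetings.setMessage(message);
        // Record when the greeting was created; otherwise timestamp stays 0.
        greetings.setTimestamp(System.currentTimeMillis());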
        greetingsService.sendGreeting(greetings);
    }
}
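The endpoint takes the greeting as a query parameter, so a message containing spaces or characters such as & has to be URL-encoded by the caller. Here is a small sketch of how a client could build the request URL, assuming the application runs on http://localhost:8080 and Java 10 or later (for the URLEncoder overload that takes a Charset). GreetingsUrl and greetingsUri are hypothetical names of my own, not part of the project.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical client-side helper: builds the URL for the /greetings endpoint.
class GreetingsUrl {

    static URI greetingsUri(String baseUrl, String message) {
        // Form-style encoding: spaces become '+', reserved characters become %XX.
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/greetings?message=" + encoded);
    }

    public static void main(String[] args) {
        // prints http://localhost:8080/greetings?message=hello+world
        System.out.println(greetingsUri("http://localhost:8080", "hello world"));
    }
}
```

A browser does this encoding for us when we type the URL into the address bar, so the helper matters mostly when calling the endpoint from code.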

Listener Class

Now we need a listener class that listens for messages on the greetings Kafka topic defined in the application.yaml file. Go ahead and create a listener class named something like GreetingsListener.

package com.example.kafkademo;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class GreetingsListener {
    @StreamListener(GreetingsStream.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
        log.info("Received greetings: {}", greetings);
    }
}

The handleGreetings method is invoked by Spring Cloud Stream whenever a Greetings message is added to the greetings Kafka topic. Spring Cloud Stream identifies the method through the @StreamListener annotation. As you can see, we simply log the Greetings payload read from the Kafka topic.
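To make the flow easier to picture, here is a deliberately simplified, in-memory sketch of the publish/subscribe idea: one side publishes to a topic and every registered handler is called with the message. This is not how Kafka or Spring Cloud Stream are implemented (there is no broker, no partitions and no serialization), and InMemoryTopic and GreetingsFlowDemo are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified in-memory stand-in for a topic; illustration only.
class InMemoryTopic<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();

    // Plays the role of the inbound binding plus the listener method.
    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Plays the role of the outbound binding: each message reaches every handler.
    void publish(T message) {
        for (Consumer<T> listener : listeners) {
            listener.accept(message);
        }
    }
}

class GreetingsFlowDemo {

    // Publishes the given messages and returns what the handler recorded.
    static List<String> run(String... messages) {
        InMemoryTopic<String> greetings = new InMemoryTopic<>();
        List<String> received = new ArrayList<>();
        greetings.subscribe(m -> received.add("Received greetings: " + m));
        for (String m : messages) {
            greetings.publish(m);
        }
        return received;
    }

    public static void main(String[] args) {
        // prints "Received greetings: hello" and then "Received greetings: hi again"
        run("hello", "hi again").forEach(System.out::println);
    }
}
```

In the real application the publisher and the listener do not know about each other at all; they only share the destination name greetings from the configuration file.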

Now it is time to test the application. Open the Spring Boot main class, which was created automatically when we generated the project with Spring Initializr. See below.

package com.example.kafkademo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
public class StreamKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamKafkaApplication.class, args);
    }
}

Right-click the class and select Run As -> Spring Boot App. The Spring Boot server starts on port 8080. Make sure that the Kafka and ZooKeeper servers are up and running.
Now go to the browser and access the URL
http://localhost:8080/greetings?message=hello
Check your console to see the message "hello" read from the Kafka topic.
I hope this tutorial is helpful.

Please post your questions and feedback in the comments below.
