Introduction
If you want to build an application that receives, processes, and persists live data, Apache Kafka is one of the tools that can help you achieve that. For example, if you want to receive truck information such as location, current fuel status, and current mileage in real time, sensors on your trucks would send that information to Kafka. Services that need the data, such as a notifications service or an analytics service, would then consume it from Kafka. Because the data is consumed continuously, this pattern is called event streaming. In this guide, we are going to look at Kafka and Spring Boot in action.
Prerequisites
This guide assumes that you have the following:
- Java 8 or above installed
- A running Kafka cluster (we will set one up in section 2)
What we will learn
- What is Apache Kafka
- Creating a Spring Boot application
- Running our application
1. What is Apache Kafka
Apache Kafka is an open-source, distributed event streaming platform. Events, or messages, are sent to Kafka by producers and read by consumers. Events are written to partitions inside a topic, and a consumer reads from a specific topic and its partitions. A broker is a server inside the Kafka cluster that hosts topics and acts as their storage. This storage layer is what lets Kafka retain messages for a configured period before they expire and are deleted.
Consumers use offsets to track how far they have read in a partition, so each message is processed once per consumer group and a consumer can resume where it left off after a restart. Messages sent to a topic by producers should share the same schema, and consumer clients should be configured to deserialize messages with that schema.
A.) Sending data to the Kafka cluster
With Kafka we can have many producers writing to a topic. In the image above, two producers are writing to the “truck_info_topic” and “driver_info_topic” topics.
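To make this concrete, here is a minimal producer sketch using the plain Kafka client API (an illustration only, not part of the application we build later; it assumes a broker on localhost:9092 and sends a hand-written JSON string to truck_info_topic):
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class TruckInfoProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // key = truck id, value = JSON payload; records with the same key go to the same partition
            producer.send(new ProducerRecord<>("truck_info_topic", "truck-42",
                    "{\"truck_id\":\"truck-42\",\"truck_mileage\":\"120500\"}"));
            producer.flush();
        }
    }
}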
B.) Messages inside partitions
Messages are stored inside partitions, and it is these partitions that make our Kafka cluster scalable, since you can add as many partitions to a topic as you like. We add partitions based on the number of consumers we plan to have; with fewer partitions than consumers, the extra consumers sit idle. Messages are distributed among the partitions, and each partition keeps its own offsets starting from zero. Offsets indicate the last message that was processed by a consumer or consumer group.
C.) Consumer and consumer groups
Consumers read messages sequentially. To scale properly we use consumer groups: we specify the number of consumers we want reading from a topic, and the topic's partitions are divided among them. This is especially important for CPU intensive operations, since different messages can then be processed in parallel. Each consumer in a group is assigned one or more partitions to read from.
When we have one consumer and one partition in a topic, a consumer group does not add any parallelism, although Kafka still tracks the consumer's offsets under its group id.
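To make the consumer group idea concrete, here is a minimal consumer sketch using the plain Kafka client API (an illustration only; it assumes a broker on localhost:9092; start several copies with the same group.id and Kafka will split the partitions among them):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TruckInfoConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "truck_info_consumer_group"); // every instance with this id joins the same group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("truck_info_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // the offset records how far this group has read in the partition
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}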
2. Run our Kafka cluster
To run our Kafka cluster, we need a service that manages the cluster metadata. This can be either ZooKeeper or KRaft, Kafka's built-in consensus mode that removes the need for a separate ZooKeeper process.
First download the latest Kafka release and extract the archive. Open a terminal inside the extracted folder and start ZooKeeper with the following command
a.) Start the ZooKeeper server
bin/zookeeper-server-start.sh config/zookeeper.properties
b.) Start the kafka broker
bin/kafka-server-start.sh config/server.properties
To use KRaft as our cluster manager instead, follow the commands below.
a.) We start by generating a cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
b.) We then format our log directory
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
c.) Run our Kafka cluster with KRaft
bin/kafka-server-start.sh config/kraft/server.properties
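Whichever mode you choose, it is worth confirming that the broker is reachable before wiring up Spring. Below is a minimal sketch using the Kafka AdminClient that simply lists the topics the broker knows about (it assumes the broker listens on the default localhost:9092; the kafka-clients library it needs ships transitively with the spring-kafka dependency added in the next section):
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // fails with a timeout if the broker is not reachable
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}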
3. Creating a Spring Boot application
Generate a fresh Spring Boot project using Spring Initializr. I will be using Maven for the demonstration in this guide. Include the following dependencies in your project
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.3</version>
</dependency>
<dependency>
    <groupId>jakarta.json</groupId>
    <artifactId>jakarta.json-api</artifactId>
    <version>2.0.1</version>
</dependency>
In your application.properties file, include the following configuration. Kafka bootstrap servers are plain host:port pairs (no http:// prefix), and a local broker listens on port 9092 by default.
kafka.bootstrap-servers=localhost:9092
In our example, where we expect truck information to be sent and received, we will use the following data transfer object (DTO). Create a TruckInformationRequest.java with the code below. The annotations come from Lombok, so make sure the Lombok dependency is on your classpath, or replace them with explicit constructors, getters and setters.
@AllArgsConstructor
@NoArgsConstructor // without this, deserializing the JSON will fail with an InvalidDefinitionException
@Data
@Builder
public class TruckInformationRequest {
    private String truck_id;
    private String truck_mileage;
    private String truck_latitude;
    private String truck_longitude;
    private String truck_health;
}
We need to configure a topic and partitions for our truck information messages. Create a class called KafkaTopics.java that will hold all the topic configuration for the application. Include the following code, which creates a topic with 3 partitions and 3 replicas. Note that the replication factor cannot exceed the number of brokers in the cluster, so on the single-broker setup from section 2 you would use replicas(1) instead.
@Configuration
public class KafkaTopics {
    @Bean
    public NewTopic truckInfoTopic() {
        return TopicBuilder.name("truck_info_topic").partitions(3).replicas(3).build();
    }
}
With 3 replicas (on a cluster of at least 3 brokers), consumers can still receive messages even if two of the brokers go down, which keeps our messages highly available.
Let us now configure our producer client. Create a class named KafkaTruckInformationProducer.java and include the following code
@Configuration
public class KafkaTruckInformationProducer {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    private Map<String, Object> producerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return config;
    }

    @Bean
    public ProducerFactory<String, TruckInformationRequest> truckInfoProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, TruckInformationRequest> truckInfoTemplate(
            ProducerFactory<String, TruckInformationRequest> producerFactory
    ) {
        return new KafkaTemplate<>(producerFactory);
    }
}
For our consumer client, create another class named KafkaTruckInformationConsumer.java and include the following code
@Configuration
public class KafkaTruckInformationConsumer {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    private Map<String, Object> consumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "truck_info_consumer_group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return config;
    }

    @Bean
    public ConsumerFactory<String, TruckInformationRequest> truckInfoConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfig(),
                new StringDeserializer(),
                new JsonDeserializer<>(TruckInformationRequest.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, TruckInformationRequest> truckInfoListener() {
        ConcurrentKafkaListenerContainerFactory<String, TruckInformationRequest> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(truckInfoConsumerFactory());
        return factory;
    }
}
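Since truck_info_topic was created with 3 partitions, the listener container can run 3 consumers in the same group by setting the container concurrency. This is a small variation of the truckInfoListener bean above (a sketch; everything else in the class stays the same):
@Bean
public ConcurrentKafkaListenerContainerFactory<String, TruckInformationRequest> truckInfoListener() {
    ConcurrentKafkaListenerContainerFactory<String, TruckInformationRequest> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(truckInfoConsumerFactory());
    // one consumer thread per partition; more than 3 would leave the extra threads idle
    factory.setConcurrency(3);
    return factory;
}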
Sensors or onboard computers on our trucks will need to send real-time messages to our server, which will then publish those messages to our Kafka topic. For that, we will create a controller class named TruckInformationController.java and include the following
@RestController
@RequestMapping("/v1/delivery/trucks")
public class TruckInformationController {
    private final KafkaTemplate<String, TruckInformationRequest> kafkaTemplate;

    public TruckInformationController(KafkaTemplate<String, TruckInformationRequest> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping
    public ResponseEntity<String> sendTruckInfoToKafka(@RequestBody TruckInformationRequest request) {
        // send our message to the kafka truck info topic
        kafkaTemplate.send("truck_info_topic", request);
        return new ResponseEntity<>("Truck info sent to kafka", HttpStatus.OK);
    }
}
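The send above lets Kafka pick the partition for each record. If you want all events from the same truck to stay in order, you can use the truck id as the message key so they always land in the same partition (a one-line variation; it assumes the getTruck_id() getter that Lombok generates for the truck_id field):
// send keyed by truck id so per-truck ordering is preserved within a partition
kafkaTemplate.send("truck_info_topic", request.getTruck_id(), request);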
At this point, our messages are available in the topic's partitions for consumption. In order to consume events from our topic, we need to configure a Kafka listener. Create a class called TruckInformationListener.java.
@Component
public class TruckInformationListener {
    private final TruckDeliveryProcessingService service;

    public TruckInformationListener(TruckDeliveryProcessingService service) {
        this.service = service;
    }

    @KafkaListener(
            topics = "truck_info_topic",
            groupId = "truck_info_consumer_group",
            containerFactory = "truckInfoListener"
    )
    public void realtimeTruckInformation(TruckInformationRequest request) {
        service.processTruckInformation(request);
    }
}
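The TruckDeliveryProcessingService that the listener delegates to is not shown in this guide; what it does is up to your application. Here is a hypothetical sketch so the listener above compiles (the class body is an assumption, a placeholder for your own persistence or notification logic):
@Service
public class TruckDeliveryProcessingService {
    public void processTruckInformation(TruckInformationRequest request) {
        // placeholder: persist the reading, raise alerts, or forward it to analytics here
        System.out.println("Received truck information for truck " + request.getTruck_id());
    }
}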
Conclusion
In this tutorial, we sent messages in real time to our Kafka topic and consumed them. Your task now is to scale the application to 3 consumers to match the number of partitions. Also create another topic and configure producer and consumer clients to send and receive driver information in real time.
To learn more about Apache Kafka, you can visit the official documentation.