In this step-by-step guide, I will show you how to create a Kafka producer and consumer with Spring Boot and Kotlin.
When you finish this article, you will know precisely how to publish and consume String messages, how to do the same with JSON payloads, and how to fix the most common deserialization errors along the way.
Please keep in mind that you should have Apache Kafka up and running on your local machine. If you would like to learn how to set it up, then check out my other articles under the Kafka tag.
Let’s start with creating a new Spring Boot project. As always, I highly encourage you to use the Spring Initializr page when creating a new one.
To be on the same page: I’ve selected Spring Boot version 2.7.4 with Jar packaging and Java 17. Additionally, we will need two more dependencies: Spring Web and Spring for Apache Kafka.
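For reference, these two dependencies translate into the following entries in the build.gradle.kts file (a sketch assuming Gradle with the Kotlin DSL; the Spring Initializr generates the full file for you):

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.kafka:spring-kafka")
}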
As the first example, let’s take a look at how to publish and consume simple String messages.
After we import our Kafka Spring Boot Kotlin project into our favorite IDE (like IntelliJ, for instance), let’s navigate to the application.yaml file and put the following:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:8098
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:8098
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
As we can see, these settings configure both the Kafka consumer and the producer: the broker address (bootstrap-servers), the offset reset policy for the consumer, and the key and value (de)serializers.
As the next step, let’s create a Constants.kt file inside the config package:
const val EXAMPLE_TOPIC_NAME = "someTopicOne"
const val GROUP_ID = "groupId"
As we can see, we put two String constants here with the example topic name and group ID.
Although this is not necessary, it will help us keep our code a bit cleaner.
Next, let’s implement an ExampleStringProducer class inside the producer package:
@Component
class ExampleStringProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>
) {

    fun sendStringMessage(message: String) {
        kafkaTemplate.send(EXAMPLE_TOPIC_NAME, message)
    }
}
As can be seen, in order to send messages we have to inject a KafkaTemplate. This generic class exposes methods for executing high-level operations, and as type parameters (<String, String> in our case) we specify the desired key and value types.
When it comes to the sendStringMessage function, it simply takes a String argument called message and sends it to the topic (someTopicOne in our case).
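As a side note, send() does not block; in spring-kafka 2.8 (the version Boot 2.7 brings in) it returns a ListenableFuture. So if we wanted to log whether the broker actually accepted a record, a hypothetical variant of our function could look like this (not part of the final project, and it assumes we add a LoggerFactory logger to the producer class as well):

fun sendStringMessageWithCallback(message: String) {
    kafkaTemplate.send(EXAMPLE_TOPIC_NAME, message)
        .addCallback(
            { result -> logger.info("Delivered to partition ${result?.recordMetadata?.partition()}") },
            { ex -> logger.error("Delivery failed for [$message]", ex) }
        )
}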
With that being done, let’s implement a consumer called ExampleConsumer:
@Component
class ExampleConsumer {

    private val logger = LoggerFactory.getLogger(this.javaClass)

    @KafkaListener(topics = [EXAMPLE_TOPIC_NAME], groupId = GROUP_ID)
    fun firstListener(message: String) {
        logger.info("Message received: [$message]")
    }
}
This time, in order to make our firstListener function the target of a Kafka message listener on the specified topics, we have to mark it with the @KafkaListener annotation. Moreover, if we would like to handle multiple topics with this function, we can simply specify their names in the topics array. It’s worth mentioning that with the groupId attribute, we override the group ID property from the consumer factory, but for this particular listener only.
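For illustration, a purely hypothetical extra listener in the same class could combine both ideas at once, reading from our existing topic plus a made-up anotherTopic, with its own group ID override:

@KafkaListener(topics = [EXAMPLE_TOPIC_NAME, "anotherTopic"], groupId = "dedicatedGroup")
fun multiTopicListener(message: String) {
    logger.info("Message received: [$message]")
}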
And to summarize this code snippet: the incoming message will simply be logged to the output.
Finally, let’s expose a test REST endpoint, which we will use to send messages:
@RestController
class ExampleController(
    private val exampleStringProducer: ExampleStringProducer
) {

    @PostMapping("/test")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    fun sendTestMessage(
        @RequestBody requestBody: RequestBodyDto
    ) {
        exampleStringProducer.sendStringMessage(
            message = requestBody.message
        )
    }

    data class RequestBodyDto(val message: String)
}
As we can see, this handler responds to POST requests to the /test endpoint. As a request body, we send the message, which is then passed to the sendStringMessage function, and if everything works fine, we should get a 204 No Content response.
In order to test our functionality, let’s run a couple of POST requests:
curl --location --request POST 'localhost:8080/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "SomeMessage"
}'
As a result, we should see our messages printed to the output, like these:
Message received: [Some Example Message]
Message received: [Another one]
With all of that being done, we know precisely how to send and retrieve String messages.
So as the next thing, let’s learn how to publish and consume JSON Kafka messages in our Spring Boot Kotlin project.
This time, let’s start a bit differently (and you will see why later), by creating two DTOs, ExampleDto and UserDto, inside the dto package:
// ExampleDto.kt file:
data class ExampleDto(val someMessage: String)

// UserDto.kt file:
data class UserDto(val id: Long, val name: String)
We will use them later to transport our messages.
As the next step, let’s introduce two additional topic names to the Constants.kt file:
const val EXAMPLE_TOPIC_NAME_TWO = "someTopicTwo"
const val EXAMPLE_TOPIC_NAME_THREE = "someTopicThree"
Next, let’s create an ExampleJsonProducer class:
@Component
class ExampleJsonProducer(
    private val exampleDtoKafkaTemplate: KafkaTemplate<String, ExampleDto>,
    private val userDtoKafkaTemplate: KafkaTemplate<String, UserDto>
) {

    fun sendExampleDtoMessage(dto: ExampleDto) {
        exampleDtoKafkaTemplate.send(EXAMPLE_TOPIC_NAME_TWO, dto)
    }

    fun sendUserDtoMessage(dto: UserDto) {
        userDtoKafkaTemplate.send(EXAMPLE_TOPIC_NAME_THREE, dto)
    }
}
This time, we inject two different KafkaTemplates, each one responsible for handling a specific DTO type. Apart from that, everything looks almost the same as in the String example, just with different constants.
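A quick note here: this works out of the box because Spring Boot’s auto-configured KafkaTemplate bean is declared with wildcard generics, so it satisfies both injection points. If you ever need a separate, manually tuned template, a minimal sketch could look like the class below. Keep in mind that it is my own addition (not part of this project), and that declaring any KafkaTemplate bean yourself switches off the auto-configured one, so you would then have to declare every template the application needs:

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.JsonSerializer

@Configuration
class ExampleDtoKafkaConfig {

    // Mirrors the producer settings we keep in application.yaml.
    @Bean
    fun exampleDtoKafkaTemplate(): KafkaTemplate<String, ExampleDto> {
        val props = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:8098",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
        )
        return KafkaTemplate(DefaultKafkaProducerFactory(props))
    }
}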
Next, let’s add two more listeners to the ExampleConsumer class:
@KafkaListener(topics = [EXAMPLE_TOPIC_NAME_TWO], groupId = GROUP_ID)
fun secondListener(message: ExampleDto) {
    logger.info("Message received: [$message]")
}

@KafkaListener(topics = [EXAMPLE_TOPIC_NAME_THREE], groupId = GROUP_ID)
fun thirdListener(message: UserDto) {
    logger.info("Message received: [$message]")
}
As we can clearly see, nothing changed here except the topic names and the message types.
One of the last things we have to do is make two more updates inside the ExampleController. First, we have to inject our new producer:
@RestController
class ExampleController(
    private val exampleStringProducer: ExampleStringProducer,
    private val exampleJsonProducer: ExampleJsonProducer
)
And add two more invocations inside the sendTestMessage function:
exampleJsonProducer.sendExampleDtoMessage(
    dto = ExampleDto(requestBody.message)
)

exampleJsonProducer.sendUserDtoMessage(
    dto = UserDto(
        id = Random.nextLong(0, 100),
        name = requestBody.message
    )
)
As we can see, when we query our endpoint with some payload, we expect that the provided message will be sent to our new topics. Moreover, in the case of UserDto, the id will be a randomly generated Long value.
As the last thing before testing, let’s edit the application.yaml file a bit:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:8098
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:8098
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
The only values we want to change here are value-deserializer for our consumer and value-serializer for our producer. In order to work with JSON payloads, we have to utilize the JsonDeserializer and JsonSerializer here.
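To make it concrete: with this setup the DTOs travel as plain JSON (plus a type header added by the JsonSerializer), so for a request carrying "SomeMessage" the two payloads would look roughly like this (the id is random, so yours will differ):

{"someMessage":"SomeMessage"}
{"id":42,"name":"SomeMessage"}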
The title is a bit of a spoiler here (and the main reason we didn’t start this part with the YAML file), but let’s run the application and query our endpoint:
curl --location --request POST 'localhost:8080/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "SomeMessage"
}'
After that, when we check the logs, we will see an exception being thrown in an infinite loop.
If we stop the application (or you are a robot :D), we will see the following message (among others):
This error handler cannot process ‘SerializationException’s directly; please consider configuring an ‘ErrorHandlingDeserializer’ in the value and/or key deserializer
Well, this message is pretty descriptive so when we apply this hint, the application.yaml file looks like this:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:8098
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:8098
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Basically, as a fix, we have to utilize the ErrorHandlingDeserializer class, which delegates the real work to another deserializer and catches any exception it throws, so a failing record no longer breaks the consumer’s poll() loop.
To do so with a YAML config, we have to set the consumer’s value-deserializer to the ErrorHandlingDeserializer and point the spring.deserializer.value.delegate.class property (under the consumer’s properties section) to the actual deserializer, in our case the JsonDeserializer.
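For comparison, a rough programmatic equivalent of that wiring could look like this; it is only a sketch of what the YAML does for us, assuming we were to build the consumer factory by hand:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer

fun exampleDtoConsumerFactory(): DefaultKafkaConsumerFactory<String, ExampleDto> {
    val props = mapOf<String, Any>(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:8098",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
    )
    // The ErrorHandlingDeserializer wraps the real JsonDeserializer, so a broken
    // record results in a handled error instead of an endless redelivery loop.
    return DefaultKafkaConsumerFactory(
        props,
        StringDeserializer(),
        ErrorHandlingDeserializer(JsonDeserializer(ExampleDto::class.java))
    )
}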
And again, spoiler alert 😀
Let’s rerun the POST request and see what happens:
curl --location --request POST 'localhost:8080/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "SomeMessage"
}'
As we can clearly see, we got rid of the infinite loop, but we still can’t read messages.
And this time, the message is pretty descriptive as well:
The class ‘com.codersee.kafkaexample.dto.ExampleDto’ is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Similarly, to fix this issue we have to add a new property, spring.json.trusted.packages (again under the consumer’s properties section), and either point it to the package where our DTOs live or set a wildcard.
In my case, the final application.yaml file looks as follows:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:8098
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: "com.codersee.kafkaexample.dto"
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:8098
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
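And if you prefer code over YAML, the delegate deserializer exposes the same knob programmatically; a tiny, hypothetical sketch:

// Trust our DTO package on the JsonDeserializer delegate (or pass "*" to trust all).
val delegate = JsonDeserializer(ExampleDto::class.java).apply {
    addTrustedPackages("com.codersee.kafkaexample.dto")
}
val valueDeserializer = ErrorHandlingDeserializer(delegate)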
Finally, we can rerun our beloved Spring Boot Kafka with Kotlin application and try the request once again:
curl --location --request POST 'localhost:8080/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "SomeMessage"
}'
As a result, we should see the following:
Message received: [SomeMessage]
Message received: [ExampleDto(someMessage=SomeMessage)]
Message received: [UserDto(id=26, name=SomeMessage)]
As we can see, all 3 Apache Kafka producers and consumers are working as expected (and if not, then let me know in the comments section).
And that would be all for this article about how to work with Apache Kafka with Spring Boot and Kotlin. If you’d like to see the whole project source code, then visit this GitHub repository.
I really hope that you enjoyed this step-by-step guide and will be delighted if you would like to share your feedback with me and the others in the comments section below.
Take care and have a great day! 😀
Hey Piotr,
Thanks for this really great step-by-step tutorial. I just want to note that the YAML file for “Fix an infinite-loop problem” is not correct: the “spring” key is missing. I have also checked the GitHub repo and it’s fine there; the issue is only in the screenshot in 4.7, “An infinite-loop problem”.
Hello! 🙂
Thank you for such kind words!
You’re totally right, and thank you for letting me know. Consider it done.
Hi Piotr!
Thanks for this really great tutorial! I liked it very much; these sorts of step-by-step examples are useful for a lot of us who are trying to learn Kotlin and apply modern tech in our projects.
I am not sure if you do tutorials by request, but lots of us could really use a step-by-step guide on how to use “avro4k” with Kotlin, Spring Boot, and Kafka. There are a few guides around trying to explain how to use Avro and Kafka with Spring Boot, but all of them (at least the ones I found) wanted me to generate my value objects from schemas. Well, I don’t want that; it doesn’t feel right to me. That’s when “avro4k” came along. But I couldn’t really get it working. I know that’s my bad, and maybe that’s why I got really disappointed first, then angry about it, and then so pissed off that I just deleted even the repo in which I stored my example code… I know it wasn’t a smart decision, but your example worked perfectly with JSON and that was good enough for me at that time.
But you know, the voices just keep telling me that it would be better to use Avro. And then I thought: you have far greater experience with Kotlin and Spring Boot than me and my like-minded friends, so it probably wouldn’t be too complicated for you. So, if you could put together a similar step-by-step guide for sending two types of messages to different Kafka topics using “avro4k” for serialization, and then consuming both of them in the same consumer application using Spring Boot, that would be much appreciated!
Regards,
Mike