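For consuming messages, we need to configure a ConsumerFactory and a KafkaListenerContainerFactory. Once these beans are available, POJO-based consumers can be configured using the @KafkaListener annotation. The @EnableKafka annotation is required on the configuration class to enable the detection of the @KafkaListener annotation on Spring-managed beans:
1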
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Consumer-side Kafka configuration for String-keyed, String-valued records.
 *
 * <p>Declares the {@link ConsumerFactory} and the listener container factory
 * that methods annotated with {@code @KafkaListener} rely on.
 *
 * <p>NOTE(review): {@code bootstrapAddress} and {@code groupId} are referenced
 * but not declared in this snippet; presumably they are injected fields —
 * confirm against the full class.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /**
     * Creates the consumer factory, configured with the bootstrap servers, the
     * consumer group id, and String deserializers for both key and value.
     *
     * @return a consumer factory for {@code String}/{@code String} records
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }

    /**
     * Creates the listener container factory backed by {@link #consumerFactory()}.
     *
     * @return a concurrent listener container factory for String records
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}
1
2
3
4
/**
 * Prints every message received from {@code topicName} as a member of
 * consumer group {@code foo}.
 *
 * @param message the record payload
 */
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    String line = "Received Message in group foo: " + message;
    System.out.println(line);
}
1
// `topics` is a String[]: each topic must be its own array element. A single
// "topic1, topic2" literal would be treated as one (invalid) topic name.
@KafkaListener(topics = { "topic1", "topic2" }, groupId = "foo")
1
2
3
4
5
6
7
8
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}),
containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}
1
2
// Listens to partitions 0 and 1 of topicName; only the partitions are listed,
// no per-partition initial offset is given.
@KafkaListener(topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
1
2
3
4
5
6
7
8
9
10
11
/**
 * Creates a listener container factory that applies a record filter.
 *
 * <p>The filter returns {@code true} for records whose value contains
 * {@code "World"}; under Spring Kafka's {@code RecordFilterStrategy} contract
 * a {@code true} result means the record is discarded before it reaches the
 * listener.
 *
 * @return a filtering listener container factory for String records
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> filteringFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    filteringFactory.setConsumerFactory(consumerFactory());
    filteringFactory.setRecordFilterStrategy(
        consumerRecord -> consumerRecord.value().contains("World"));
    return filteringFactory;
}
1
2
3
4
5
6
/**
 * Prints messages from {@code topicName} that were delivered through the
 * {@code filterKafkaListenerContainerFactory} container factory.
 *
 * @param message the record payload
 */
@KafkaListener(topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    String line = "Received Message in filtered listener: " + message;
    System.out.println(line);
}
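Sending and receiving custom Java objects requires configuring an appropriate serializer in the ProducerFactory and a deserializer in the ConsumerFactory.
1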
2
3
4
5
6
7
/**
 * Simple message payload carrying a greeting text and a name.
 *
 * <p>Provides a no-argument constructor plus getters and setters so the object
 * can be mapped to and from JSON, and a two-argument constructor for direct
 * creation.
 */
public class Greeting {

    private String msg;
    private String name;

    /** Creates an empty greeting; needed for JSON deserialization. */
    public Greeting() {
    }

    /**
     * Creates a greeting.
     *
     * @param msg  the greeting text
     * @param name the name the greeting is addressed to
     */
    public Greeting(String msg, String name) {
        this.msg = msg;
        this.name = name;
    }

    /** @return the greeting text */
    public String getMsg() {
        return msg;
    }

    /** @param msg the greeting text */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /** @return the name the greeting is addressed to */
    public String getName() {
        return name;
    }

    /** @param name the name the greeting is addressed to */
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return msg + ", " + name + "!";
    }
}
The producer below uses JsonSerializer as its value serializer.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Creates the producer factory for {@code Greeting} payloads, registering
 * {@code JsonSerializer} as the value serializer.
 *
 * @return a producer factory for String keys and {@code Greeting} values
 */
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}
/**
 * Creates the template used to send {@code Greeting} messages, backed by
 * {@code greetingProducerFactory()}.
 *
 * @return a Kafka template for String keys and {@code Greeting} values
 */
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    ProducerFactory<String, Greeting> producerFactory = greetingProducerFactory();
    return new KafkaTemplate<>(producerFactory);
}
// Sends a Greeting payload to topicName.
// NOTE(review): kafkaTemplate is presumably an injected KafkaTemplate<String, Greeting>
// (e.g. the greetingKafkaTemplate bean) — confirm against the caller.
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// ...
return new DefaultKafakConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class);
)
}
/**
 * Creates the listener container factory for {@code Greeting} payloads,
 * backed by {@code greetingConsumerFactory()}.
 *
 * @return a concurrent listener container factory for {@code Greeting} values
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}
1
2
3
4
5
<!-- Jackson data-binding library (JSON to/from Java object mapping). -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
</dependency>
1
2
3
4
5
6
/**
 * Receives {@code Greeting} payloads from {@code topicName} through the
 * {@code greetingKafkaListenerContainerFactory} container factory.
 *
 * @param greeting the payload, already converted to a {@code Greeting}
 */
@KafkaListener(
topics = "topicName",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
// process greeting message
}
We'll add a second message class, Farewell, and send both Greeting and Farewell objects to the same topic.
1
2
3
4
/**
 * Simple message payload carrying a farewell text and the minutes remaining.
 *
 * <p>Provides a no-argument constructor plus getters and setters so the object
 * can be mapped to and from JSON, and a two-argument constructor for direct
 * creation.
 */
public class Farewell {

    private String message;
    private Integer remainingMinutes;

    /** Creates an empty farewell; needed for JSON deserialization. */
    public Farewell() {
    }

    /**
     * Creates a farewell.
     *
     * @param message          the farewell text
     * @param remainingMinutes the number of minutes remaining
     */
    public Farewell(String message, Integer remainingMinutes) {
        this.message = message;
        this.remainingMinutes = remainingMinutes;
    }

    /** @return the farewell text */
    public String getMessage() {
        return message;
    }

    /** @param message the farewell text */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the number of minutes remaining */
    public Integer getRemainingMinutes() {
        return remainingMinutes;
    }

    /** @param remainingMinutes the number of minutes remaining */
    public void setRemainingMinutes(Integer remainingMinutes) {
        this.remainingMinutes = remainingMinutes;
    }

    @Override
    public String toString() {
        return message + ". In " + remainingMinutes + "!";
    }
}
1
// Registers two type mappings on the serializer, in "id:fully.qualified.ClassName"
// form: "greeting" for Greeting and "farewell" for Farewell.
configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Creates a producer factory able to send several payload types to one topic.
 *
 * <p>Values are serialized with {@code JsonSerializer}, and the type mappings
 * register the ids {@code greeting} and {@code farewell} for the
 * {@code Greeting} and {@code Farewell} classes.
 *
 * @return a producer factory for String keys and arbitrary object values
 */
@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
    String typeMappings =
        "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell";

    Map<String, Object> producerSettings = new HashMap<>();
    producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    producerSettings.put(JsonSerializer.TYPE_MAPPINGS, typeMappings);
    return new DefaultKafkaProducerFactory<>(producerSettings);
}
/**
 * Creates the template used to send payloads of several types, backed by
 * {@code multiTypeProducerFactory()}.
 *
 * @return a Kafka template for String keys and arbitrary object values
 */
@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
    ProducerFactory<String, Object> producerFactory = multiTypeProducerFactory();
    return new KafkaTemplate<>(producerFactory);
}
1
2
3
// Publishes three differently-typed payloads (a Greeting, a Farewell and a
// plain String) to the same topic through one template.
multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");
To consume these messages we need a custom MessageConverter in the Consumer. The converter relies on a Jackson2JavaTypeMapper, which we must configure explicitly to use the type header to determine the target class for deserialization:
1
// Give the type id precedence when choosing the target class for deserialization.
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
1
2
3
4
// Map each type id to the class it should be deserialized into.
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);
1
// Mark the package holding the payload classes as trusted for the type mapper.
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
MessageConverter:
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Creates the message converter used to turn JSON payloads into
 * {@code Greeting} or {@code Farewell} objects.
 *
 * <p>The type mapper is set to give the type id precedence, trusts the
 * {@code com.baeldung.spring.kafka} package, and maps the ids
 * {@code greeting} and {@code farewell} to their classes.
 *
 * @return the configured record message converter
 */
@Bean
public RecordMessageConverter multiTypeConverter() {
    Map<String, Class<?>> idToClass = new HashMap<>();
    idToClass.put("greeting", Greeting.class);
    idToClass.put("farewell", Farewell.class);

    DefaultJackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper();
    mapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
    mapper.addTrustedPackages("com.baeldung.spring.kafka");
    mapper.setIdClassMapping(idToClass);

    StringJsonMessageConverter jsonConverter = new StringJsonMessageConverter();
    jsonConverter.setTypeMapper(mapper);
    return jsonConverter;
}
We also need to tell our ConcurrentKafkaListenerContainerFactory to use the MessageConverter and a rather basic ConsumerFactory:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Creates the consumer factory for the multi-type topic, configured with the
 * bootstrap servers, a String key deserializer and {@code JsonDeserializer}
 * as the value deserializer.
 *
 * @return a consumer factory for String keys and arbitrary object values
 */
@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
    Map<String, Object> consumerSettings = new HashMap<>();
    consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(consumerSettings);
}
/**
 * Creates the listener container factory for the multi-type topic, wiring in
 * {@code multiTypeConsumerFactory()} and the {@code multiTypeConverter()}
 * message converter.
 *
 * @return a concurrent listener container factory for arbitrary object values
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(multiTypeConsumerFactory());
    containerFactory.setMessageConverter(multiTypeConverter());
    return containerFactory;
}
Finally, we use @KafkaHandler in the listener to route each message type to its own method:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Class-level listener for the {@code multitype} topic that dispatches each
 * incoming payload to the {@code @KafkaHandler} method matching its type.
 *
 * <p>The listener names {@code multiTypeKafkaListenerContainerFactory}
 * explicitly: without it the default container factory would be used, which
 * does not carry the message converter that produces {@code Greeting} and
 * {@code Farewell} objects.
 */
@Component
@KafkaListener(
    id = "multiGroup",
    topics = "multitype",
    containerFactory = "multiTypeKafkaListenerContainerFactory")
public class MultiTypeKafkaListener {

    /**
     * Handles payloads converted to {@code Greeting}.
     *
     * @param greeting the received greeting
     */
    @KafkaHandler
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }

    /**
     * Handles payloads converted to {@code Farewell}.
     *
     * @param farewell the received farewell
     */
    @KafkaHandler
    public void handleF(Farewell farewell) {
        System.out.println("Farewell received: " + farewell);
    }

    /**
     * Fallback handler for payloads that match no other handler.
     *
     * @param object the received payload
     */
    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Unknown type received: " + object);
    }
}