При попытке считать сообщение из kafka получаю странную ошибку:
1 2 3 4 |
[kafka.request.logger] [] - Completed request:Name: FetchRequest; Version: 3; CorrelationId: 40488; ClientId: consumer-1; ReplicaId: -1; MaxWait: 500 ms; MinBytes: 1 bytes; MaxBytes:52428800 bytes from connection 127.0.0.1:53434-127.0.0.1:50755;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] [] - Container exception org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition shiftServiceInQueue-0 at offset 0 Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4 |
В конце концов оказалось, что проблема была в следующем коде:
1 2 3 4 5 6 7 8 9 10 11 |
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);//, //new StringDeserializer(), new StringDeserializer()); ContainerProperties containerProperties = new ContainerProperties(additionalTopics); Arrays.stream(additionalTopics).forEach(topic -> records.put(topic, new LinkedBlockingQueue<>())); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<Integer, String>(cf, containerProperties); |
Чтобы всё заработало, нужно просто поправить тип ключа с Integer на String:
1 2 3 4 5 6 7 8 9 10 11 |
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka); DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<String, String>(consumerProps, new StringDeserializer(), new StringDeserializer()); ContainerProperties containerProperties = new ContainerProperties(additionalTopics); Arrays.stream(additionalTopics).forEach(topic -> records.put(topic, new LinkedBlockingQueue<>())); KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<String, String>(cf, containerProperties); |
То есть у KafkaConsumerFactory просто указываем строковые ключи и StringDeserializer-ы.