На самом деле существует проект EmbeddedKafka, но мы пойдём другим путём. Представьте, что вы подключили kafka-maven-plugin, ну или каким-нибудь другим способом запускаете Kafka для интеграционных тестов.
Скорее всего, у вас возникнет проблема с тем, что сообщения из kafka могут не приходить или приходить с опозданием.
Такая проблема возникает из-за того, что когда вы внутри интеграционного теста в метода с пометкой @Before подключаете Consumer к какому-нибудь топику Kafka, то это запускает процесс переназначения партиций и consumer-ов для подключения нового consumer-а. И новый consumer не будет считывать из очереди сообщения до окончания этого процесса, который, кстати, происходит асинхронно.
Для того чтобы дождаться окончания переназначения партиций и consumer-ов нужно использовать метод:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
private static final String[] topics = {"topic1", "mytopic2"}; @Before public void createKafkaConsumer() { kafkaConsumer = new KafkaConsumer<>(createKafkaConsumerProperties()); class CountAssignedConsumerRebalanceListener implements ConsumerRebalanceListener { class RebalanceInfo { private int partitionsAssigned; private int partitionsCount; public int getPartitionsAssigned() { return partitionsAssigned; } public void setPartitionsAssigned(int partitionsAssigned) { this.partitionsAssigned = partitionsAssigned; } public int getPartitionsCount() { return partitionsCount; } public void setPartitionsCount(int partitionsCount) { this.partitionsCount = partitionsCount; } public void decrementPartitionsAssigned() { partitionsAssigned--; } public void incrementPartitionsAssigned() { partitionsAssigned++; } } private Map<String, RebalanceInfo> assignedPartitions = new HashMap<>(); CountAssignedConsumerRebalanceListener() { Arrays.stream(topics).forEach(topic ->{ RebalanceInfo rebalanceInfo = new RebalanceInfo(); rebalanceInfo.setPartitionsCount(kafkaConsumer.partitionsFor(topic).size()); logger.debug("Topic {} partitions count {}.", topic, rebalanceInfo.getPartitionsCount()); assignedPartitions.put(topic, rebalanceInfo); }); } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { partitions.stream().map(TopicPartition::topic) .map(assignedPartitions::get) .forEach(RebalanceInfo::decrementPartitionsAssigned); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { partitions.stream().map(TopicPartition::topic) .map(assignedPartitions::get) .forEach(RebalanceInfo::incrementPartitionsAssigned); } public boolean checkAllPartitionsAssigned() { return assignedPartitions.values().stream() .allMatch(rebalanceInfo -> rebalanceInfo.getPartitionsAssigned() == rebalanceInfo.getPartitionsCount()); } }; CountAssignedConsumerRebalanceListener rebalanceListener = new CountAssignedConsumerRebalanceListener(); kafkaConsumer.subscribe(Arrays.asList(topics), rebalanceListener); // We need to poll to join the consumer group and wait for partition assignment. long assignedPartitionsCheckStartTime = System.currentTimeMillis(); while (rebalanceListener.checkAllPartitionsAssigned()) { kafkaConsumer.poll(kafkaPollTimeout); if (System.currentTimeMillis() < assignedPartitionsCheckStartTime + kafkaInitTimeout) { throw new IllegalStateException("Failed to wait for partitions assignment."); } } } /** * Используйте этот метод внутри ваших тестов для считывания * записи из Kafka */ public ConsumerRecord<String, String> readFromKafka(String topic) { Queue<ConsumerRecord<String, String>> topicRecords = kafkaRecords.get(topic); ConsumerRecord<String, String> result = topicRecords.poll(); if (result == null) { long startTime = System.currentTimeMillis(); while ((result == null) && (System.currentTimeMillis() - startTime < kafkaWaitTimeout)) { kafkaConsumer.poll(kafkaPollTimeout).forEach(consumerRecord -> kafkaRecords.get(consumerRecord.topic()).offer(consumerRecord)); result = topicRecords.poll(); } } return result; } @After public void closeKafkaConsumer() { if (kafkaConsumer == null) return; try { kafkaConsumer.poll(kafkaPollTimeout); kafkaRecords.entrySet().stream().forEach(entry -> entry.getValue().clear()); } finally { kafkaConsumer.close(); } } |
Что такое kafkaRecords в участке кода? Нет ли ссылки на GitHub?)
Примера, к сожалению нет. В данном случае
Имеется в виду, на сколько я помню, что внутри readFromKafka мы считываем все сообщения из самой кафка, какие есть, и кладём в kafkaRecords. В данном примере, возможно, readFromKafka случайно попал. Я уже не очень помню. Здесь был основной акцент на том, что нужно обязательно дождаться переназначения партиций, как это сделано в методе createKafkaConsumer.
Метод readFromKafka при желании можно использовать, например, перед каждым запуском теста, чтобы сначала считать все сообщения, которые уже отправлены предыдущими тестами в Kafka, чтобы запуск этого теста был чистым. Для хорошего теста уж точно нужны не сообщения предыдущих тестов, а нужны только сообщения, которые были отправлены в Kafka в рамках этого теста.
А ещё лучше, наверное, readFromKafka использовать по окончании теста, чтобы считать все сообщения, которые были отправлены в Kafka, и сравнить с тем, что должно было быть туда отправлено.