如何使用testcontainers和spring-kafka准备测试

huangapple 未分类评论43阅读模式
英文:

How to prepare tests using testcontainers and spring-kafka

问题

我正在尝试为Kafka消息设置集成测试,并从使用Embedded-Kafka切换到Testcontainers。以下是docker-compose的配置和所有集成测试的基类:

kafka-compose.yaml:

version: '3.3'

services:
  zookeeper:
    image: "wurstmeister/zookeeper"
  kafka:
    image: "wurstmeister/kafka:2.12-2.2.2"
    ports:
      - "9092:9092"
    depends_on:
      - "zookeeper"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_HOST_NAME: "${KAFKA_HOST:-localhost}"
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_CREATE_TOPICS: "recoverer-test:1:1,some-topic"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
@SpringBootTest
@Slf4j
public class IntegrationTest {
  private static final DockerComposeContainer kafkaContainer = initializeKafkaContainer();

  protected static DockerComposeContainer initializeKafkaContainer() {
    log.info(
        "Initializing kafka container. Should be called only once. Current value of the kafkaContainer: {}",
        kafkaContainer);
    try {
      var kafkaContainer =
          new DockerComposeContainer(new File("src/test/resources/kafka-compose.yml"))
              .withExposedService("kafka_1", 9092);
      kafkaContainer.start();

      var bootstrapServers =
          format(
              "PLAINTEXT://%s:%s",
              kafkaContainer.getServiceHost("kafka_1", 9092),
              kafkaContainer.getServicePort("kafka_1", 9092));

      System.setProperty("spring.embedded.kafka.brokers", bootstrapServers);

      return kafkaContainer;
    } catch (Throwable t) {
      log.error("Can't initialize the Kafka test container.", t);
      throw t;
    }
  }
  
@DirtiesContext(classMode = ClassMode.BEFORE_CLASS)
class PerformSomethingInboundAdapterTest extends IntegrationTest {

  private static final String GROUP_ID = "test-group-id";
  private static final TopicPartition PARTITION = new TopicPartition(SOME_TOPIC, 0);
  private static final Instant RECEIVED_AT = now();
  private static final CustomerNumber CUSTOMER_NUMBER = CustomerNumber.of(600830);

  @Autowired private KafkaListenerEndpointRegistry kafkaListenerRegistry;

  @Autowired private ConsumerFactory<String, String> consumerFactory;

  @Autowired private KafkaTemplate<Object, Object> kafkaTemplate;

  @MockBean private ActivateSomethingActivities activateCampaignActivities;

  private Consumer<String, String> consumer;

  private long initiallyCommittedOffset;

  @BeforeEach
  void startKafkaListener() {
    kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::start);
  }

  @AfterEach
  void stopKafkaListener() {
    kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::stop);
  }

  @Test
  void shouldPerformSomething() {
...
  }

我遇到了几个问题:
1)似乎spring-kafka及其@KafkaListener在所有使用@SpringBootTest注解的可能测试中都是活动的,而不仅仅是在与kafka有关的测试中。这意味着发送到kafka主题的消息可以被任意测试消费。首先问一下spring-kafka的开发人员:是否可以与Testcontainers一起使用spring-kafka-test?是否有可能在每个测试中停止所有@KafkaListener,并仅在特定的@SpringBootTest测试中显式启用它们?
2)Testcontainers附带了一个Kafka模块。这个模块使用confluent kafka docker镜像,该镜像在配置方面非常顽固。例如,您无法设置某些代理属性,也无法告诉容器在启动后应创建哪个主题。在与该模块的斗争后,我决定使用带有wurstmeister/kafka镜像的docker-compose模块。后一种方法的问题在于,当我使用命令行maven运行测试时,我会收到告知kafka已在9092端口上运行的错误消息。似乎maven在mvn test期间启动了几个JVM,因此静态字段kafkaContainer会被初始化多次。为什么会发生这种情况?

英文:

I'm trying to setup my integration tests for kafka messaging and to switch from using Embedded-Kafka in favor to the Testcontainers. Given following configuration for docker-compose and a base class for all integration tests:

kafka-compose.yaml:

version: &#39;3.3&#39;

services:
  zookeeper:
    image: &quot;wurstmeister/zookeeper&quot;
  kafka:
    image: &quot;wurstmeister/kafka:2.12-2.2.2&quot;
    ports:
      - &quot;9092:9092&quot;
    depends_on:
      - &quot;zookeeper&quot;
    environment:
      KAFKA_ZOOKEEPER_CONNECT: &quot;zookeeper:2181&quot;
      KAFKA_ADVERTISED_HOST_NAME: &quot;${KAFKA_HOST:-localhost}&quot;
      KAFKA_ADVERTISED_PORT: &quot;9092&quot;
      KAFKA_CREATE_TOPICS: &quot;recoverer-test:1:1,some-topic&quot;
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: &quot;false&quot;
@SpringBootTest
@Slf4j
public class IntegrationTest {
  private static final DockerComposeContainer kafkaContainer = initializeKafkaContainer();

  protected static DockerComposeContainer initializeKafkaContainer() {
    log.info(
        &quot;Initializing kafka container. Should be called only once. Current value of the kafkaContainer: {}&quot;,
        kafkaContainer);
    try {
      var kafkaContainer =
          new DockerComposeContainer(new File(&quot;src/test/resources/kafka-compose.yml&quot;))
              .withExposedService(&quot;kafka_1&quot;, 9092);
      kafkaContainer.start();

      var bootstrapServers =
          format(
              &quot;PLAINTEXT://%s:%s&quot;,
              kafkaContainer.getServiceHost(&quot;kafka_1&quot;, 9092),
              kafkaContainer.getServicePort(&quot;kafka_1&quot;, 9092));

      System.setProperty(&quot;spring.embedded.kafka.brokers&quot;, bootstrapServers);

      return kafkaContainer;
    } catch (Throwable t) {
      log.error(&quot;Can&#39;t initialize the Kafka test container.&quot;, t);
      throw t;
    }
  }
  
@DirtiesContext(classMode = ClassMode.BEFORE_CLASS)
class PerformSomethingInboundAdapterTest extends IntegrationTest {

  private static final String GROUP_ID = &quot;test-group-id&quot;;
  private static final TopicPartition PARTITION = new TopicPartition(SOME_TOPIC, 0);
  private static final Instant RECEIVED_AT = now();
  private static final CustomerNumber CUSTOMER_NUMBER = CustomerNumber.of(600830);

  @Autowired private KafkaListenerEndpointRegistry kafkaListenerRegistry;

  @Autowired private ConsumerFactory&lt;String, String&gt; consumerFactory;

  @Autowired private KafkaTemplate&lt;Object, Object&gt; kafkaTemplate;

  @MockBean private ActivateSomethingActivities activateCampaignActivities;

  private Consumer&lt;String, String&gt; consumer;

  private long initiallyCommittedOffset;

  @BeforeEach
  void startKafkaListener() {
    kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::start);
  }

  @AfterEach
  void stopKafkaListener() {
    kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::stop);
  }

  @Test
  void shouldPerformSomething() {
...
  }

There are few problems which I encountered:

  1. It seems that spring-kafka and it's @KafkaListeners are active during all possible tests annotated with @SpringBootTest and not only during the kafka specific ones. That means a message sent to a kafka topic can be consumed by an arbitrary test. First of all the question for the spring-kafka guys: is it possible to use spring-kafka-test along with Testcontainers at all? Are there some possibility to stop all the @KafkaListener's for each test and enable them explicitly for specific @SpringBootTest tests?
  2. Testcontainers comes with a Kafka module on board. This one uses the confluent kafka docker image which is very stubborn in regards on configuration. For instance you can't set some broker properties and you can't tell the container which topic should be created after the start. After struggling with this module I decided to use the docker-compose module with the wurstmeister/kafka image. The problem with latter approach is that when I run the tests with command-line maven I get the error message telling that kafka is already running on 9092 port. It seems that maven starts few JVM during mvn test and as consequence the static field kafkaContainer gets initialised few times. Why does it happen?

答案1

得分: 0

  1. 在类上使用 @DirtiesContext 在测试完成时关闭监听器。

无法理解 #2。

英文:
  1. Use @DirtiesContext on the class to shut down the listeners when a test completes.

No idea for #2.

答案2

得分: 0

1st 可能通过为不同的测试使用不同的上下文来克服

示例:

@ExtendWith(SpringExtension.class)
@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@TestExecutionListeners({DependencyInjectionTestExecutionListener.class,
        FlywayTestExecutionListener.class})
@FlywayTest
@ActiveProfiles({"test"})
public abstract class AbstractDatabaseTest {

这是我为集成测试创建的一个测试,我只需在需要数据库层的地方扩展这个测试,您可以尝试为Kafka测试创建类似的测试。总的来说,@SpringBootTest 会启动整个应用程序,这可能需要更长的时间,个人而言我不喜欢它。

2nd 很难说。您可以尝试在 initializeKafkaContainer() 中打印堆栈跟踪,以查看哪个测试会这样做。或者,如果您采用先前的方法,您可以在抽象类中的静态块中进行初始化,然后扩展它的每个测试都将使用现有的静态容器,并且仅初始化一次。

英文:

1st might be overcome by using different contexts for different tests

Example:

@ExtendWith(SpringExtension.class)
@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@TestExecutionListeners({DependencyInjectionTestExecutionListener.class,
        FlywayTestExecutionListener.class})
@FlywayTest
@ActiveProfiles({&quot;test&quot;})
public abstract class AbstractDatabaseTest {

This is a test I created for integration testing, I would just extend this test where I needed the database layer, you could try creating a similar one for Kafka tests.
In general @SpringBootTest boostraps the entire application which might take longer and I don't like it personally.

2nd it's hard to say. You may try to print a stacktrace in initializeKafkaContainer() to see which test does that. Or if you try to go with the previous approach, you can do initialization in a static block in abstract class and then every test that extends it will use the existing static container and it will be initilialized only once.

huangapple
  • 本文由 发表于 2020年6月29日 17:14:24
  • 转载请务必保留本文链接:https://java.coder-hub.com/62634909.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定