Kafka에 대한 설정 및 Producer와 Consumer의 예제에 대해서 알아보았다. 그럼 이 코드들이 잘 동작하는지는 어떻게 확인해야할까?
실제 서비스들 간의 메세지 전송 및 결과를 확인하기 위해서는 테스트 환경을 구축하여 서비스를 띄운 후에 테스트 해보는 것이 가장 정확하다. Docker compose로 Kafka broker와 Zookeeper를 띄우고 Bootstarp address에 Kafka broker 주소를 넣어준 뒤 서비스를 띄워서 테스트를 하면된다.
하지만, 이러한 환경은 귀찮은 면이 있다. 항상 테스트를 위해서 서비스를 띄워두는 것은 리소스 낭비이다. 그래서 Kafka의 메세지 전송을 테스트 할 수 있는 환경이 필요하다. Spring boot 환경에서는 이를 테스트 할 수 있는 라이브러리를 제공하고 있다.
아래의 라이브러리가 그것이며, 이름은 Spring kafka test library 이다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
위의 Configuration을 추가하면 Embedded kafka 를 사용할 수 있게 되고, 이 환경을 통해서 Message 전송을 임의로 만들 수 있게 된다.
단, Container 환경에서 Embedded kafka로 테스트를 진행하면 정상적으로 진행되지 않다.
현재, 나의 환경이 Jenkins를 Kubernetes 내부에 띄우고 Pod agent로 Docker Image build 및 Release Job을 수행하고 있고, 이때 Unit test를 수행하여 코드의 안정성을 확인하고 있다. 하지만, Embedded kafka관련 Test는 수행하고 못하고 있다.
Test 예제
application-test.yaml
Embedeed kafka를 사용하기 위해서는 Embedded kafka에서 띄우는 서버를 사용하여야 한다.
# kafka
kafka:
bootstrap:
addresses: ${spring.embedded.kafka.brokers:localhost:9092}
Essential Configuration (JUnit5)
Embedded kafka를 사용하기 위해서는 필수적으로 넣어주어야 하는 Configuration 이 있다.
우선 @EmbeddedKafka annotation으로 Embedded Kafka를 추가해 준다. 그리고 이 Embedded kafka가 동작하기 위해서는 Integration Test 환경으로 수행되어야 한다. 그 말은 Spring 의 Dependency가 사용되는 환경에서 수행되어야 하기 때문에 @SpringbootTest annotation이 추가되어야 하고, 그와 더불어 @ExtendWith(SpringExtension.class)가 추가되어야 정상적인 동작이 된다. 이러한 셋팅은 JUnit5 기반으로 테스트 될때이며 JUnit4는 그 방법이 다르다.
@Slf4j
@Tag("embedded-kafka-test")
@SpringBootTest
@ExtendWith(SpringExtension.class)
@EmbeddedKafka
@AutoConfigureMockMvc
@TestPropertySource(properties = "spring.config.location=classpath:application-test.yaml")
public class JwtTokenMessageServiceImplTest {
...
}
Kafka Broker Configuration
Embedded Kafka의 Broker를 셋팅해주는 부분이다. @Autowired annotation으로 Dependency Injection을 해주면 된다.
Broker count는 2개, Partition은 2개로 설정해 주었다. 마지막은 설정과 동시에 생성할 Topic인데, Topic이 여러 개 일 때에는 뒤쪽으로 ,(콤마)로 구분하여 계속 넣어주면 된다.
Intellij를 사용한다면 이 부분에서 @Autowired 로 Dependency Injection시 Dependecy를 찾을 수 없다는 Inspection Message를 받을 수 있다. 이 부부은 Intellij의 문제로 난 이 Inspection의 level을 warning으로 바꾸었다.
2022.02.05 - [Tool/IntelliJ] - [Intellij] @Autowired inspection 오류 무시하는 방법
...
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(
2,
true,
2,
JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN
);
...
Producer configuration
Producer configuration은 간단하다. Produecer Property는 Default 셋팅으로 설정하였다. 그리고 Key Serializer는 String, Value Serialilzer는 Json으로 설정하였다. 이 부분의 서비스에서 사용하는 설정과 동일하게 하면 된다.
private Producer<String, Object> configEmbeddedKafkaProducer() {
Map<String, Object> producerProperties = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
return new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>()).createProducer();
}
Consumer Configuration
Consumer의 설정은 복잡하다. 왜냐하면 Kafka의 Consumer의 설정에서 보았듯이 KafkaLinstener의 설정이 추가되어야 하기 때문이다. Embedded Kafka도 마찬가지로 Produer가 보내는 메세지가 Topic에 들어오면 Listener가 해당 메세지를 처리하도록 해야하기 때문이다.
1) 우선 Consumer Property 셋팅을 한다. 기본 셋팅을 가지고 가되, Auto commit만 false로 셋팅하였다. 나의 코드에서 Commit을 수동을 하고 있다.
2) ConsumerFactory를 만들고, Container Property를 topic과 함께 생성하고, KafkaMessageListenerContainer를 만든다.
3) CusumerRecord를 순차적으로 저장시키고, 순차적으로 처리하기 위해, BlockingQueue로 선언한다.
4) 이후, 생성된 KafkaMessageListenerContainer에 Listener 함수를 추가한다. 이 함수에서는 Log로 Message를 출력하고, BlockQueue에 record를 추가한다. 이 record를 필요한 시점에 뽑아서 확인한다.
private KafkaMessageListenerContainer<String, String> container;
private BlockingQueue<ConsumerRecord<String, String>> records;
private void configEmbeddedKafkaConsumer(String topic) {
Map<String, Object> consumerProperties = new HashMap<>(KafkaTestUtils.consumerProps("auth", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
ContainerProperties containerProperties = new ContainerProperties(topic);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingDeque<>();
container.setupMessageListener((MessageListener<String, String>) record -> {
LOGGER.debug("test-listener received message='{}'",
record.value());
records.add(record);
});
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
전체 Configuration 부분 코드
@Slf4j
@SpringBootTest
@ExtendWith(SpringExtension.class)
@EmbeddedKafka
@AutoConfigureMockMvc
@TestPropertySource(properties = "spring.config.location=classpath:application-test.yaml")
public class JwtTokenMessageServiceImplTest {
@Autowired
private JwtTokenMessageService jwtTokenMessageService;
@MockBean
private JwtTokenProvider jwtTokenProvider;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(
2,
true,
2,
JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN
);
private KafkaMessageListenerContainer<String, String> container;
private BlockingQueue<ConsumerRecord<String, String>> records;
@BeforeEach
void setUp() {}
private void configEmbeddedKafkaConsumer(String topic) {
Map<String, Object> consumerProperties = new HashMap<>(KafkaTestUtils.consumerProps("auth", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
ContainerProperties containerProperties = new ContainerProperties(topic);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingDeque<>();
container.setupMessageListener((MessageListener<String, String>) record -> {
LOGGER.debug("test-listener received message='{}'",
record.value());
records.add(record);
});
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
private Producer<String, Object> configEmbeddedKafkaProducer() {
Map<String, Object> producerProperties = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
return new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>()).createProducer();
}
@AfterEach
void tearDown() {
container.stop();
}
}
Producer Test Code예제
void testCreateMessageForJwtTokenCache() throws InterruptedException, JsonProcessingException {
// Consumer 생성
configEmbeddedKafkaConsumer(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN);
Account account = TestAccountBuilder.accountWithToken();
// 전송할 테스트 메세지 생성
JwtTokenMessage.CreateMessage createMessage = JwtTokenMessage.CreateMessage.from(account, null);
String expectedMessage = objectMapper.writeValueAsString(createMessage);
// 확인하지 않을 함수에 대한 Mocking
when(jwtTokenProvider.getExpiredDate(any())).thenReturn(null);
// Embedded kafka로 Message 전송
jwtTokenMessageService.creteJwtTokenCache(account);
// BlockingQueue에 메세지를 가져옴. 10초 동안 메세지가 들어오지 않으면 Error 발생
ConsumerRecord<String, String> record = records.poll(10, TimeUnit.SECONDS);
// 수신한 메세지 확인
assertThat(record).isNotNull();
assertThat(record.value()).isEqualTo(expectedMessage);
assertThat(record).has(key(null));
}
Consumer Test Code 예제
void testCreateMessageForJwtTokenCache() {
// 전송할 메세지 생성
JwtTokenMessage.CreateMessage createMessage = new JwtTokenMessage.CreateMessage();
createMessage.setAccountId(1L);
createMessage.setJwtToken("test-jwtToken");
createMessage.setExpiration(120L);
// 메세지를 사용하여 ProducerRecord를 생성한다.
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(JwtTokenTopic.TOPIC_CREATE_JWT_TOKEN, createMessage);
// 메세지 전송
producer.send(producerRecord);
producer.flush();
// Mocking
when(jwtTokenService.create(any())).thenReturn(TestJwtTokenBuilder.defaultJwtToken());
// Consumer가 메세지를 잘 받아서 Listener가 수행되는지를 확인 (Listener의 특정 Function이 수행되는지를 확인)
verify(jwtTokenService, timeout(10000L).times(1)).create(any());
}
이 정도면 간단한 메세지 전송 여부는 확인이 가능하다.